123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- package service
- import (
- "bufio"
- "bytes"
- "context"
- "encoding/csv"
- "fmt"
- "io/ioutil"
- "os"
- "strconv"
- "strings"
- "time"
- "go-common/app/admin/main/coupon/model"
- "go-common/library/database/sql"
- "go-common/library/ecode"
- "go-common/library/log"
- "go-common/library/net/metadata"
- xtime "go-common/library/time"
- "go-common/library/xstr"
- "github.com/pkg/errors"
- )
- // ActivitySalaryCoupon activity salary coupon.
- func (s *Service) ActivitySalaryCoupon(c context.Context, req *model.ArgBatchSalaryCoupon) (err error) {
- var (
- ok bool
- r *model.CouponBatchInfo
- )
- if r, err = s.dao.BatchInfo(c, req.BranchToken); err != nil {
- return
- }
- if r == nil {
- return ecode.CouPonBatchNotExistErr
- }
- if r.State != model.BatchStateNormal {
- return ecode.CouPonHadBlockErr
- }
- if r.ExpireDay != -1 {
- return ecode.CouponTypeNotSupportErr
- }
- if ok = s.dao.AddGrantUniqueLock(c, req.BranchToken, _lockseconds); !ok {
- return ecode.CouponSalaryHadRunErr
- }
- // mids index
- s.runSalary(metadata.String(c, metadata.RemoteIP), r, req)
- return
- }
- // runSalary run batch salary.
- func (s *Service) runSalary(ip string, b *model.CouponBatchInfo, req *model.ArgBatchSalaryCoupon) (err error) {
- go func() {
- var (
- err error
- fails []int64
- tmp []int64
- mids = []int64{}
- total int64
- midmap map[int64][]int64
- c = context.Background()
- )
- defer func() {
- if x := recover(); x != nil {
- x = errors.WithStack(x.(error))
- log.Error("runtime error caught: %+v", x)
- }
- //Write fail mids
- if len(fails) > 0 {
- s.OutFile(c, []byte(xstr.JoinInts(fails)), req.BranchToken+"_failmids.csv")
- }
- }()
- log.Info("batch salary analysis slary file[req(%v)]", req)
- // analysis slary file
- if midmap, total, err = s.AnalysisFile(c, req.FileURL); err != nil {
- return
- }
- if total != req.Count {
- log.Error("[batch salary count err req(%v),filetotal(%d)]", req, total)
- s.dao.DelGrantUniqueLock(c, b.BatchToken)
- return
- }
- log.Info("batch salary start[count(%d),token(%s)]", total, req.BranchToken)
- var crrent int
- for _, allmids := range midmap {
- for i, v := range allmids {
- mids = append(mids, v)
- if len(mids)%req.SliceSize != 0 && i != len(allmids)-1 {
- continue
- }
- crrent = crrent + len(mids)
- log.Info("batch salary ing[mid(%d),i(%d),token(%s),crrent(%d)]", v, i, req.BranchToken, crrent)
- if tmp, err = s.batchSalary(c, mids, ip, b); err != nil && len(tmp) > 0 {
- fails = append(fails, tmp...)
- log.Error("batch salary err[mids(%v),i(%d),token(%s)] err(%v)", mids, i, req.BranchToken, err)
- }
- mids = []int64{}
- }
- }
- log.Info("batch salary end[count(%d),token(%s),faillen(%d)]", total, req.BranchToken, len(fails))
- }()
- return
- }
- // batchSalary batch salary.
- func (s *Service) batchSalary(c context.Context, mids []int64, ip string, b *model.CouponBatchInfo) (falls []int64, err error) {
- var (
- tx *sql.Tx
- aff int64
- )
- if tx, err = s.dao.BeginTran(c); err != nil {
- return
- }
- defer func() {
- if err != nil {
- falls = mids
- if err1 := tx.Rollback(); err1 != nil {
- log.Error("batch salary rollback: %+v", err1)
- }
- return
- }
- if err = tx.Commit(); err != nil {
- falls = mids
- log.Error("batch salary commit: %+v", err)
- tx.Rollback()
- return
- }
- //del cache
- s.cache.Do(c, func(c context.Context) {
- for _, v := range mids {
- if err = s.dao.DelCouponAllowancesKey(context.Background(), v, model.NotUsed); err != nil {
- log.Error("batch salary cache del(%d) err: %+v", v, err)
- }
- }
- })
- //send msg
- if s.c.Prop.SalaryMsgOpen {
- s.sendMsg(mids, ip, 1, true, _msgMeng)
- }
- time.Sleep(time.Duration(s.c.Prop.SalarySleepTime))
- }()
- cps := make([]*model.CouponAllowanceInfo, len(mids))
- for i, v := range mids {
- cps[i] = &model.CouponAllowanceInfo{
- CouponToken: s.tokeni(i),
- Mid: v,
- State: model.NotUsed,
- StartTime: b.StartTime,
- ExpireTime: b.ExpireTime,
- Origin: model.AllowanceSystemAdmin,
- CTime: xtime.Time(time.Now().Unix()),
- BatchToken: b.BatchToken,
- Amount: b.Amount,
- FullAmount: b.FullAmount,
- AppID: b.AppID,
- }
- }
- // mid should be same index.
- if aff, err = s.dao.BatchAddAllowanceCoupon(c, tx, cps); err != nil {
- return
- }
- if len(mids) != int(aff) {
- err = fmt.Errorf("batch salary midslen(%d) != aff(%d)", len(mids), aff)
- return
- }
- if _, err = s.dao.UpdateBatchInfo(c, tx, b.BatchToken, len(mids)); err != nil {
- return
- }
- return
- }
- // AnalysisFile analysis slary file.
- func (s *Service) AnalysisFile(c context.Context, fileURL string) (midmap map[int64][]int64, total int64, err error) {
- cntb, err := ioutil.ReadFile(fileURL)
- if err != nil {
- return
- }
- var (
- mid int64
- records [][]string
- )
- r := csv.NewReader(strings.NewReader(string(cntb)))
- records, err = r.ReadAll()
- if err != nil {
- err = errors.WithStack(err)
- return
- }
- midmap = make(map[int64][]int64, s.c.Prop.AllowanceTableCount)
- for i, v := range records {
- if len(v) <= 0 {
- continue
- }
- if mid, err = strconv.ParseInt(v[0], 10, 64); err != nil {
- err = errors.Wrapf(err, "read csv line err(%d,%v)", i, v)
- break
- }
- index := mid % s.c.Prop.AllowanceTableCount
- if midmap[index] == nil {
- midmap[index] = []int64{}
- }
- midmap[index] = append(midmap[index], mid)
- total++
- }
- return
- }
- // OutFile out file.
- func (s *Service) OutFile(c context.Context, bs []byte, url string) (err error) {
- var outFile *os.File
- outFile, err = os.Create(url)
- if err != nil {
- os.Exit(1)
- }
- defer outFile.Close()
- b := bufio.NewWriter(outFile)
- _, err = b.Write(bs)
- if err != nil {
- os.Exit(1)
- }
- err = b.Flush()
- if err != nil {
- os.Exit(1)
- }
- return
- }
- // get coupon tokeni
- func (s *Service) tokeni(i int) string {
- var b bytes.Buffer
- b.WriteString(fmt.Sprintf("%05d", i))
- b.WriteString(fmt.Sprintf("%02d", s.r.Int63n(99)))
- b.WriteString(fmt.Sprintf("%03d", time.Now().UnixNano()/1e6%1000))
- b.WriteString(time.Now().Format("20060102150405"))
- return b.String()
- }
- func (s *Service) sendMsg(mids []int64, ip string, times int, b bool, msgType string) (err error) {
- msg, ok := msgMap[msgType]
- if !ok {
- log.Warn("sendMsg not support msgType(%s)", msgType)
- return
- }
- for i := 1; i <= times; i++ {
- if err = s.dao.SendMessage(context.Background(), xstr.JoinInts(mids), msg["title"], msg["content"], ip); err != nil {
- if i != times {
- continue
- }
- if b {
- log.Error("batch salary msg send mids(%v) err: %+v", mids, err)
- }
- }
- break
- }
- return
- }
- const (
- _msgMeng = "meng"
- )
- var (
- msgMap = map[string]map[string]string{
- "vip": {
- "title": "大会员代金券到账通知",
- "content": "大会员代金券已到账,快到“我的代金券”看看吧!IOS端需要在网页使用。#{\"传送门→\"}{\"https://account.bilibili.com/account/big/voucher\"}",
- },
- "meng": {
- "title": "大会员代金券到账提醒",
- "content": "您的专属大会员代金券即将在10月10日0点到账,年费半价基础上再享折上折,请注意查收!代金券有效期1天,暂不支持苹果支付 #{\"点击查看详情\"}{\"https://www.bilibili.com/read/cv1291213\"}",
- },
- }
- )
|