123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115 |
- package service
- import (
- "context"
- "time"
- "go-common/app/job/main/vip/model"
- "go-common/library/log"
- "github.com/pkg/errors"
- )
- const ( // Status codes used by the push-data job in this file.
- _fail = 2 // status written back when a push returns a non-zero code; entries already in this status are skipped
- _handlering = 2 // progress status after a successful push that has not yet reached PushTotalCount (same numeric value as _fail, but stored in a different field)
- _finish = 3 // progress status once the pushed count equals PushTotalCount
- _nomarl = 0 // DisableType value an entry must have to be pushed
- _statusnomarl = 1 // status written back when a push returns code 0
- )
- func (s *Service) pushDataJob() {
- log.Info("push data job start..................")
- if succeed := s.dao.AddTransferLock(context.TODO(), "lock:pushDatajob"); succeed {
- if err := s.pushData(context.TODO()); err != nil {
- log.Error("error(%+v)", err)
- }
- }
- log.Info("push data job end.....................")
- }
- func (s *Service) pushData(c context.Context) (err error) {
- var (
- res []*model.VipPushData
- pushDataMap = make(map[int64]*model.VipPushData)
- pushMidsMap = make(map[int64][]int64)
- maxID int
- size = s.c.Property.BatchSize
- vips []*model.VipUserInfo
- curDate time.Time
- rel *model.VipPushResq
- )
- now := time.Now()
- format := now.Format("2006-01-02")
- if curDate, err = time.ParseInLocation("2006-01-02", format, time.Local); err != nil {
- err = errors.WithStack(err)
- return
- }
- if res, err = s.dao.PushDatas(c, format); err != nil {
- err = errors.WithStack(err)
- return
- }
- if len(res) == 0 {
- log.Info("not need reduce push data.........")
- return
- }
- for _, v := range res {
- pushDataMap[v.ID] = v
- }
- if maxID, err = s.dao.SelMaxID(c); err != nil {
- err = errors.WithStack(err)
- return
- }
- page := maxID / size
- if maxID%size != 0 {
- page++
- }
- for i := 0; i < page; i++ {
- startID := i * size
- endID := (i + 1) * size
- if vips, err = s.dao.SelUserInfos(context.TODO(), startID, endID); err != nil {
- err = errors.WithStack(err)
- return
- }
- for _, v := range vips {
- for key, val := range pushDataMap {
- startDate := curDate.AddDate(0, 0, int(val.ExpiredDayStart))
- endDate := curDate.AddDate(0, 0, int(val.ExpiredDayEnd))
- if !(v.OverdueTime.Time().Before(startDate) || v.OverdueTime.Time().After(endDate)) && v.PayType == model.Normal && val.DisableType == _nomarl && val.Status != _fail {
- mids := pushMidsMap[key]
- mids = append(mids, v.Mid)
- pushMidsMap[key] = mids
- }
- }
- }
- }
- for key, val := range pushMidsMap {
- data := pushDataMap[key]
- var status int8
- progressStatus := data.ProgressStatus
- pushedCount := data.PushedCount
- if rel, err = s.dao.PushData(context.TODO(), val, data, format); err != nil {
- log.Error("push data error(%+v)", err)
- continue
- }
- if rel.Code != 0 {
- status = _fail
- } else {
- pushedCount++
- if pushedCount == data.PushTotalCount {
- progressStatus = _finish
- } else {
- progressStatus = _handlering
- }
- status = _statusnomarl
- }
- if err = s.dao.UpdatePushData(context.TODO(), status, progressStatus, pushedCount, rel.Code, rel.Data, data.ID); err != nil {
- err = errors.WithStack(err)
- return
- }
- }
- return
- }
|