push.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package service
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/job/main/vip/model"
  6. "go-common/library/log"
  7. "github.com/pkg/errors"
  8. )
  9. const (
  10. _fail = 2
  11. _handlering = 2
  12. _finish = 3
  13. _nomarl = 0
  14. _statusnomarl = 1
  15. )
  16. func (s *Service) pushDataJob() {
  17. log.Info("push data job start..................")
  18. if succeed := s.dao.AddTransferLock(context.TODO(), "lock:pushDatajob"); succeed {
  19. if err := s.pushData(context.TODO()); err != nil {
  20. log.Error("error(%+v)", err)
  21. }
  22. }
  23. log.Info("push data job end.....................")
  24. }
  25. func (s *Service) pushData(c context.Context) (err error) {
  26. var (
  27. res []*model.VipPushData
  28. pushDataMap = make(map[int64]*model.VipPushData)
  29. pushMidsMap = make(map[int64][]int64)
  30. maxID int
  31. size = s.c.Property.BatchSize
  32. vips []*model.VipUserInfo
  33. curDate time.Time
  34. rel *model.VipPushResq
  35. )
  36. now := time.Now()
  37. format := now.Format("2006-01-02")
  38. if curDate, err = time.ParseInLocation("2006-01-02", format, time.Local); err != nil {
  39. err = errors.WithStack(err)
  40. return
  41. }
  42. if res, err = s.dao.PushDatas(c, format); err != nil {
  43. err = errors.WithStack(err)
  44. return
  45. }
  46. if len(res) == 0 {
  47. log.Info("not need reduce push data.........")
  48. return
  49. }
  50. for _, v := range res {
  51. pushDataMap[v.ID] = v
  52. }
  53. if maxID, err = s.dao.SelMaxID(c); err != nil {
  54. err = errors.WithStack(err)
  55. return
  56. }
  57. page := maxID / size
  58. if maxID%size != 0 {
  59. page++
  60. }
  61. for i := 0; i < page; i++ {
  62. startID := i * size
  63. endID := (i + 1) * size
  64. if vips, err = s.dao.SelUserInfos(context.TODO(), startID, endID); err != nil {
  65. err = errors.WithStack(err)
  66. return
  67. }
  68. for _, v := range vips {
  69. for key, val := range pushDataMap {
  70. startDate := curDate.AddDate(0, 0, int(val.ExpiredDayStart))
  71. endDate := curDate.AddDate(0, 0, int(val.ExpiredDayEnd))
  72. if !(v.OverdueTime.Time().Before(startDate) || v.OverdueTime.Time().After(endDate)) && v.PayType == model.Normal && val.DisableType == _nomarl && val.Status != _fail {
  73. mids := pushMidsMap[key]
  74. mids = append(mids, v.Mid)
  75. pushMidsMap[key] = mids
  76. }
  77. }
  78. }
  79. }
  80. for key, val := range pushMidsMap {
  81. data := pushDataMap[key]
  82. var status int8
  83. progressStatus := data.ProgressStatus
  84. pushedCount := data.PushedCount
  85. if rel, err = s.dao.PushData(context.TODO(), val, data, format); err != nil {
  86. log.Error("push data error(%+v)", err)
  87. continue
  88. }
  89. if rel.Code != 0 {
  90. status = _fail
  91. } else {
  92. pushedCount++
  93. if pushedCount == data.PushTotalCount {
  94. progressStatus = _finish
  95. } else {
  96. progressStatus = _handlering
  97. }
  98. status = _statusnomarl
  99. }
  100. if err = s.dao.UpdatePushData(context.TODO(), status, progressStatus, pushedCount, rel.Code, rel.Data, data.ID); err != nil {
  101. err = errors.WithStack(err)
  102. return
  103. }
  104. }
  105. return
  106. }