service.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "sync"
  6. "time"
  7. "go-common/app/job/main/coupon/conf"
  8. "go-common/app/job/main/coupon/dao"
  9. "go-common/app/job/main/coupon/model"
  10. "go-common/library/log"
  11. "go-common/library/queue/databus"
  12. "github.com/robfig/cron"
  13. )
  14. const (
  15. _couponTable = "coupon_info_"
  16. _orderTable = "coupon_order"
  17. _couponAllowanceTable = "coupon_allowance_info"
  18. _updateAct = "update"
  19. _insertAct = "insert"
  20. )
  21. // Service struct
  22. type Service struct {
  23. c *conf.Config
  24. dao *dao.Dao
  25. couponDatabus *databus.Databus
  26. waiter sync.WaitGroup
  27. notifyChan chan *model.NotifyParam
  28. close bool
  29. }
  30. // New init
  31. func New(c *conf.Config) (s *Service) {
  32. s = &Service{
  33. c: c,
  34. dao: dao.New(c),
  35. notifyChan: make(chan *model.NotifyParam, 10240),
  36. }
  37. if c.DataBus.CouponBinlog != nil {
  38. s.couponDatabus = databus.New(c.DataBus.CouponBinlog)
  39. s.waiter.Add(1)
  40. go s.couponbinlogproc()
  41. }
  42. go s.notifyproc()
  43. t := cron.New()
  44. t.AddFunc(s.c.Properties.CheckInUseCouponCron, s.CheckInUseCoupon)
  45. t.AddFunc(s.c.Properties.CheckInUseCouponCartoonCron, s.CheckOrderInPayCoupon)
  46. t.Start()
  47. return s
  48. }
  49. // Ping Service
  50. func (s *Service) Ping(c context.Context) (err error) {
  51. return s.dao.Ping(c)
  52. }
  53. // Close Service
  54. func (s *Service) Close() {
  55. s.close = true
  56. s.couponDatabus.Close()
  57. s.dao.Close()
  58. s.waiter.Wait()
  59. }
  60. func (s *Service) couponbinlogproc() {
  61. defer s.waiter.Done()
  62. var (
  63. err error
  64. msg *databus.Message
  65. msgChan = s.couponDatabus.Messages()
  66. ok bool
  67. c = context.Background()
  68. )
  69. for {
  70. msg, ok = <-msgChan
  71. if !ok || s.close {
  72. log.Info("couponbinlogproc closed")
  73. return
  74. }
  75. if err = msg.Commit(); err != nil {
  76. log.Error("msg.Commit err(%+v)", err)
  77. }
  78. v := &model.MsgCanal{}
  79. if err = json.Unmarshal([]byte(msg.Value), v); err != nil {
  80. log.Error("json.Unmarshal(%v) err(%v)", v, err)
  81. continue
  82. }
  83. log.Info("couponbinlogproc log(%+v)", v)
  84. if err = s.Notify(c, v); err != nil {
  85. log.Error("s.Notify(%v) err(%v)", v, err)
  86. }
  87. }
  88. }
  89. func (s *Service) notifyproc() {
  90. var (
  91. msg *model.NotifyParam
  92. ticker = time.NewTicker(time.Duration(s.c.Properties.NotifyTimeInterval))
  93. mergeMap = make(map[string]*model.NotifyParam)
  94. maxMergeSize = 1000
  95. full bool
  96. ok bool
  97. err error
  98. )
  99. for {
  100. select {
  101. case msg, ok = <-s.notifyChan:
  102. if !ok {
  103. log.Info("notifyproc msgChan closed")
  104. return
  105. }
  106. if msg == nil {
  107. continue
  108. }
  109. if _, ok := mergeMap[msg.CouponToken]; !ok {
  110. mergeMap[msg.CouponToken] = msg
  111. }
  112. if len(mergeMap) < maxMergeSize {
  113. continue
  114. }
  115. full = true
  116. case <-ticker.C:
  117. }
  118. if len(mergeMap) > 0 {
  119. for _, v := range mergeMap {
  120. log.Info("retry notify coupon arg(%v)", v)
  121. if err = s.CheckCouponDeliver(context.TODO(), v); err != nil {
  122. log.Error("CheckCouponDeliver fail arg(%v) err(%v)", v, err)
  123. v.NotifyCount++
  124. if v.NotifyCount < s.c.Properties.MaxRetries {
  125. s.notifyChan <- v
  126. }
  127. }
  128. }
  129. mergeMap = make(map[string]*model.NotifyParam)
  130. }
  131. if full {
  132. time.Sleep(time.Duration(s.c.Properties.NotifyTimeInterval))
  133. }
  134. }
  135. }