service.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "go-common/app/job/main/push/conf"
  9. "go-common/app/job/main/push/dao"
  10. pushrpc "go-common/app/service/main/push/api/grpc/v1"
  11. pushmdl "go-common/app/service/main/push/model"
  12. "go-common/library/cache"
  13. "go-common/library/conf/env"
  14. "go-common/library/log"
  15. "go-common/library/queue/databus"
  16. )
  // Package-level limits used by the consumers in this package.
  const (
  	// _max is the batch-size threshold: consumeReport flushes its buffered
  	// reports to reportCh once this many have accumulated.
  	_max = 1 << 10
  	// _retry is a retry limit; its users are outside this part of the file.
  	_retry = 3
  )
  21. // Service .
  22. type Service struct {
  23. c *conf.Config
  24. dao *dao.Dao
  25. waiter sync.WaitGroup
  26. addTaskWg sync.WaitGroup
  27. cache *cache.Cache
  28. pushRPC pushrpc.PushClient
  29. reportSub *databus.Databus // consumer for new reports
  30. callbackSub *databus.Databus // consumer for callback
  31. reportCh chan []*pushmdl.Report
  32. callbackCh chan []*pushmdl.Callback
  33. addTaskCh chan *pushmdl.Task
  34. reportCnt int64
  35. callbackCnt int64
  36. closedCnt int64
  37. closed bool
  38. }
  39. // New creates a Service instance.
  40. func New(c *conf.Config) (s *Service) {
  41. s = &Service{
  42. c: c,
  43. dao: dao.New(c),
  44. cache: cache.New(1, 102400),
  45. reportSub: databus.New(c.ReportSub),
  46. callbackSub: databus.New(c.CallbackSub),
  47. reportCh: make(chan []*pushmdl.Report, 1024),
  48. callbackCh: make(chan []*pushmdl.Callback, 1024),
  49. addTaskCh: make(chan *pushmdl.Task, 10240),
  50. }
  51. var err error
  52. if s.pushRPC, err = pushrpc.NewClient(c.PushRPC); err != nil {
  53. panic(err)
  54. }
  55. if env.DeployEnv == env.DeployEnvProd {
  56. go s.delInvalidReportsproc() // 主动删除无效token
  57. }
  58. for i := 0; i < s.c.Job.ReportShard; i++ {
  59. s.waiter.Add(1)
  60. go s.reportproc()
  61. }
  62. for i := 0; i < s.c.Job.CallbackShard; i++ {
  63. s.waiter.Add(1)
  64. go s.callbackproc()
  65. }
  66. if s.c.Job.PretreatTask {
  67. for i := 0; i < s.c.Job.PretreatmentTaskShard; i++ {
  68. s.waiter.Add(1)
  69. go s.pretreatTaskproc() // 预处理任务,将任务转化成按平台分的token任务
  70. }
  71. }
  72. s.addTaskWg.Add(1)
  73. go s.addTaskproc()
  74. s.waiter.Add(1)
  75. go s.consumeReport()
  76. s.waiter.Add(1)
  77. go s.consumeCallback()
  78. go s.checkConsumer()
  79. // 删除过期的数据
  80. go s.delCallbacksproc()
  81. go s.delTasksproc()
  82. // 定期更新token缓存
  83. go s.refreshTokensproc()
  84. // data platform
  85. s.waiter.Add(1)
  86. go s.dpQueryproc()
  87. s.waiter.Add(1)
  88. go s.dpFileproc()
  89. return
  90. }
  91. // consumeReport consumes report.
  92. func (s *Service) consumeReport() {
  93. defer s.waiter.Done()
  94. reports := make([]*pushmdl.Report, _max)
  95. ticker := time.NewTicker(time.Duration(s.c.Job.ReportTicker))
  96. for {
  97. select {
  98. case msg, ok := <-s.reportSub.Messages():
  99. if !ok {
  100. log.Info("databus: push-job report consumer exit!")
  101. if len(reports) > 0 {
  102. s.reportCh <- reports
  103. }
  104. if !atomic.CompareAndSwapInt64(&s.closedCnt, 0, 1) {
  105. close(s.reportCh)
  106. }
  107. return
  108. }
  109. s.reportCnt++
  110. msg.Commit()
  111. m := &pushmdl.Report{}
  112. if err := json.Unmarshal(msg.Value, m); err != nil {
  113. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  114. dao.PromError("service:解析计数databus消息")
  115. continue
  116. }
  117. log.Info("consumeReport key(%s) partition(%d) offset(%d) msg(%+v)", msg.Key, msg.Partition, msg.Offset, m)
  118. reports = append(reports, m)
  119. if len(reports) < _max {
  120. continue
  121. }
  122. case <-ticker.C:
  123. }
  124. if len(reports) > 0 {
  125. temp := make([]*pushmdl.Report, len(reports))
  126. copy(temp, reports)
  127. reports = []*pushmdl.Report{}
  128. s.reportCh <- temp
  129. }
  130. }
  131. }
  132. // checkConsumer checks consumer state.
  133. func (s *Service) checkConsumer() {
  134. if env.DeployEnv != env.DeployEnvProd {
  135. return
  136. }
  137. var c1, c2 int64
  138. for {
  139. time.Sleep(5 * time.Minute)
  140. if s.reportCnt-c1 == 0 {
  141. msg := "push-job report did not consume within 5 minute"
  142. s.dao.SendWechat(msg)
  143. log.Warn(msg)
  144. }
  145. c1 = s.reportCnt
  146. if s.callbackCnt-c2 == 0 {
  147. msg := "push-job callback did not consume within 5 minute"
  148. s.dao.SendWechat(msg)
  149. log.Warn(msg)
  150. }
  151. c2 = s.callbackCnt
  152. }
  153. }
  154. // Ping reports the heath of services.
  155. func (s *Service) Ping(c context.Context) (err error) {
  156. return s.dao.Ping(c)
  157. }
  158. // Close releases resources which owned by the Service instance.
  159. func (s *Service) Close() {
  160. s.closed = true
  161. s.reportSub.Close()
  162. s.callbackSub.Close()
  163. s.dao.Close()
  164. s.waiter.Wait()
  165. close(s.addTaskCh)
  166. s.addTaskWg.Wait()
  167. }