callback.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "go-common/app/job/main/push/dao"
  7. pb "go-common/app/service/main/push/api/grpc/v1"
  8. pushmdl "go-common/app/service/main/push/model"
  9. "go-common/library/log"
  10. )
  11. const (
  12. _retryCallback = 5
  13. _delCallbackLimit = 5000
  14. )
  15. func (s *Service) callbackproc() {
  16. defer s.waiter.Done()
  17. var err error
  18. for {
  19. msg, ok := <-s.callbackCh
  20. if !ok {
  21. log.Warn("s.callbackproc() closed")
  22. return
  23. }
  24. for _, v := range msg {
  25. if v == nil {
  26. continue
  27. }
  28. arg := &pb.AddCallbackRequest{
  29. Task: v.Task,
  30. APP: v.APP,
  31. Platform: int32(v.Platform),
  32. Mid: v.Mid,
  33. Pid: int32(v.Pid),
  34. Token: v.Token,
  35. Buvid: v.Buvid,
  36. Click: int32(v.Click),
  37. }
  38. if v.Extra != nil {
  39. arg.Extra = &pb.CallbackExtra{Status: int32(v.Extra.Status), Channel: int32(v.Extra.Channel)}
  40. }
  41. for i := 0; i < _retryCallback; i++ {
  42. if _, err = s.pushRPC.AddCallback(context.Background(), arg); err == nil {
  43. break
  44. }
  45. time.Sleep(20 * time.Millisecond)
  46. }
  47. if err != nil {
  48. log.Error("s.pushRPC.AddCallback(%+v) error(%v)", arg, err)
  49. dao.PromError("report:新增callback")
  50. continue
  51. }
  52. log.Info("add callback success task(%s) token(%s)", v.Task, v.Token)
  53. time.Sleep(time.Millisecond)
  54. }
  55. }
  56. }
  57. // consumeCallback consumes callback.
  58. func (s *Service) consumeCallback() {
  59. defer s.waiter.Done()
  60. for {
  61. msg, ok := <-s.callbackSub.Messages()
  62. if !ok {
  63. log.Info("databus: push-job callback consumer exit!")
  64. close(s.callbackCh)
  65. return
  66. }
  67. s.callbackCnt++
  68. msg.Commit()
  69. var cbs []*pushmdl.Callback
  70. if err := json.Unmarshal(msg.Value, &cbs); err != nil {
  71. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  72. dao.PromError("service:解析databus中callback消息")
  73. continue
  74. }
  75. log.Info("consumeCallback key(%s) partition(%d) offset(%d) msg(%v)", msg.Key, msg.Partition, msg.Offset, string(msg.Value))
  76. s.callbackCh <- cbs
  77. }
  78. }
  79. func (s *Service) delCallbacksproc() {
  80. for {
  81. now := time.Now()
  82. // 每天4点时删除七天前的callback数据
  83. if now.Hour() == 4 {
  84. var (
  85. err error
  86. deleted int64
  87. b = now.Add(time.Duration(-s.c.Job.DelCallbackInterval*24) * time.Hour)
  88. loc, _ = time.LoadLocation("Local")
  89. t = time.Date(b.Year(), b.Month(), b.Day(), 23, 59, 59, 0, loc)
  90. )
  91. for {
  92. if deleted, err = s.dao.DelCallbacks(context.TODO(), t, _delCallbackLimit); err != nil {
  93. log.Error("s.delCallbacks(%v) error(%v)", t, err)
  94. s.dao.SendWechat("DB操作失败:push-job删除callback数据错误")
  95. time.Sleep(time.Second)
  96. continue
  97. }
  98. if deleted < _delCallbackLimit {
  99. break
  100. }
  101. time.Sleep(time.Second)
  102. }
  103. log.Info("delCallbacksproc success date(%v)", t)
  104. time.Sleep(time.Hour)
  105. }
  106. time.Sleep(time.Minute)
  107. }
  108. }