notice.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package service
  2. import (
  3. "context"
  4. "go-common/app/job/bbq/video/dao"
  5. "go-common/library/log"
  6. "go-common/library/sync/errgroup"
  7. "sync/atomic"
  8. "time"
  9. notice "go-common/app/service/bbq/notice-service/api/v1"
  10. )
  11. // 信号量,为了避免task执行超过周期,连续执行两个任务会出现问题
  12. var i32 int32
  13. // SysMsgTask 推送消息
  14. func (s *Service) SysMsgTask() {
  15. ctx := context.Background()
  16. newValue := atomic.AddInt32(&i32, 1)
  17. defer atomic.AddInt32(&i32, -1)
  18. if newValue > 1 {
  19. log.Errorv(ctx, log.KV("log", "sysMsgTask pending"))
  20. return
  21. }
  22. res, err := s.dao.RawCheckTask(ctx, "checkSysMsg")
  23. if err != nil {
  24. log.Errorv(ctx, log.KV("log", "get last sysMsgTask id fail"))
  25. return
  26. }
  27. lastSysMsgID := res.LastCheck
  28. curSysMsgID := res.LastCheck
  29. list, err := s.dao.GetNewSysMsg(ctx, curSysMsgID)
  30. if err != nil {
  31. log.Errorv(ctx, log.KV("log", "get new sysMsg fail"))
  32. return
  33. }
  34. if len(list) == 0 {
  35. log.Infov(ctx, log.KV("log", "no new sys msg to be sync to notice"))
  36. return
  37. }
  38. var mids []int64
  39. for _, item := range list {
  40. curSysMsgID = item.Id
  41. notice := notice.NoticeBase{
  42. Mid: item.Receiver,
  43. ActionMid: item.Sender,
  44. SvId: 0,
  45. NoticeType: 4,
  46. Text: item.Text,
  47. JumpUrl: item.JumpUrl,
  48. BizType: dao.NoticeBizTypeSysMsg,
  49. BizId: item.Id,
  50. }
  51. // 全量系统消息
  52. if item.Receiver == 0 {
  53. lastUserID := int64(0)
  54. if len(mids) == 0 {
  55. for {
  56. if userBases, err := s.dao.UsersByLast(ctx, lastUserID); err != nil {
  57. log.Errorv(ctx, log.KV("log", "sys msg task: get user base fail"))
  58. break
  59. } else {
  60. for _, userBase := range userBases {
  61. mids = append(mids, userBase.MID)
  62. }
  63. if len(userBases) > 0 {
  64. lastUserID = userBases[len(userBases)-1].ID
  65. } else {
  66. break
  67. }
  68. }
  69. }
  70. }
  71. midChan := make(chan int64, 20)
  72. go func() {
  73. for _, mid := range mids {
  74. midChan <- mid
  75. }
  76. close(midChan)
  77. }()
  78. startTime := time.Now()
  79. g := errgroup.Group{}
  80. for i := 0; i < 10; i++ {
  81. g.Go(func() error {
  82. subNotice := notice
  83. for mid := range midChan {
  84. subNotice.Mid = mid
  85. s.dao.CreateNotice(ctx, &subNotice)
  86. }
  87. return nil
  88. })
  89. }
  90. g.Wait()
  91. log.Info("total sys msg notice push: cost_time=%f, mid_len=%d", time.Since(startTime).Seconds(), len(mids))
  92. } else {
  93. s.dao.CreateNotice(ctx, &notice)
  94. }
  95. }
  96. if _, err := s.dao.UpdateTaskLastCheck(ctx, "checkSysMsg", curSysMsgID); err != nil {
  97. log.Errorv(ctx, log.KV("log", "update check_task mysql fail"))
  98. return
  99. }
  100. log.Infov(ctx, log.KV("log", "no new sys msg to be sync to notice"), log.KV("last_sys_id", lastSysMsgID), log.KV("cur_sys_id", curSysMsgID))
  101. }