service.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "go-common/app/interface/main/push-archive/conf"
  7. "go-common/app/interface/main/push-archive/dao"
  8. "go-common/app/interface/main/push-archive/model"
  9. accrpc "go-common/app/service/main/account/rpc/client"
  10. pb "go-common/app/service/main/push/api/grpc/v1"
  11. pushrpc "go-common/app/service/main/push/api/grpc/v1"
  12. "go-common/library/cache"
  13. "go-common/library/conf/env"
  14. "go-common/library/log"
  15. bm "go-common/library/net/http/blademaster"
  16. "go-common/library/queue/databus"
  17. )
  18. // Service push service.
  19. type Service struct {
  20. c *conf.Config
  21. dao *dao.Dao
  22. cache *cache.Cache
  23. accRPC *accrpc.Service3
  24. wg sync.WaitGroup
  25. archiveSub *databus.Databus
  26. relationSub *databus.Databus
  27. userSettings map[int64]*model.Setting
  28. arcMo, relMo int64
  29. CloseCh chan bool
  30. ForbidTimes []conf.ForbidTime
  31. settingCh chan *pb.SetSettingRequest
  32. pushRPC pushrpc.PushClient
  33. }
  34. // New creates a push service instance.
  35. func New(c *conf.Config) *Service {
  36. s := &Service{
  37. c: c,
  38. dao: dao.New(c),
  39. cache: cache.New(1, 102400),
  40. accRPC: accrpc.New3(c.AccountRPC),
  41. archiveSub: databus.New(c.ArchiveSub),
  42. relationSub: databus.New(c.RelationSub),
  43. userSettings: make(map[int64]*model.Setting),
  44. CloseCh: make(chan bool),
  45. ForbidTimes: c.ArcPush.ForbidTimes,
  46. settingCh: make(chan *pb.SetSettingRequest, 3072),
  47. }
  48. var err error
  49. if s.pushRPC, err = pushrpc.NewClient(c.PushRPC); err != nil {
  50. panic(err)
  51. }
  52. s.mappingAbtest()
  53. go s.loadUserSettingsproc()
  54. time.Sleep(2 * time.Second) // consumeArchive will notice upper's fans, it depends on user's setting
  55. s.wg.Add(1)
  56. go s.loadUserSettingsproc()
  57. if c.Push.ProdSwitch {
  58. s.wg.Add(1)
  59. go s.consumeRelationproc()
  60. s.wg.Add(1)
  61. go s.consumeArchiveproc()
  62. go s.monitorConsume()
  63. }
  64. go s.clearStatisticsProc()
  65. s.wg.Add(1)
  66. go s.saveStatisticsProc()
  67. go s.setSettingProc()
  68. return s
  69. }
  70. // loadUserSettingsproc 若是用户没有设置推送开关,默认会被推送消息;因此,若是新设置load出错/无值,则不替换旧设置
  71. func (s *Service) loadUserSettingsproc() {
  72. defer s.wg.Done()
  73. var (
  74. ps int64 = 30000
  75. start int64
  76. end int64
  77. mxID int64
  78. err error
  79. res map[int64]*model.Setting
  80. )
  81. for {
  82. select {
  83. case _, ok := <-s.CloseCh:
  84. if !ok {
  85. log.Info("CloseCh is closed, close the loadUserSettingsproc")
  86. return
  87. }
  88. default:
  89. }
  90. start = 0
  91. end = 0
  92. err = nil
  93. res = make(map[int64]*model.Setting)
  94. mxID, err = s.dao.SettingsMaxID(context.TODO())
  95. if err != nil || mxID == 0 {
  96. time.Sleep(10 * time.Millisecond)
  97. continue
  98. }
  99. for {
  100. start = end
  101. end += ps
  102. err = s.dao.SettingsAll(context.TODO(), start, end, &res)
  103. if err != nil {
  104. break
  105. }
  106. if end >= mxID {
  107. break
  108. }
  109. }
  110. if err != nil || len(res) == 0 {
  111. time.Sleep(10 * time.Millisecond)
  112. continue
  113. }
  114. s.userSettings = res
  115. time.Sleep(time.Duration(s.c.Push.LoadSettingsInterval))
  116. }
  117. }
  118. func (s *Service) monitorConsume() {
  119. if env.DeployEnv != env.DeployEnvProd {
  120. return
  121. }
  122. var (
  123. arc int64 // archive result count
  124. rel int64 // relation count
  125. )
  126. for {
  127. time.Sleep(10 * time.Minute)
  128. if s.arcMo-arc == 0 {
  129. msg := "databus: push-archive archiveResult did not consume within ten minute"
  130. s.dao.WechatMessage(msg)
  131. log.Warn(msg)
  132. }
  133. arc = s.arcMo
  134. if s.relMo-rel == 0 {
  135. msg := "databus: push-archive relation did not consume within ten minute"
  136. s.dao.WechatMessage(msg)
  137. log.Warn(msg)
  138. }
  139. rel = s.relMo
  140. }
  141. }
  142. // Close closes service.
  143. func (s *Service) Close() {
  144. s.archiveSub.Close()
  145. s.relationSub.Close()
  146. close(s.CloseCh)
  147. s.wg.Wait()
  148. s.dao.Close()
  149. }
  150. // Ping checks service.
  151. func (s *Service) Ping(c *bm.Context) (err error) {
  152. err = s.dao.Ping(c)
  153. return
  154. }
  155. // getTodayTime 获取当日的某个时间点的时间
  156. func (s *Service) getTodayTime(tm string) (todayTime time.Time, err error) {
  157. now := time.Now()
  158. today := now.Format("2006-01-02")
  159. todayTime, err = time.ParseInLocation("2006-01-02 15:04:05", today+" "+tm, time.Local)
  160. if err != nil {
  161. log.Error("clearStatisticsProc time.ParseInLocation error(%v)", err)
  162. }
  163. return
  164. }