rule.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "go-common/library/conf/env"
  9. "go-common/library/net/metadata"
  10. "go-common/app/job/live/xroom-feed/internal/model"
  11. "go-common/library/log"
  12. "go-common/library/sync/errgroup"
  13. "go-common/library/xstr"
  14. )
  15. const (
  16. _whiteListType = 1
  17. _condType = 2
  18. _fileListType = 3
  19. _areaType = "area"
  20. _onlineType = "online"
  21. _incomeType = "income"
  22. _dmsType = "dms"
  23. _liveDaysType = "live_days"
  24. _hourRankType = "hour_rank"
  25. _anchorCateType = "anchor_cate"
  26. _roomStatusType = "room_status"
  27. _condAnd = "and"
  28. _condOr = "or"
  29. _confTypeString = "string"
  30. _confTypeRange = "range"
  31. _confTypeTop = "top"
  32. )
  33. // JobTTl ...
  34. type JobTTl struct {
  35. ttl int
  36. }
  37. func (s *Service) reloadConfFromDb() {
  38. t1 := time.NewTicker(time.Second * 5)
  39. defer t1.Stop()
  40. for {
  41. select {
  42. case <-t1.C:
  43. confList, err := s.dao.GetConfFromDb()
  44. if err != nil {
  45. log.Error("[reloadConfFromDb]getConfFromDb_error:%+v", err)
  46. continue
  47. }
  48. key, expire := s.getRecConfKey()
  49. listStr, err := json.Marshal(confList)
  50. if err == nil {
  51. s.dao.SetRecPoolCache(context.TODO(), key, string(listStr), expire)
  52. }
  53. s.ruleConf.Store(confList)
  54. }
  55. }
  56. }
  57. func (s *Service) loadConfFromDb() {
  58. confList, err := s.dao.GetConfFromDb()
  59. if err != nil {
  60. log.Error("[loadConfFromDb]getConfFromDb_error:%+v", err)
  61. return
  62. }
  63. s.ruleConf.Store(confList)
  64. }
  65. func (s *Service) parseCondConf(cond string) (condConf *model.RuleProtocol, err error) {
  66. condConf = new(model.RuleProtocol)
  67. err = json.Unmarshal([]byte(cond), &condConf)
  68. if err != nil {
  69. log.Error("[parseCondConf]Unmarshal err: %+v", err)
  70. return
  71. }
  72. return
  73. }
  74. func (s *Service) reloadRecList() {
  75. t1 := time.NewTicker(time.Second * 20)
  76. defer t1.Stop()
  77. for {
  78. select {
  79. case <-t1.C:
  80. s.genRecListJob()
  81. }
  82. }
  83. }
  84. // 对所有配置的规则取数据的主程序
  85. func (s *Service) genRecListJob() {
  86. log.Info("[genRecListJob]genRecListJob start")
  87. ttl := 20
  88. ttl, err := s.ac.Get("JobTtl").Int()
  89. if err != nil {
  90. log.Error("[genRecListJob]getJobTtlConfFromSven_error:%+v", err)
  91. }
  92. confList := s.ruleConf.Load()
  93. // assert
  94. res, ok := confList.([]*model.RecPoolConf)
  95. if !ok {
  96. log.Error("[genRecListJob]conf assert error! %+v", confList)
  97. return
  98. }
  99. if len(res) <= 0 {
  100. log.Warn("[genRecListJob]confList_empty")
  101. return
  102. }
  103. //可配置时间结束子任务 内部要有阻塞操作
  104. cCtx := metadata.NewContext(context.TODO(), metadata.MD{metadata.Color: env.Color})
  105. ctx, cancel := context.WithTimeout(cCtx, time.Duration(ttl)*time.Second)
  106. wg := errgroup.Group{}
  107. // 每条规则之间无关联,可以不互相cancel,只看超时ctx (可在内部对每个操作设置)
  108. // 规则获取loop
  109. for _, rule := range res {
  110. // 白名单规则
  111. if rule.ConfType == _whiteListType || rule.ConfType == _fileListType {
  112. ruleId := rule.Id
  113. wg.Go(func() error {
  114. listStr, err := s.dao.GetWhiteList(ctx, ruleId)
  115. if err != nil {
  116. log.Error("[genRecListJob]GetWhiteList_err:%+v", err)
  117. return nil
  118. }
  119. listStrArr := strings.Split(listStr, ",")
  120. listIntArr := make([]int64, 0)
  121. for _, roomIdStr := range listStrArr {
  122. roomIdInt, _ := strconv.ParseInt(roomIdStr, 10, 64)
  123. listIntArr = append(listIntArr, roomIdInt)
  124. }
  125. if len(listIntArr) > 0 {
  126. ids := s.setRecInfoCache(ctx, listIntArr)
  127. key, expire := s.getRecPoolKey(ruleId)
  128. s.dao.SetRecPoolCache(ctx, key, xstr.JoinInts(ids), expire)
  129. }
  130. return nil
  131. })
  132. }
  133. // 条件规则
  134. if rule.ConfType == _condType {
  135. ruleId := rule.Id
  136. ruleStr := rule.Rules
  137. wg.Go(func() error {
  138. condConf, err := s.parseCondConf(ruleStr)
  139. if err != nil {
  140. log.Error("[genRecListJob]parseCondConf_err:%+v", err)
  141. return nil
  142. }
  143. if len(condConf.Condition) == 0 {
  144. log.Error("[genRecListJob]parseCondConf_empty")
  145. return nil
  146. }
  147. roomIds := s.genCondConfRoomList(ctx, condConf.Condition, condConf.Cond, ruleId)
  148. if len(roomIds) > 0 {
  149. ids := s.setRecInfoCache(ctx, roomIds)
  150. key, expire := s.getRecPoolKey(ruleId)
  151. s.dao.SetRecPoolCache(ctx, key, xstr.JoinInts(ids), expire)
  152. }
  153. return nil
  154. })
  155. }
  156. }
  157. // 超时检测
  158. select {
  159. case <-ctx.Done():
  160. {
  161. log.Info("[genRecListJob]job time out: %d", ttl)
  162. cancel()
  163. }
  164. }
  165. err = wg.Wait()
  166. if err != nil {
  167. log.Error("[genRecListJob]rule loop wait err:%+v", err)
  168. }
  169. log.Info("[genRecListJob]genRecListJob done")
  170. }