cache.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "go-common/app/job/main/aegis/model"
  9. "go-common/library/log"
  10. "go-common/library/xstr"
  11. )
  12. func (s *Service) initCache() {
  13. s.newactiveBizFlow = make(map[string]struct{})
  14. s.syncConfigCache(context.Background())
  15. s.syncConsumerCache(context.Background())
  16. s.oldactiveBizFlow = s.newactiveBizFlow
  17. }
  18. func (s *Service) cacheProc() {
  19. for {
  20. s.syncTaskCache()
  21. time.Sleep(3 * time.Minute)
  22. s.syncConfigCache(context.Background())
  23. s.syncWeightWatch(context.Background())
  24. s.syncConsumerCache(context.Background())
  25. }
  26. }
  27. func (s *Service) syncTaskCache() {
  28. var (
  29. tasks []*model.Task
  30. lastid = int64(0)
  31. err error
  32. )
  33. // 1.停滞任务,10分钟未变化,检查是否遗漏
  34. lastid = 0
  35. for {
  36. mtime := time.Now().Add(-10 * time.Minute)
  37. if tasks, lastid, err = s.dao.QueryTask(context.Background(), model.TaskStateDispatch, mtime, lastid, 1000); err != nil || len(tasks) == 0 {
  38. break
  39. }
  40. for _, task := range tasks {
  41. log.Info("检测到遗漏 停滞任务(%+v)", task)
  42. s.dao.SetTask(context.Background(), task)
  43. s.dao.PushPersonalTask(context.Background(), task.BusinessID, task.FlowID, task.UID, task.ID)
  44. }
  45. time.Sleep(time.Second)
  46. }
  47. // 2.延迟任务,半小时未变化,检查是否遗漏
  48. lastid = 0
  49. for {
  50. mtime := time.Now().Add(-30 * time.Minute)
  51. if tasks, lastid, err = s.dao.QueryTask(context.Background(), model.TaskStateDelay, mtime, lastid, 1000); err != nil || len(tasks) == 0 {
  52. break
  53. }
  54. for _, task := range tasks {
  55. log.Info("检测到遗漏 延迟任务(%+v)", task)
  56. s.dao.SetTask(context.Background(), task)
  57. s.dao.PushDelayTask(context.Background(), task.BusinessID, task.FlowID, task.UID, task.ID)
  58. }
  59. time.Sleep(time.Second)
  60. }
  61. // 3.实时任务,1小时未变化,检查是否遗漏
  62. lastid = 0
  63. for {
  64. mtime := time.Now().Add(-60 * time.Minute)
  65. if tasks, lastid, err = s.dao.QueryTask(context.Background(), model.TaskStateInit, mtime, lastid, 1000); err != nil || len(tasks) == 0 {
  66. break
  67. }
  68. for _, task := range tasks {
  69. log.Info("检测到遗漏 实时任务(%+v)", task)
  70. s.dao.SetTask(context.Background(), task)
  71. }
  72. s.dao.PushPublicTask(context.Background(), tasks...)
  73. time.Sleep(time.Second)
  74. }
  75. }
  76. func (s *Service) syncConfigCache(c context.Context) (err error) {
  77. s.oldactiveBizFlow = s.newactiveBizFlow
  78. configs, err := s.dao.TaskActiveConfigs(c)
  79. if err != nil {
  80. return
  81. }
  82. rangeWCCache := make(map[int64]map[string]*model.RangeWeightConfig)
  83. equalWCCache := make(map[string][]*model.EqualWeightConfig)
  84. assignCache := make(map[string][]*model.AssignConfig)
  85. activeBizFlow := make(map[string]struct{})
  86. for _, item := range configs {
  87. key := fmt.Sprintf("%d-%d", item.BusinessID, item.FlowID)
  88. activeBizFlow[key] = struct{}{}
  89. switch item.ConfType {
  90. case model.TaskConfigAssign:
  91. assign := new(struct {
  92. Mids string `json:"mids"`
  93. Uids string `json:"uids"`
  94. })
  95. if err = json.Unmarshal([]byte(item.ConfJSON), assign); err != nil {
  96. log.Error("json.Unmarshal error(%v)", err)
  97. continue
  98. }
  99. ac := &model.AssignConfig{}
  100. if item.UID > 0 {
  101. ac.Admin = item.UID
  102. } else {
  103. ac.Admin = 399
  104. }
  105. assign.Mids = strings.TrimSpace(assign.Mids)
  106. assign.Uids = strings.TrimSpace(assign.Uids)
  107. if ac.Mids, err = xstr.SplitInts(assign.Mids); err != nil {
  108. log.Error("xstr.SplitInts error(%v)", err)
  109. continue
  110. }
  111. if ac.Uids, err = xstr.SplitInts(assign.Uids); err != nil {
  112. log.Error("xstr.SplitInts error(%v)", err)
  113. continue
  114. }
  115. if _, ok := assignCache[key]; ok {
  116. assignCache[key] = append(assignCache[key], ac)
  117. } else {
  118. assignCache[key] = []*model.AssignConfig{ac}
  119. }
  120. case model.TaskConfigRangeWeight:
  121. wcitem := &model.RangeWeightConfig{}
  122. if err = json.Unmarshal([]byte(item.ConfJSON), wcitem); err != nil {
  123. log.Error("json.Unmarshal error(%v)", err)
  124. continue
  125. }
  126. if _, ok := rangeWCCache[item.BusinessID]; ok {
  127. rangeWCCache[item.BusinessID][wcitem.Name] = wcitem
  128. } else {
  129. rangeWCCache[item.BusinessID] = map[string]*model.RangeWeightConfig{
  130. wcitem.Name: wcitem,
  131. }
  132. }
  133. case model.TaskConfigEqualWeight:
  134. ewcitem := &model.EqualWeightConfig{}
  135. if err = json.Unmarshal([]byte(item.ConfJSON), ewcitem); err != nil {
  136. log.Error("json.Unmarshal error(%v)", err)
  137. continue
  138. }
  139. ewcitem.Uname = item.Uname
  140. ewcitem.Description = item.Description
  141. ewcitem.IDs = strings.TrimSpace(ewcitem.IDs)
  142. if _, ok := equalWCCache[key]; ok {
  143. equalWCCache[key] = append(equalWCCache[key], ewcitem)
  144. } else {
  145. equalWCCache[key] = []*model.EqualWeightConfig{ewcitem}
  146. }
  147. }
  148. }
  149. s.rangeWeightCfg = rangeWCCache
  150. s.equalWeightCfg = equalWCCache
  151. s.assignConfig = assignCache
  152. s.newactiveBizFlow = activeBizFlow
  153. return
  154. }
  155. func (s *Service) syncWeightWatch(c context.Context) {
  156. for key := range s.oldactiveBizFlow {
  157. if _, ok := s.newactiveBizFlow[key]; !ok {
  158. if wm, ok := s.wmHash[key]; ok {
  159. wm.close = true
  160. log.Info("关闭权重计算器 bizid(%d) flowid(%d)", wm.businessID, wm.flowID)
  161. delete(s.wmHash, key)
  162. }
  163. }
  164. }
  165. for key := range s.newactiveBizFlow {
  166. if _, ok := s.oldactiveBizFlow[key]; !ok {
  167. bizid, _ := parseKey(key)
  168. s.wmHash[key] = NewWeightManager(s, s.getWeightOpt(bizid), key)
  169. }
  170. }
  171. }
  172. func (s *Service) getWeightOpt(bizid int) *model.WeightOPT {
  173. for _, item := range s.c.BizCfg.WeightOpt {
  174. if item.BusinessID == int64(bizid) {
  175. return item
  176. }
  177. }
  178. return nil
  179. }
  180. func (s *Service) syncConsumerCache(c context.Context) (err error) {
  181. s.ccMux.Lock()
  182. defer s.ccMux.Unlock()
  183. consumerCache, err := s.dao.TaskActiveConsumer(c)
  184. if err != nil {
  185. return
  186. }
  187. s.consumerCache = consumerCache
  188. return
  189. }
  190. // getWeightCache .
  191. func (s *Service) getWeightCache(c context.Context, businessid, flowid int64) (rwc map[string]*model.RangeWeightConfig, ewc []*model.EqualWeightConfig) {
  192. key := fmt.Sprintf("%d-%d", businessid, flowid)
  193. rwc = s.rangeWeightCfg[businessid]
  194. ewc = s.equalWeightCfg[key]
  195. return
  196. }