weight.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "go-common/app/job/main/aegis/model"
  9. "go-common/library/log"
  10. )
  11. // WeightManager weight manager
  12. type WeightManager struct {
  13. s *Service
  14. businessID, flowID int64
  15. toplen, batchlen int64
  16. minute int64
  17. // cache
  18. topweightList []*model.WeightItem
  19. // channel
  20. redisWeightList chan *model.WeightItem
  21. dbWeightList chan *model.WeightItem
  22. asignList chan *model.Task
  23. //dbstartSig, dbstopSig chan struct{}
  24. redisFinish chan struct{}
  25. //closeChan chan struct{}
  26. close bool
  27. }
  28. var _defaultopt = &model.WeightOPT{
  29. TopListLen: 1000,
  30. BatchListLen: 1000,
  31. RedisListLen: 10000,
  32. DbListLen: 2000,
  33. AssignLen: 100,
  34. Minute: 3,
  35. }
  36. // NewWeightManager new
  37. func NewWeightManager(s *Service, opt *model.WeightOPT, key string) (wm *WeightManager) {
  38. if opt == nil {
  39. opt = _defaultopt
  40. } else {
  41. if opt.TopListLen <= 0 {
  42. opt.TopListLen = _defaultopt.TopListLen
  43. }
  44. if opt.BatchListLen <= 0 {
  45. opt.BatchListLen = _defaultopt.BatchListLen
  46. }
  47. if opt.RedisListLen <= 0 {
  48. opt.RedisListLen = _defaultopt.RedisListLen
  49. }
  50. if opt.DbListLen <= 0 {
  51. opt.DbListLen = _defaultopt.DbListLen
  52. }
  53. if opt.AssignLen <= 0 {
  54. opt.AssignLen = _defaultopt.AssignLen
  55. }
  56. if opt.Minute <= 0 {
  57. opt.Minute = _defaultopt.Minute
  58. }
  59. }
  60. if len(key) > 0 {
  61. bizid, flowid := parseKey(key)
  62. opt.BusinessID = int64(bizid)
  63. opt.FlowID = int64(flowid)
  64. }
  65. wm = &WeightManager{
  66. s: s,
  67. businessID: opt.BusinessID,
  68. flowID: opt.FlowID,
  69. toplen: opt.TopListLen,
  70. batchlen: opt.BatchListLen,
  71. minute: opt.Minute,
  72. redisWeightList: make(chan *model.WeightItem, opt.RedisListLen),
  73. dbWeightList: make(chan *model.WeightItem, opt.DbListLen),
  74. asignList: make(chan *model.Task, opt.AssignLen),
  75. redisFinish: make(chan struct{}),
  76. }
  77. go wm.weightProc()
  78. go wm.weightWatcher()
  79. log.Info("启动权重计算器 bizid(%d) flowid(%d) opt(%+v)", wm.businessID, wm.flowID, opt)
  80. return
  81. }
  82. func parseKey(key string) (bizid, flowid int) {
  83. pos := strings.Index(key, "-")
  84. bizids := key[:pos]
  85. flowids := key[pos+1:]
  86. bizid, _ = strconv.Atoi(bizids)
  87. flowid, _ = strconv.Atoi(flowids)
  88. return
  89. }
  90. func (s *Service) startWeightManager() {
  91. // 1.当前的所有业务线,需要计算权重的先枚举出来
  92. s.wmHash = make(map[string]*WeightManager)
  93. for key := range s.newactiveBizFlow {
  94. bizid, _ := parseKey(key)
  95. s.wmHash[key] = NewWeightManager(s, s.getWeightOpt(bizid), key)
  96. }
  97. }
  98. func (w *WeightManager) weightProc() {
  99. for !w.close {
  100. if err := w.weightRedisProcess(); err != nil {
  101. w.weightDBProcess()
  102. }
  103. time.Sleep(time.Duration(w.minute) * time.Minute)
  104. }
  105. }
  106. func (w *WeightManager) weightWatcher() {
  107. for !w.close {
  108. select {
  109. case <-w.redisFinish: //取出权重最大的一批,更新到数据库
  110. log.Info("redisFinish(%d-%d:%d)", w.businessID, w.flowID, w.toplen)
  111. w.handleRedisFinish(context.Background())
  112. case wi := <-w.redisWeightList:
  113. w.handleRedisWeightList(context.Background(), wi)
  114. case wi := <-w.dbWeightList:
  115. w.handleDBWeightList(context.Background(), wi)
  116. case task := <-w.asignList:
  117. w.handleAssign(context.Background(), task)
  118. }
  119. }
  120. }
  121. func (w *WeightManager) weightRedisProcess() (err error) {
  122. var c = context.Background()
  123. if err = w.s.dao.CreateUnionSet(c, w.businessID, w.flowID); err != nil {
  124. return
  125. }
  126. var (
  127. start = int64(0)
  128. stop = w.batchlen
  129. )
  130. for {
  131. wis, err := w.s.dao.RangeUinonSet(c, w.businessID, w.flowID, start, stop)
  132. if err != nil {
  133. return err
  134. }
  135. log.Info("weightRedisProcess length(%d) start(%d) stop(%d)", len(wis), start, stop)
  136. start += w.batchlen
  137. stop += w.batchlen
  138. if len(wis) == 0 {
  139. break
  140. }
  141. for _, wi := range wis {
  142. if w.caculateWeight(c, wi) {
  143. log.Warn("weightRedisProcess 任务未找到 wi(%+v)", wi)
  144. continue
  145. }
  146. w.s.dao.SetWeight(c, w.businessID, w.flowID, wi.ID, wi.Weight)
  147. }
  148. time.Sleep(time.Second)
  149. }
  150. w.redisFinish <- struct{}{}
  151. w.s.dao.DeleteUinonSet(c, w.businessID, w.flowID)
  152. return nil
  153. }
  154. func (w *WeightManager) caculateWeight(c context.Context, wi *model.WeightItem) (skip bool) {
  155. task, err := w.s.dao.GetTask(c, wi.ID)
  156. if err != nil {
  157. return true
  158. }
  159. w.reAssign(c, task)
  160. wm := int64(time.Since(task.Ctime.Time()).Minutes())
  161. wl := &model.WeightLog{
  162. UPtime: time.Now().Format("2006-01-02 15:04:05"),
  163. Mid: task.MID,
  164. Fans: task.Fans,
  165. Group: task.Group,
  166. WaitTime: model.WaitTime(task.Ctime.Time()),
  167. }
  168. var wtRange, wtEqual int64
  169. wci, ewc := w.s.getWeightCache(c, task.BusinessID, task.FlowID)
  170. if wci != nil {
  171. wtRange = w.rangeCaculate(c, wci, task, wm, wl)
  172. }
  173. if ewc != nil {
  174. wtEqual = w.equalCaculate(c, ewc, task, wm, wl)
  175. }
  176. wi.Weight = wtRange + wtEqual
  177. wl.Weight = wi.Weight
  178. w.s.sendWeightLog(c, task, wl)
  179. return
  180. }
  181. func (w *WeightManager) rangeCaculate(c context.Context, wci map[string]*model.RangeWeightConfig, task *model.Task, wt int64, wl *model.WeightLog) (weight int64) {
  182. var wtWeight, fanWeight, groupWeight int64
  183. if cfg, ok := wci["waittime"]; ok {
  184. if wtlen := len(cfg.Range); wtlen > 0 { // 等待时长,要把之前等级的权重加上去
  185. for i := wtlen - 1; i >= 0; i-- {
  186. if wt >= cfg.Range[i].Threshold { // 命中配置
  187. wtWeight += cfg.Range[i].Weight * ((wt - cfg.Range[i].Threshold) / w.minute)
  188. // 计算0 到 (i-1) 累计权重
  189. for j := 0; j <= i-1; j++ {
  190. wtWeight += cfg.Range[j].Weight * ((cfg.Range[j+1].Threshold - cfg.Range[j].Threshold) / w.minute)
  191. }
  192. break
  193. }
  194. }
  195. }
  196. }
  197. if cfg, ok := wci["fans"]; ok {
  198. if fanLen := len(cfg.Range); fanLen > 0 {
  199. for i := fanLen - 1; i >= 0; i-- {
  200. if task.Fans >= cfg.Range[i].Threshold {
  201. fanWeight = cfg.Range[i].Weight * (wt / w.minute)
  202. break
  203. }
  204. }
  205. }
  206. }
  207. if cfg, ok := wci["group"]; ok {
  208. if len(cfg.Range) > 0 {
  209. for _, item := range cfg.Range {
  210. if strings.Contains(","+task.Group+",", fmt.Sprintf(",%d,", item.Threshold)) {
  211. groupWeight = item.Weight * (wt / w.minute)
  212. }
  213. }
  214. }
  215. }
  216. weight = wtWeight + fanWeight + groupWeight
  217. wl.WaitWeight = wtWeight
  218. wl.FansWeight = fanWeight
  219. wl.GroupWeight = groupWeight
  220. return
  221. }
  222. func (w *WeightManager) equalCaculate(c context.Context, ewc []*model.EqualWeightConfig, task *model.Task, wt int64, wl *model.WeightLog) (weight int64) {
  223. var midweight, taskweight int64
  224. for _, item := range ewc {
  225. if item.Name == "mid" {
  226. if strings.Contains(","+item.IDs+",", fmt.Sprintf(",%d,", task.MID)) {
  227. if item.Type == model.WeightTypeCycle {
  228. midweight += item.Weight * (wt / w.minute)
  229. } else {
  230. midweight += item.Weight
  231. }
  232. log.Info("equalCaculate task(%+v) hit (%+v)", task, item)
  233. wl.ConfigItems = append(wl.ConfigItems, &model.ConfigItem{
  234. Name: item.Name,
  235. Desc: item.Description,
  236. Uname: item.Uname,
  237. })
  238. }
  239. }
  240. if item.Name == "taskid" || item.Name == "task_id" {
  241. if strings.Contains(","+item.IDs+",", fmt.Sprintf(",%d,", task.ID)) {
  242. if item.Type == model.WeightTypeCycle {
  243. taskweight += item.Weight * (wt / w.minute)
  244. } else {
  245. taskweight += item.Weight
  246. }
  247. log.Info("equalCaculate task(%+v) hit (%+v)", task, item)
  248. wl.ConfigItems = append(wl.ConfigItems, &model.ConfigItem{
  249. Name: item.Name,
  250. Desc: item.Description,
  251. Uname: item.Uname,
  252. })
  253. }
  254. }
  255. }
  256. weight = midweight + taskweight
  257. wl.EqualWeight = weight
  258. return
  259. }
  260. func (w *WeightManager) reAssign(c context.Context, task *model.Task) {
  261. if task.UID == 0 {
  262. select {
  263. case w.asignList <- task:
  264. log.Info("指派判断 reAssign(%+v)", task)
  265. case <-time.NewTimer(10 * time.Millisecond).C:
  266. log.Warn("chan asignList full,len:%d", len(w.dbWeightList))
  267. }
  268. }
  269. }
  270. func (w *WeightManager) weightDBProcess() (err error) {
  271. // TODO 只用db更新权重的策略
  272. return nil
  273. }
  274. func (w *WeightManager) handleAssign(c context.Context, task *model.Task) (err error) {
  275. if w.s.setAssign(c, task) {
  276. if rows, err := w.s.dao.AssignTask(c, task); err == nil && rows == 1 {
  277. w.s.dao.SetTask(c, task)
  278. }
  279. }
  280. return
  281. }
  282. func (w *WeightManager) handleRedisWeightList(c context.Context, wi *model.WeightItem) (err error) {
  283. return w.s.dao.SetWeight(c, w.businessID, w.flowID, wi.ID, wi.Weight)
  284. }
  285. func (w *WeightManager) handleDBWeightList(c context.Context, wi *model.WeightItem) (rows int64, err error) {
  286. return w.s.dao.SetWeightDB(c, wi.ID, wi.Weight)
  287. }
  288. func (w *WeightManager) handleRedisFinish(c context.Context) (err error) {
  289. log.Info("handleRedisFinish")
  290. wis, err := w.s.dao.TopWeights(c, w.businessID, w.flowID, w.toplen)
  291. if err != nil {
  292. return
  293. }
  294. tempMap := make(map[int64]struct{})
  295. for _, wi := range wis {
  296. log.Info("handleRedisFinish:(%+v)", wi)
  297. w.addToDBList(wi)
  298. tempMap[wi.ID] = struct{}{}
  299. }
  300. for _, wi := range w.topweightList {
  301. if _, ok := tempMap[wi.ID]; !ok {
  302. weight, err := w.s.dao.GetWeight(c, w.businessID, w.flowID, wi.ID)
  303. if err != nil {
  304. continue
  305. }
  306. wi.Weight = weight
  307. w.addToDBList(wi)
  308. }
  309. }
  310. w.topweightList = wis
  311. log.Info("handleRedisFinish:topweightList(%d)", len(wis))
  312. return
  313. }
  314. func (w *WeightManager) addToDBList(wi *model.WeightItem) {
  315. select {
  316. case w.dbWeightList <- wi:
  317. log.Info("addToDBList (%+v)", wi)
  318. case <-time.NewTimer(10 * time.Millisecond).C:
  319. log.Warn("chan dbWeightList full,len:%d", len(w.dbWeightList))
  320. }
  321. }