databus.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "math/rand"
  7. "time"
  8. "go-common/app/job/main/aegis/model"
  9. moniMdl "go-common/app/job/main/aegis/model/monitor"
  10. "go-common/library/log"
  11. "go-common/library/queue/databus"
  12. )
  13. var (
  14. _taskTable = "task"
  15. )
  16. func (s *Service) taskconsumeproc() {
  17. defer func() {
  18. log.Warn("taskconsumeproc exited.")
  19. s.wg.Done()
  20. }()
  21. var (
  22. binLogMsgs = s.binLogDataBus.Messages()
  23. )
  24. for {
  25. select {
  26. case msg, ok := <-binLogMsgs:
  27. if !ok {
  28. log.Warn("binLogDataBus has been closed.")
  29. return
  30. }
  31. log.Info("binLogDataBus key(%s) offset(%d) message(%s)",
  32. msg.Key, msg.Offset, msg.Value)
  33. s.handleBinLog(msg)
  34. case rpi := <-s.chanReport:
  35. s.reportResource(context.Background(), rpi.BizID, rpi.FlowID, rpi.RID, rpi.UID)
  36. default:
  37. time.Sleep(time.Second)
  38. }
  39. }
  40. }
  41. func (s *Service) archiveConsumeProc() {
  42. defer func() {
  43. log.Warn("archiveConsumeProc exited.")
  44. s.wg.Done()
  45. }()
  46. var (
  47. msgs = s.archiveDataBus.Messages()
  48. )
  49. for {
  50. var (
  51. msg *databus.Message
  52. ok bool
  53. err error
  54. )
  55. if msg, ok = <-msgs; !ok {
  56. log.Error("s.archiveDataBus.Messages() closed.")
  57. return
  58. }
  59. msg.Commit()
  60. m := &model.BinLog{}
  61. if err = json.Unmarshal(msg.Value, m); err != nil {
  62. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  63. continue
  64. }
  65. if m.Table == "archive" {
  66. s.handleArchiveBinlog(m)
  67. } else if m.Table == "archive_video" {
  68. s.handleVideoBinlog(m)
  69. }
  70. }
  71. }
  72. func (s *Service) handleArchiveBinlog(m *model.BinLog) {
  73. var (
  74. err error
  75. )
  76. na := &moniMdl.BinlogArchive{}
  77. oa := &moniMdl.BinlogArchive{}
  78. if err = json.Unmarshal(m.New, na); err != nil {
  79. log.Error("json.Unmarshal(%s,%+v) error(%v)", m.New, na, err)
  80. return
  81. }
  82. if err = json.Unmarshal(m.New, oa); err != nil {
  83. log.Error("json.Unmarshal(%s,%+v) error(%v)", m.New, oa, err)
  84. return
  85. }
  86. s.monitorArchive(m.Action, na, oa)
  87. }
  88. func (s *Service) handleVideoBinlog(m *model.BinLog) {
  89. var (
  90. err error
  91. )
  92. nv := &moniMdl.BinlogVideo{}
  93. ov := &moniMdl.BinlogVideo{}
  94. if err = json.Unmarshal(m.New, nv); err != nil {
  95. log.Error("json.Unmarshal(%s,%+v) error(%v)", m.New, nv, err)
  96. return
  97. }
  98. if err = json.Unmarshal(m.New, ov); err != nil {
  99. log.Error("json.Unmarshal(%s,%+v) error(%v)", m.New, ov, err)
  100. return
  101. }
  102. s.monitorVideo(m.Action, nv, ov)
  103. }
  104. func (s *Service) handleBinLog(msg *databus.Message) {
  105. defer msg.Commit()
  106. bmsg := new(model.BinLog)
  107. if err := json.Unmarshal(msg.Value, bmsg); err != nil {
  108. log.Error("json.Unmarshal(%s) error(%v)", string(msg.Value), err)
  109. return
  110. }
  111. if bmsg.Table == _taskTable {
  112. old := new(model.Task)
  113. new := new(model.Task)
  114. if err := json.Unmarshal(bmsg.New, new); err != nil {
  115. log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
  116. return
  117. }
  118. if bmsg.Action == "update" {
  119. if err := json.Unmarshal(bmsg.Old, old); err != nil {
  120. log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
  121. return
  122. }
  123. }
  124. s.handleBinLogMsg(context.Background(), bmsg.Action, old, new)
  125. }
  126. // use specify goroutine to merge messages
  127. log.Info("handleBinlog table:%s key:%s partition:%d offset:%d", bmsg.Table, msg.Key, msg.Partition, msg.Offset)
  128. }
  129. //各种状态简写
  130. const (
  131. INT = model.TaskStateInit
  132. DSP = model.TaskStateDispatch
  133. DEY = model.TaskStateDelay
  134. SUB = model.TaskStateSubmit
  135. RSB = model.TaskStateRscSb
  136. CLO = model.TaskStateClosed
  137. LTD = model.LogTypeTaskDispatch
  138. )
  139. func (s *Service) handleRelease(c context.Context, old, new *model.Task) {
  140. s.dao.RemovePersonalTask(c, old.BusinessID, old.FlowID, old.UID, old.ID)
  141. s.dao.PushPublicTask(c, new)
  142. s.sendTaskLog(c, new, LTD, "release", new.UID, "")
  143. s.dao.IncresByField(c, old.BusinessID, old.FlowID, old.UID, model.Release, 1)
  144. }
  145. func (s *Service) handleDisptach(c context.Context, old, new *model.Task) {
  146. //这里不做缓存同步,顺序会乱
  147. s.sendTaskLog(c, new, LTD, "dispatch", new.UID, "")
  148. s.dao.IncresByField(c, new.BusinessID, new.FlowID, new.UID, model.Dispatch, 1)
  149. }
  150. func (s *Service) handleDelay(c context.Context, old, new *model.Task) {
  151. //这里不做缓存同步,顺序会乱
  152. s.sendTaskLog(c, new, LTD, "delay", new.UID, "")
  153. s.dao.IncresByField(c, new.BusinessID, new.FlowID, new.UID, model.Delay, 1)
  154. }
  155. /*
  156. 数据统计时,容易产生误差的几种数据
  157. 1. 任务被a领取,被b在资源列表提交
  158. 2. 任务被a延迟,被b在资源列表 或者 延迟列表提交
  159. */
  160. func (s *Service) handleSubmit(c context.Context, old, new *model.Task) {
  161. switch old.State {
  162. case INT: // 未分配直接提交,资源列表里操作
  163. s.dao.RemovePublicTask(c, old.BusinessID, old.FlowID, old.ID)
  164. case DSP: // 领取后提交,也可能是资源列表操作
  165. s.dao.RemovePersonalTask(c, old.BusinessID, old.FlowID, old.UID, old.ID)
  166. case DEY: // 延迟列表提交,也可能是资源列表操作
  167. s.dao.RemoveDelayTask(c, old.BusinessID, old.FlowID, old.UID, old.ID)
  168. default: // 其他未知情况
  169. log.Error("handleSubmit UNEXPECTED old(%+v) new(%v)", old, new)
  170. }
  171. switch new.State {
  172. case SUB:
  173. s.sendTaskLog(c, new, LTD, "tasksubmit", new.UID, "")
  174. case RSB:
  175. s.sendTaskLog(c, new, LTD, "rscsubmit", new.UID, "")
  176. case CLO:
  177. s.sendTaskLog(c, new, LTD, "close", new.UID, "")
  178. }
  179. s.reportSubmit(c, old, new)
  180. }
  181. func (s *Service) handleCreate(c context.Context, new *model.Task) {
  182. s.dao.PushPublicTask(c, new)
  183. s.sendTaskLog(c, new, LTD, "create", 399, "aegis-job")
  184. s.reportTaskCreate(c, new)
  185. }
  186. func (s *Service) handleBinLogMsg(c context.Context, act string, old, new *model.Task) {
  187. log.Info("handleTaskBinlog act(%s) old(%+v) new(%+v)", act, old, new)
  188. s.dao.SetTask(c, new)
  189. if act == "insert" {
  190. s.handleCreate(c, new)
  191. }
  192. if act == "update" {
  193. switch {
  194. case old.State != new.State: //状态变更
  195. switch new.State {
  196. case INT: // 初始
  197. switch old.State {
  198. case DSP: //释放
  199. s.handleRelease(c, old, new)
  200. default: //其他情况
  201. s.dao.PushPublicTask(c, new)
  202. log.Error("handleTaskBinlog UNEXPECTED INT old(%+v) new(%+v)", old, new)
  203. }
  204. case DSP: // 领取
  205. switch old.State {
  206. case INT:
  207. s.handleDisptach(c, old, new)
  208. default:
  209. log.Error("handleTaskBinlog UNEXPECTED DSP old(%+v) new(%+v)", old, new)
  210. }
  211. case DEY: // 延迟
  212. switch old.State {
  213. case DSP:
  214. s.handleDelay(c, old, new)
  215. default:
  216. log.Error("handleTaskBinlog UNEXPECTED DEY old(%+v) new(%+v)", old, new)
  217. }
  218. case SUB, RSB, CLO: // 提交,关闭
  219. s.handleSubmit(c, old, new)
  220. }
  221. case old.AdminID != new.AdminID: //指派变更
  222. default:
  223. log.Info("其他变更 old(%+v)->new(%+v)", old, new)
  224. }
  225. }
  226. }
  227. func (s *Service) setAssign(c context.Context, task *model.Task) bool {
  228. log.Info("指派判断 setAssign(%+v)", task)
  229. auids := s.hitAssignUids(c, task)
  230. log.Info("指派判断 hitAssignUids(%v)", auids)
  231. if len(auids) == 0 {
  232. return false
  233. }
  234. log.Info("task(%d) 命中指派配置 (%v)", task.ID, auids)
  235. var huids []int64
  236. for auid, uids := range auids {
  237. task.AdminID = auid
  238. huids = s.hitActiveUids(c, task, uids)
  239. length := len(huids)
  240. if length != 0 {
  241. break
  242. }
  243. }
  244. log.Info("task(%d) 指派在线 (%v)", task.ID, huids)
  245. length := len(huids)
  246. if length == 0 {
  247. return false
  248. }
  249. if length == 1 {
  250. task.UID = huids[0]
  251. } else {
  252. // 随机数选一个
  253. task.UID = huids[rand.Intn(length)]
  254. }
  255. log.Info("task(%d) admin(%d) 指派成功 (%d)", task.ID, task.AdminID, task.UID)
  256. return true
  257. }
  258. func (s *Service) hitAssignUids(c context.Context, task *model.Task) (uids map[int64][]int64) {
  259. key := fmt.Sprintf("%d-%d", task.BusinessID, task.FlowID)
  260. uids = make(map[int64][]int64)
  261. if assignC, ok := s.assignConfig[key]; ok {
  262. for _, item := range assignC {
  263. log.Info("指派判断 task(%+v) item(%+v)", task, item)
  264. for _, mid := range item.Mids {
  265. if mid == task.MID {
  266. if aus, ok := uids[item.Admin]; ok {
  267. uids[item.Admin] = append(aus, item.Uids...)
  268. } else {
  269. uids[item.Admin] = item.Uids
  270. }
  271. }
  272. }
  273. }
  274. }
  275. return
  276. }
  277. func (s *Service) hitActiveUids(c context.Context, task *model.Task, uids []int64) (hitid []int64) {
  278. s.ccMux.RLock()
  279. defer s.ccMux.RUnlock()
  280. key := fmt.Sprintf("%d-%d", task.BusinessID, task.FlowID)
  281. if uidCache, ok := s.consumerCache[key]; ok {
  282. for _, uid := range uids {
  283. if _, ok := uidCache[uid]; ok {
  284. if on, _ := s.dao.IsConsumerOn(c, int(task.BusinessID), int(task.FlowID), uid); on {
  285. hitid = append(hitid, uid)
  286. }
  287. }
  288. }
  289. }
  290. return
  291. }