task_dispatch.go 12 KB


  1. package service
  2. import (
  3. "context"
  4. "errors"
  5. "time"
  6. "go-common/app/admin/main/aegis/model/common"
  7. taskmod "go-common/app/admin/main/aegis/model/task"
  8. "go-common/library/cache/redis"
  9. "go-common/library/ecode"
  10. "go-common/library/log"
  11. "github.com/jinzhu/gorm"
  12. )
  // ErrTaskMiss is the sentinel error syncTask returns when a task cannot be
  // loaded from the database.
  var ErrTaskMiss = errors.New("task miss")
  17. // NextTask 下一个任务
  18. func (s *Service) NextTask(c context.Context, opt *taskmod.NextOptions) (tasks []*taskmod.Task, count int64, err error) {
  19. log.Info("task-NextTask opt(%+v)", opt)
  20. if count, err = s.countPersonalTask(c, &opt.BaseOptions, opt.NoCache); err != nil {
  21. return
  22. }
  23. if count < opt.DispatchCount {
  24. if count, err = s.syncSeize(c, opt); err != nil {
  25. return
  26. }
  27. }
  28. /* 去掉异步抢占
  29. else if count < opt.SeizeCount {
  30. s.asyncSeize(opt)
  31. }
  32. */
  33. if count == 0 {
  34. return
  35. }
  36. return s.dispatch(c, opt)
  37. }
  38. // ListTasks 实时列表,停滞列表,延迟列表
  39. func (s *Service) ListTasks(c context.Context, opt *taskmod.ListOptions) (tasks []*taskmod.Task, count int64, err error) {
  40. switch opt.State {
  41. case 1: // 实时任务,从redis取出,在数据库校验
  42. tasks, count, err = s.listUnseized(c, opt)
  43. case 2: // 停滞任务,组员的直接从redis取。组长的从数据库取id,redis取任务
  44. tasks, count, err = s.listMyTasks(c, "seized", opt)
  45. case 3: // 延迟任务,组员的直接从redis取。组长的从数据库取id,redis取任务
  46. tasks, count, err = s.listMyTasks(c, "delayd", opt)
  47. case 4: // 指派停滞任务,从数据库取id,redis取任务
  48. tasks, count, err = s.listMyTasks(c, "assignd", opt)
  49. default: // 所有未完成任务
  50. }
  51. if err != nil {
  52. tasks, count, err = s.mysql.ListTasks(c, opt)
  53. }
  54. opt.Total = int(count)
  55. return
  56. }
  57. // Task 直接读取某个任务
  58. func (s *Service) Task(c context.Context, tid int64) (task *taskmod.Task, err error) {
  59. return s.mysql.TaskFromDB(c, tid)
  60. }
  61. // TxSubmitTask 提交任务
  62. func (s *Service) TxSubmitTask(c context.Context, ormTx *gorm.DB, opt *common.BaseOptions, state int8) (ostate int8, otaskid, ouid int64, err error) {
  63. var (
  64. t *taskmod.Task
  65. rows int64
  66. )
  67. // 根据rid,flowid检索最新的未完成任务
  68. if t, err = s.gorm.TaskByRID(c, opt.RID, opt.FlowID); err != nil || t == nil || t.ID == 0 {
  69. log.Warn("TaskByRID(%d,%d) miss(%v)", opt.RID, opt.FlowID, err)
  70. t, err = s.gorm.TaskByRID(c, opt.RID, 0)
  71. }
  72. // TODO 先默认一个资源只在一个flow下分发,解决目前存在flow状态与task状态不同步
  73. if err != nil || t == nil || t.ID == 0 {
  74. log.Warn("s.gorm.TaskByRID(%d,%d) miss(%v)", opt.RID, 0, err)
  75. err = nil
  76. return
  77. }
  78. ostate = t.State
  79. ouid = t.UID
  80. otaskid = t.ID
  81. var utime uint64
  82. if !t.Gtime.Time().IsZero() {
  83. utime = uint64(time.Since(t.Gtime.Time()).Seconds())
  84. }
  85. subopt := &taskmod.SubmitOptions{
  86. BaseOptions: *opt,
  87. TaskID: t.ID,
  88. OldUID: t.UID,
  89. Utime: utime,
  90. OldState: t.State,
  91. }
  92. // 1. 改数据库
  93. if rows, err = s.gorm.TxSubmit(ormTx, subopt, state); err != nil {
  94. return
  95. }
  96. if rows != 1 {
  97. err = ecode.NothingFound
  98. log.Error("Submit (%v) error(%v)", opt, err)
  99. return
  100. }
  101. return
  102. }
  103. func (s *Service) submitTaskCache(c context.Context, opt *common.BaseOptions, ostate int8, taskid, ouid int64) {
  104. log.Info("SubmitTaskCache opt(%+v) ostate(%d) taskid(%d) ouid(%d)", opt, ostate, taskid, ouid)
  105. optc := &common.BaseOptions{
  106. BusinessID: opt.BusinessID,
  107. FlowID: opt.FlowID,
  108. UID: ouid,
  109. }
  110. if ostate == taskmod.TaskStateDelay {
  111. s.redis.RemoveDelayTask(c, optc, taskid)
  112. return
  113. }
  114. s.redis.RemovePersonalTask(c, optc, taskid)
  115. }
  116. // Delay 延迟任务
  117. func (s *Service) Delay(c context.Context, opt *taskmod.DelayOptions) (err error) {
  118. var (
  119. taskmod *taskmod.Task
  120. rows int64
  121. )
  122. if taskmod, err = s.mysql.TaskFromDB(c, opt.TaskID); err != nil || taskmod == nil {
  123. return
  124. }
  125. if !s.checkDelayOption(c, opt, taskmod) {
  126. log.Error("checkDelayOption error opt(%+v) taskmod(%+v)", opt, taskmod)
  127. return ecode.AegisTaskFinish
  128. }
  129. if rows, err = s.mysql.Delay(c, opt); err != nil {
  130. return
  131. }
  132. if rows != 1 {
  133. err = ecode.AegisTaskFinish
  134. log.Error("Submit (%v) error(%v)", opt, err)
  135. return
  136. }
  137. if err = s.redis.RemovePersonalTask(c, &opt.BaseOptions, opt.TaskID); err != nil {
  138. return
  139. }
  140. s.redis.PushDelayTask(c, &opt.BaseOptions, opt.TaskID)
  141. return
  142. }
  143. // Release 释放任务
  144. func (s *Service) Release(c context.Context, opt *common.BaseOptions, delay bool) (rows int64, err error) {
  145. if rows, err = s.mysql.Release(c, opt, delay); err != nil {
  146. return
  147. }
  148. //err = s.redis.Release(c, opt, delay)
  149. return
  150. }
  151. // MaxWeight 当前最高权重
  152. func (s *Service) MaxWeight(c context.Context, opt *common.BaseOptions) (max int64, err error) {
  153. return s.gorm.MaxWeight(c, opt.BusinessID, opt.FlowID)
  154. }
  155. // UnDoStat undo stat
  156. func (s *Service) UnDoStat(c context.Context, opt *common.BaseOptions) (stat *taskmod.UnDOStat, err error) {
  157. return s.gorm.UndoStat(c, opt.BusinessID, opt.FlowID, opt.UID)
  158. }
  159. // TaskStat task stat
  160. func (s *Service) TaskStat(c context.Context, opt *common.BaseOptions) (stat *taskmod.Stat, err error) {
  161. return s.gorm.TaskStat(c, opt.BusinessID, opt.FlowID, opt.UID)
  162. }
  163. func (s *Service) countPersonalTask(c context.Context, opt *common.BaseOptions, nocache bool) (count int64, err error) {
  164. log.Info("task-countPersonalTask opt(%+v) nocache(%v)", opt, nocache)
  165. defer func() { log.Info("task-countPersonalTask count(%d) err(%v)", count, err) }()
  166. if nocache {
  167. return s.mysql.CountPersonal(c, opt)
  168. }
  169. if count, err = s.redis.CountPersonalTask(c, opt); err != nil {
  170. // redis 挂了
  171. if count, err = s.mysql.CountPersonal(c, opt); err != nil {
  172. return
  173. }
  174. }
  175. return
  176. }
  177. func (s *Service) syncSeize(c context.Context, opt *taskmod.NextOptions) (count int64, err error) {
  178. return s.seize(c, opt)
  179. }
  180. func (s *Service) seize(c context.Context, opt *taskmod.NextOptions) (count int64, err error) {
  181. log.Info("task-seize opt(%+v)", opt)
  182. defer func() { log.Info("task-seize count(%d) err(%v)", count, err) }()
  183. var (
  184. hitids, missids []int64
  185. others map[int64]int64
  186. )
  187. // TODO: 抢占任务要根据用户是否在线,处理任务指派
  188. if opt.NoCache {
  189. hitids, err = s.mysql.QueryForSeize(c, opt.BusinessID, opt.FlowID, opt.UID, opt.SeizeCount)
  190. } else {
  191. hitids, missids, others, err = s.redis.SeizeTask(c, opt.BusinessID, opt.FlowID, opt.UID, opt.SeizeCount)
  192. if err != nil {
  193. hitids, err = s.mysql.QueryForSeize(c, opt.BusinessID, opt.FlowID, opt.UID, opt.SeizeCount)
  194. }
  195. }
  196. if err != nil {
  197. return
  198. }
  199. log.Info("seize uid(%d) hitids(%v), missids(%v), others(%v)", opt.UID, hitids, missids, others)
  200. if !opt.NoCache && len(missids) > 0 {
  201. log.Error("seize uid(%d) missids(%v)", opt.UID, missids)
  202. for _, id := range missids {
  203. if err = s.syncTask(c, id); err != nil {
  204. s.redis.RemovePublicTask(c, &opt.BaseOptions, id)
  205. }
  206. }
  207. }
  208. if len(hitids) > 0 {
  209. log.Info("seize uid(%d) hitids(%v)", opt.UID, hitids)
  210. mhits := make(map[int64]int64)
  211. for _, id := range hitids {
  212. mhits[id] = opt.UID
  213. }
  214. if count, err = s.mysql.Seize(c, mhits); err != nil || count == 0 {
  215. return
  216. }
  217. return
  218. }
  219. return
  220. }
  221. func (s *Service) dispatch(c context.Context, opt *taskmod.NextOptions) (tasks []*taskmod.Task, count int64, err error) {
  222. log.Info("task-dispatch opt(%+v)", opt)
  223. defer func() { log.Info("task-dispatch tasks(%+v) count(%d) err(%v)", tasks, count, err) }()
  224. listopt := &taskmod.ListOptions{
  225. BaseOptions: opt.BaseOptions,
  226. Pager: common.Pager{
  227. Pn: 1,
  228. Ps: int(opt.DispatchCount),
  229. }}
  230. tasks, count, err = s.calibur(c, listopt, s.redis.RangePersonalTask, s.mysql.DispatchByID, s.redis.RemovePersonalTask)
  231. if err != nil {
  232. tasks, count, err = s.mysql.DBDispatch(c, opt)
  233. }
  234. return
  235. }
  236. func (s *Service) syncTask(c context.Context, taskID int64) (err error) {
  237. var task *taskmod.Task
  238. if task, err = s.mysql.TaskFromDB(c, taskID); err != nil || task == nil {
  239. return ErrTaskMiss
  240. }
  241. var option = &common.BaseOptions{
  242. BusinessID: task.BusinessID,
  243. FlowID: task.FlowID,
  244. UID: task.UID,
  245. }
  246. s.redis.SetTask(c, task)
  247. switch task.State {
  248. case taskmod.TaskStateInit:
  249. s.redis.PushPublicTask(c, task)
  250. case taskmod.TaskStateDispatch:
  251. s.redis.RemovePublicTask(c, option, task.ID)
  252. s.redis.PushPersonalTask(c, option, task.ID)
  253. case taskmod.TaskStateDelay:
  254. s.redis.RemovePublicTask(c, option, task.ID)
  255. s.redis.PushDelayTask(c, option, task.ID)
  256. default:
  257. s.redis.RemovePublicTask(c, option, task.ID)
  258. }
  259. return
  260. }
  261. func (s *Service) listUnseized(c context.Context, opt *taskmod.ListOptions) (tasks []*taskmod.Task, count int64, err error) {
  262. return s.calibur(c, opt, s.redis.RangePublicTask, s.mysql.ListCheckUnSeized, s.redis.RemovePublicTask)
  263. }
  264. func (s *Service) listMyTasks(c context.Context, ltp string, opt *taskmod.ListOptions) (tasks []*taskmod.Task, count int64, err error) {
  265. if !opt.BisLeader {
  266. if ltp == "delayd" {
  267. return s.calibur(c, opt, s.redis.RangeDealyTask, s.mysql.ListCheckDelay, s.redis.RemoveDelayTask)
  268. }
  269. if ltp == "seized" {
  270. return s.calibur(c, opt, s.redis.RangePersonalTask, s.mysql.ListCheckSeized, s.redis.RemovePersonalTask)
  271. }
  272. }
  273. if opt.BisLeader {
  274. opt.UID = 0
  275. }
  276. var ids []int64
  277. switch ltp {
  278. case "delayd":
  279. ids, count, err = s.gorm.TaskListDelayd(c, opt)
  280. case "seized":
  281. ids, count, err = s.gorm.TaskListSeized(c, opt)
  282. case "assignd":
  283. ids, count, err = s.gorm.TaskListAssignd(c, opt)
  284. }
  285. if err != nil || len(ids) == 0 {
  286. return
  287. }
  288. if tasks, err = s.redis.GetTask(c, ids); err != nil {
  289. err = redis.ErrNil
  290. }
  291. return
  292. }
  293. func (s *Service) calibur(c context.Context, opt *taskmod.ListOptions, rfunc taskmod.RangeFunc, lfunc taskmod.ListFuncDB, remove taskmod.RemoveFunc) (taskmods []*taskmod.Task, count int64, err error) {
  294. var (
  295. hitids, missids []int64
  296. missmap map[int64]struct{}
  297. mtaskmods map[int64]*taskmod.Task
  298. )
  299. mtaskmods, count, hitids, missids, err = rfunc(c, opt)
  300. log.Info("calibur(%+v) rfunc count(%d) hitids(%v) missids(%v)", opt, count, hitids, missids)
  301. if err != nil {
  302. return
  303. }
  304. if len(missids) > 0 {
  305. for _, id := range missids {
  306. if err = s.syncTask(c, id); err != nil {
  307. log.Error("syncTask error(%v)", err)
  308. remove(c, &opt.BaseOptions, id)
  309. }
  310. }
  311. }
  312. if len(hitids) > 0 {
  313. if missmap, err = lfunc(c, mtaskmods, hitids, opt.UID); err != nil {
  314. log.Error("calibur lfunc error(%v)", err)
  315. return
  316. }
  317. if len(missmap) > 0 {
  318. log.Info("calibur personal任务移除%v", missmap)
  319. for id := range missmap {
  320. remove(c, &opt.BaseOptions, id)
  321. }
  322. }
  323. }
  324. for _, id := range hitids {
  325. if _, ok := missmap[id]; ok && opt.Action != "release" {
  326. delete(mtaskmods, id)
  327. } else {
  328. taskmods = append(taskmods, mtaskmods[id])
  329. }
  330. }
  331. return
  332. }
  333. /*
  334. func (s *Service) checkSubmitOption(c context.Context, opt *taskmod.SubmitOptions, task *taskmod.Task) bool {
  335. opt.OldState = task.State
  336. opt.OldUID = task.UID
  337. // 1. 组员只能处理自己的延迟任务
  338. if task.State == taskmod.TaskStateDelay {
  339. if opt.BisLeader {
  340. return true
  341. }
  342. if task.UID != opt.UID {
  343. return false
  344. }
  345. }
  346. if task.State == taskmod.TaskStateDispatch && opt.UID == task.UID {
  347. opt.Utime = uint64(time.Since(task.Gtime.Time()).Seconds())
  348. return true
  349. }
  350. return false
  351. }
  352. */
  353. func (s *Service) checkDelayOption(c context.Context, opt *taskmod.DelayOptions, task *taskmod.Task) bool {
  354. if task.State == taskmod.TaskStateDispatch && task.UID == opt.UID {
  355. return true
  356. }
  357. return false
  358. }
  359. func (s *Service) syncUpCache(c context.Context) (err error) {
  360. if s.Debug() == "local" {
  361. return
  362. }
  363. upGroup := make(map[int64]*common.Group)
  364. upgs, err := s.rpc.UpGroups(c)
  365. if err != nil || upgs == nil {
  366. return
  367. }
  368. for gid, upg := range upgs {
  369. if _, ok := upGroup[gid]; !ok {
  370. upGroup[gid] = &common.Group{
  371. ID: gid,
  372. Name: upg.Name,
  373. Note: upg.Note,
  374. Tag: upg.Tag,
  375. FontColor: upg.FontColor,
  376. BgColor: upg.BgColor,
  377. }
  378. log.Info("groupCache upg(%+v) upGroup(%+v)", upg, upGroup[gid])
  379. }
  380. }
  381. s.groupCache = upGroup
  382. log.Info("groupCache(%+v)", s.groupCache)
  383. return
  384. }