task_dispatch.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go-common/app/admin/main/videoup-task/model"
  7. "go-common/library/database/sql"
  8. "go-common/library/log"
  9. "go-common/library/xstr"
  10. )
  11. // List 查看任务列表
  12. func (s *Service) List(c context.Context, uid int64, pn, ps int, ltype, leader int8) (tasks []*model.Task, err error) {
  13. return s.dao.ListByCondition(c, uid, pn, ps, ltype, leader)
  14. }
  15. // Delay 申请延迟
  16. func (s *Service) Delay(c context.Context, id, uid int64, reason string) (err error) {
  17. tx, err := s.dao.BeginTran(c)
  18. if err != nil {
  19. log.Error("s.dao.BeginTran() error(%v)", err)
  20. return
  21. }
  22. rows, err := s.dao.TxUpTaskByID(tx, id, map[string]interface{}{"state": model.TypeDelay, "dtime": time.Now()})
  23. if err != nil {
  24. log.Error("s.dao.TxUpTaskByID(%d) error(%v)", id, err)
  25. tx.Rollback()
  26. return
  27. }
  28. if rows > 0 {
  29. if _, err = s.dao.TxAddTaskHis(tx, 0, model.ActionDelay /*action*/, id /*task_id*/, 0, uid /*uid*/, 0, 0, reason /*reason*/); err != nil {
  30. log.Error("s.dao.AddTaskLog(%d) error(%v)", id, err)
  31. tx.Rollback()
  32. return
  33. }
  34. }
  35. return tx.Commit()
  36. }
  37. // TaskSubmit 提交审核结果
  38. func (s *Service) TaskSubmit(c context.Context, t *model.Task, uid int64, status int16) (err error) {
  39. var utime int64
  40. switch {
  41. case t.State == model.TypeDelay || t.State == model.TypeReview: //延迟任务,复审任务不记录utime
  42. utime = 0
  43. case t.GTime.TimeValue().IsZero():
  44. utime = int64(time.Since(t.MTime.TimeValue()))
  45. default:
  46. utime = int64(time.Since(t.GTime.TimeValue()))
  47. }
  48. tx, err := s.dao.BeginTran(c)
  49. if err != nil {
  50. log.Error("s.dao.BeginTran error(%v)", err)
  51. return
  52. }
  53. rows, err := s.dao.TxUpTaskByID(tx, t.ID, map[string]interface{}{"state": model.TypeFinished, "utime": utime})
  54. if err != nil {
  55. log.Error("s.dao.TxUpTaskByID(%d) error(%v)", t.ID, err)
  56. tx.Rollback()
  57. return
  58. }
  59. if rows > 0 {
  60. if _, err = s.dao.TxAddTaskHis(tx, 0, model.ActionSubmit /*action*/, t.ID /*task_id*/, t.Cid /*cid*/, uid /*uid*/, utime /*utime*/, status /*result*/, "TaskSubmit" /*reason*/); err != nil {
  61. log.Error("s.dao.AddTaskLog(%d) error(%v)", t.ID, err)
  62. tx.Rollback()
  63. return
  64. }
  65. }
  66. return tx.Commit()
  67. }
  68. // Next 领取任务
  69. func (s *Service) Next(c context.Context, uid int64) (task *model.Task, err error) {
  70. var rows int64
  71. task, err = s.dao.GetNextTask(c, uid)
  72. if err != nil {
  73. log.Error("d.getTask(%d) error(%v)", uid, err)
  74. return
  75. }
  76. if task != nil {
  77. return
  78. }
  79. // 释放超时任务
  80. s.Free(c, 0)
  81. // 从实时任务池抢占
  82. if rows, err = s.dispatchTask(c, uid); err != nil {
  83. return
  84. } else if rows > 0 {
  85. return s.dao.GetNextTask(c, uid)
  86. }
  87. return
  88. }
  89. // Info 查询任务信息
  90. func (s *Service) Info(c context.Context, tid int64) (task *model.Task, err error) {
  91. return s.dao.TaskByID(c, tid)
  92. }
  93. // 抢占任务(先抢占再查,避免重复下发)
  94. func (s *Service) dispatchTask(c context.Context, uid int64) (rows int64, err error) {
  95. var (
  96. tls []*model.TaskForLog
  97. arrid []int64
  98. )
  99. if tls, err = s.dao.GetDispatchTask(c, uid); err != nil {
  100. log.Error("s.dao.GetDispatchTask(%d) error(%v)", uid, err)
  101. return
  102. }
  103. for _, item := range tls {
  104. arrid = append(arrid, item.ID)
  105. }
  106. if len(arrid) > 0 {
  107. if rows, err = s.dao.UpDispatchTask(c, uid, arrid); err != nil {
  108. log.Error("s.dao.UpDispatchTask(%d,%v,%v) error(%v)", uid, arrid, err)
  109. return
  110. }
  111. // 日志允许错误
  112. if int(rows) == len(arrid) {
  113. log.Info("UpDispatchTask 更新数量(%d)", rows)
  114. } else {
  115. log.Warn("UpDispatchTask 更新数量(%d) 日志数量(%d)", rows, len(arrid))
  116. }
  117. s.dao.MulAddTaskHis(c, tls, model.ActionDispatch, uid)
  118. }
  119. return
  120. }
  121. // Free 任务释放(有uid为主动释放,没有uid为被动释放)(先查再释放,有可能记录冗余释放信息)
  122. func (s *Service) Free(c context.Context, uid int64) (rows int64) {
  123. var (
  124. rts []*model.TaskForLog
  125. ids, rtids []int64
  126. lastid int64
  127. err error
  128. mtime = time.Now()
  129. )
  130. if uid == 0 {
  131. if rts, err = s.dao.GetTimeOutTask(c); err != nil {
  132. log.Error("s.Free s.dao.GetTimeOutTask error(%v)", err)
  133. return
  134. }
  135. } else {
  136. if rts, lastid, err = s.dao.GetRelTask(c, uid); err != nil {
  137. log.Error("s.Free s.dao.GetRelTask(%d) error(%v)", uid, err)
  138. return
  139. }
  140. }
  141. mcases := make(map[int64]*model.WCItem)
  142. for _, rt := range rts {
  143. ids = append(ids, rt.ID)
  144. if rt.Subject == 1 { //指派任务回流
  145. rtids = append(rtids, rt.ID)
  146. mcases[rt.ID] = &model.WCItem{Radio: 4, Weight: model.WLVConf.SubRelease,
  147. Mtime: model.NewFormatTime(time.Now()), Desc: "指派回流权重"}
  148. }
  149. }
  150. if len(ids) > 0 {
  151. if rows, err = s.dao.MulReleaseMtime(c, ids, mtime); err != nil {
  152. log.Error("s.dao.MulReleaseMtime(%v, %v) error(%v)", ids, mtime, err)
  153. return
  154. }
  155. if rows > 0 {
  156. s.dao.MulAddTaskHis(c, rts, model.ActionRelease, uid)
  157. }
  158. }
  159. if lastid > 0 {
  160. s.dao.UpGtimeByID(c, lastid, "0000-00-00 00:00:00")
  161. timelogout := time.Now()
  162. log.Info("添加延时释放任务(%d %v)", lastid, timelogout)
  163. time.AfterFunc(5*time.Minute, func() {
  164. s.releaseSpecial(timelogout, lastid, uid)
  165. })
  166. }
  167. if len(rtids) > 0 {
  168. s.setWeightConf(c, xstr.JoinInts(rtids), mcases)
  169. }
  170. return
  171. }
  172. func (s *Service) releaseSpecial(tout time.Time, taskid, uid int64) {
  173. tx, err := s.dao.BeginTran(context.TODO())
  174. if err != nil {
  175. log.Error(" s.dao.BeginTran error(%v)", err)
  176. return
  177. }
  178. rows, err := s.dao.TxReleaseSpecial(tx, tout, 1, taskid, uid)
  179. if err != nil {
  180. log.Error("s.dao.TxReleaseSpecial error(%v)", err)
  181. tx.Rollback()
  182. return
  183. }
  184. if rows > 0 {
  185. log.Info("s.dao.TxReleaseSpecial 释放任务(%d)", taskid)
  186. if _, err = s.dao.TxAddTaskHis(tx, 0, model.ActionRelease, taskid, 0, uid, 0, 0, "登出延时释放"); err != nil {
  187. log.Error("s.dao.TxAddTaskHis error(%v)", err)
  188. tx.Rollback()
  189. return
  190. }
  191. }
  192. tx.Commit()
  193. }
  194. func (s *Service) judge(c context.Context, tid, aid, cid, uid int64) (err error) {
  195. var (
  196. rows int64
  197. tx *sql.Tx
  198. v *model.ArcVideo
  199. a *model.Archive
  200. )
  201. // 1.校验视频
  202. if v, err = s.dao.ArcVideoByCID(c, cid); err != nil {
  203. log.Error("s.dao.ArcVideoByCID(%d) error(%v)", cid, err)
  204. return
  205. }
  206. if v == nil || v.Status == model.VideoStatusDelete {
  207. err = fmt.Errorf("视频(cid=%d)被删除", cid)
  208. goto DELETE
  209. }
  210. // 2.校验稿件
  211. if a, err = s.dao.Archive(c, aid); err != nil {
  212. log.Error("s.dao.Archive(%d) error(%v)", aid, err)
  213. return
  214. }
  215. if a == nil || a.State == model.StateForbidUpDelete {
  216. err = fmt.Errorf("稿件(aid=%d)被删除", aid)
  217. }
  218. DELETE:
  219. if err != nil {
  220. if tx, err = s.dao.BeginTran(c); err != nil {
  221. log.Error("s.dao.BeginTran() error(%v)", err)
  222. return
  223. }
  224. if rows, err = s.dao.TxUpTaskByID(tx, tid, map[string]interface{}{"state": model.TypeFinished, "utime": 0}); err != nil {
  225. log.Error("s.dao.TxUpTaskByID(%d) error(%v)", tid, err)
  226. tx.Rollback()
  227. return
  228. }
  229. if rows > 0 {
  230. if _, err = s.dao.TxAddTaskHis(tx, 0 /*pool*/, model.ActionTaskDelete /*action*/, tid /*task_id*/, cid /*cid*/, uid /*uid*/, 0 /*utime*/, model.VideoStatusDelete /*result*/, "judge delete" /*reason*/); err != nil {
  231. log.Error("s.dao.AddTaskLog(%d) error(%v)", tid, err)
  232. tx.Rollback()
  233. return
  234. }
  235. }
  236. return tx.Commit()
  237. }
  238. return
  239. }
  240. // CheckOwner 检查任务状态修改权限
  241. func (s *Service) CheckOwner(c context.Context, tid, uid int64) (err error) {
  242. var role int8
  243. var rows int64
  244. task, err := s.dao.TaskByID(c, tid)
  245. if task == nil || err != nil {
  246. log.Error("s.dao.TaskByID(%d) error(%v)", tid, err)
  247. return
  248. }
  249. if err = s.judge(c, task.ID, task.Aid, task.Cid, uid); err != nil {
  250. log.Error("s.judge(%+v) error(%v)", task, err)
  251. return
  252. }
  253. if role, err = s.dao.GetUserRole(c, uid); err != nil || role == 0 {
  254. err = fmt.Errorf("非法用户(%d)", uid)
  255. return
  256. }
  257. if task.State == model.TypeDelay || task.State == model.TypeReview {
  258. return
  259. }
  260. if !s.CheckOnline(c, uid) {
  261. err = fmt.Errorf("请先签到(%d)", uid)
  262. return
  263. }
  264. if role == model.TaskLeader {
  265. return
  266. }
  267. if task.UID != uid {
  268. err = fmt.Errorf("没有权限处理该任务")
  269. return
  270. }
  271. // 普通用户处理超时了,将任务释放掉
  272. if task.State == model.TypeDispatched && time.Since(task.GTime.TimeValue()).Minutes() > 10.0 {
  273. var tx *sql.Tx
  274. if tx, err = s.dao.BeginTran(c); err != nil {
  275. log.Error("s.dao.BeginTran() error(%v)", err)
  276. return
  277. }
  278. if rows, err = s.dao.TxUpTaskByID(tx, tid, map[string]interface{}{"state": model.TypeRealTime, "uid": 0, "gtime": "0000-00-00 00:00:00"}); err != nil {
  279. log.Error("s.dao.TxUpTaskByID(%d) error(%v)", tid, err)
  280. tx.Rollback()
  281. return
  282. }
  283. if rows > 0 {
  284. if _, err = s.dao.TxAddTaskHis(tx, 0 /*pool*/, model.ActionRelease /*action*/, tid /*task_id*/, 0 /*cid*/, uid /*uid*/, 0 /*utime*/, 0 /*result*/, "timeout release" /*reason*/); err != nil {
  285. log.Error("s.dao.AddTaskLog(%d) error(%v)", tid, err)
  286. tx.Rollback()
  287. return
  288. }
  289. }
  290. return tx.Commit()
  291. }
  292. return
  293. }