task_dispatch.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go-common/app/admin/main/videoup/model/archive"
  7. "go-common/app/admin/main/videoup/model/manager"
  8. "go-common/app/admin/main/videoup/model/utils"
  9. "go-common/library/database/sql"
  10. "go-common/library/log"
  11. "go-common/library/xstr"
  12. )
  13. // List 查看任务列表
  14. func (s *Service) List(c context.Context, uid int64, pn, ps int, ltype, leader int8) (tasks []*archive.Task, err error) {
  15. return s.arc.ListByCondition(c, uid, pn, ps, ltype, leader)
  16. }
  17. // Delay 申请延迟
  18. func (s *Service) Delay(c context.Context, id, uid int64, reason string) (err error) {
  19. tx, err := s.arc.BeginTran(c)
  20. if err != nil {
  21. log.Error("s.arc.BeginTran() error(%v)", err)
  22. return
  23. }
  24. rows, err := s.arc.TxUpTaskByID(tx, id, map[string]interface{}{"state": archive.TypeDelay, "dtime": time.Now()})
  25. if err != nil {
  26. log.Error("s.arc.TxUpTaskByID(%d) error(%v)", id, err)
  27. tx.Rollback()
  28. return
  29. }
  30. if rows > 0 {
  31. if _, err = s.arc.TxAddTaskHis(tx, 0, archive.ActionDelay /*action*/, id /*task_id*/, 0, uid /*uid*/, 0, 0, reason /*reason*/); err != nil {
  32. log.Error("s.arc.AddTaskLog(%d) error(%v)", id, err)
  33. tx.Rollback()
  34. return
  35. }
  36. }
  37. return tx.Commit()
  38. }
  39. // TaskSubmit 提交审核结果
  40. func (s *Service) TaskSubmit(c context.Context, id int64, uid int64, status int64) (err error) {
  41. var utime int64
  42. t, err := s.arc.TaskByID(c, id)
  43. if err != nil {
  44. log.Error(" s.arc.TaskByID(%d) error(%v)", id, err)
  45. return
  46. }
  47. switch {
  48. case t.State == archive.TypeDelay:
  49. utime = 0
  50. case t.GTime.TimeValue().IsZero():
  51. utime = int64(time.Since(t.MTime.TimeValue()).Seconds())
  52. default:
  53. utime = int64(time.Since(t.GTime.TimeValue()).Seconds())
  54. }
  55. tx, err := s.arc.BeginTran(c)
  56. if err != nil {
  57. log.Error("s.arc.BeginTran error(%v)", err)
  58. return
  59. }
  60. rows, err := s.arc.TxUpTaskByID(tx, id, map[string]interface{}{"state": archive.TypeFinished, "utime": utime})
  61. if err != nil {
  62. log.Error("s.arc.TxUpTaskByID(%d) error(%v)", id, err)
  63. tx.Rollback()
  64. return
  65. }
  66. if rows > 0 {
  67. if _, err = s.arc.TxAddTaskHis(tx, 0, archive.ActionSubmit /*action*/, id /*task_id*/, t.Cid /*cid*/, uid /*uid*/, utime /*utime*/, int16(status) /*result*/, "TaskSubmit" /*reason*/); err != nil {
  68. log.Error("s.arc.AddTaskLog(%d) error(%v)", id, err)
  69. tx.Rollback()
  70. return
  71. }
  72. }
  73. return tx.Commit()
  74. }
  75. // Next 领取任务
  76. func (s *Service) Next(c context.Context, uid int64) (task *archive.Task, err error) {
  77. var rows int64
  78. task, err = s.arc.GetNextTask(c, uid)
  79. if err != nil {
  80. log.Error("d.getTask(%d) error(%v)", uid, err)
  81. return
  82. }
  83. if task != nil {
  84. return
  85. }
  86. // 释放超时任务
  87. s.Free(c, 0)
  88. // 从实时任务池抢占
  89. if rows, err = s.dispatchTask(c, uid); err != nil {
  90. return
  91. } else if rows > 0 {
  92. return s.arc.GetNextTask(c, uid)
  93. }
  94. return
  95. }
  96. // Info 查询任务信息
  97. func (s *Service) Info(c context.Context, tid int64) (task *archive.Task, err error) {
  98. return s.arc.TaskByID(c, tid)
  99. }
  100. // 抢占任务(先抢占再查,避免重复下发)
  101. func (s *Service) dispatchTask(c context.Context, uid int64) (rows int64, err error) {
  102. var (
  103. tls []*archive.TaskForLog
  104. arrid []int64
  105. )
  106. if tls, err = s.arc.GetDispatchTask(c, uid); err != nil {
  107. log.Error("s.arc.GetDispatchTask(%d) error(%v)", uid, err)
  108. return
  109. }
  110. for _, item := range tls {
  111. arrid = append(arrid, item.ID)
  112. }
  113. if len(arrid) > 0 {
  114. if rows, err = s.arc.UpDispatchTask(c, uid, arrid); err != nil {
  115. log.Error("s.arc.UpDispatchTask(%d,%v) error(%v)", uid, arrid, err)
  116. return
  117. }
  118. // 日志允许错误
  119. if int(rows) == len(arrid) {
  120. log.Info("UpDispatchTask 更新数量(%d)", rows)
  121. } else {
  122. log.Warn("UpDispatchTask 更新数量(%d) 日志数量(%d)", rows, len(arrid))
  123. }
  124. s.arc.MulAddTaskHis(c, tls, archive.ActionDispatch, uid)
  125. }
  126. return
  127. }
  128. // Free 任务释放(有uid为主动释放,没有uid为被动释放)(先查再释放,有可能记录冗余释放信息)
  129. func (s *Service) Free(c context.Context, uid int64) (rows int64) {
  130. var (
  131. rts []*archive.TaskForLog
  132. ids, rtids []int64
  133. lastid int64
  134. err error
  135. mtime = time.Now()
  136. )
  137. if uid == 0 {
  138. if rts, err = s.arc.GetTimeOutTask(c); err != nil {
  139. log.Error("s.Free s.arc.GetTimeOutTask error(%v)", err)
  140. return
  141. }
  142. } else {
  143. if rts, lastid, err = s.arc.GetRelTask(c, uid); err != nil {
  144. log.Error("s.Free s.arc.GetRelTask(%d) error(%v)", uid, err)
  145. return
  146. }
  147. }
  148. mcases := make(map[int64]*archive.WCItem)
  149. for _, rt := range rts {
  150. ids = append(ids, rt.ID)
  151. if rt.Subject == 1 { //指派任务回流
  152. rtids = append(rtids, rt.ID)
  153. mcases[rt.ID] = &archive.WCItem{Radio: 4, Weight: archive.WLVConf.SubRelease, Mtime: utils.NewFormatTime(time.Now()), Desc: "指派回流权重"}
  154. }
  155. }
  156. if len(ids) > 0 {
  157. if rows, err = s.arc.MulReleaseMtime(c, ids, mtime); err != nil {
  158. log.Error("s.arc.MulReleaseMtime(%v, %v) error(%v)", ids, mtime, err)
  159. return
  160. }
  161. if rows > 0 {
  162. s.arc.MulAddTaskHis(c, rts, archive.ActionRelease, uid)
  163. }
  164. }
  165. if lastid > 0 {
  166. s.arc.UpGtimeByID(c, lastid, "0000-00-00 00:00:00")
  167. timelogout := time.Now()
  168. log.Info("添加延时释放任务(%d %v)", lastid, timelogout)
  169. time.AfterFunc(5*time.Minute, func() {
  170. s.releaseSpecial(timelogout, lastid, uid)
  171. })
  172. }
  173. if len(rtids) > 0 {
  174. s.setWeightConf(c, xstr.JoinInts(rtids), mcases)
  175. }
  176. return
  177. }
  178. func (s *Service) releaseSpecial(tout time.Time, taskid, uid int64) {
  179. tx, err := s.arc.BeginTran(context.TODO())
  180. if err != nil {
  181. log.Error(" s.arc.BeginTran error(%v)", err)
  182. return
  183. }
  184. rows, err := s.arc.TxReleaseSpecial(tx, tout, 1, taskid, uid)
  185. if err != nil {
  186. log.Error("s.arc.TxReleaseSpecial error(%v)", err)
  187. tx.Rollback()
  188. return
  189. }
  190. if rows > 0 {
  191. log.Info("s.arc.TxReleaseSpecial 释放任务(%d)", taskid)
  192. if _, err = s.arc.TxAddTaskHis(tx, 0, archive.ActionRelease, taskid, 0, uid, 0, 0, "登出延时释放"); err != nil {
  193. log.Error("s.arc.TxAddTaskHis error(%v)", err)
  194. tx.Rollback()
  195. return
  196. }
  197. }
  198. tx.Commit()
  199. }
  200. func (s *Service) getTWCache(c context.Context, ids []int64) (mcases map[int64]*archive.TaskPriority, err error) {
  201. if mcases, err = s.task.GetWeightRedis(c, ids); len(mcases) != len(ids) {
  202. mcases, err = s.arc.GetWeightDB(c, ids)
  203. }
  204. return
  205. }
  206. func (s *Service) judge(c context.Context, tid, aid, cid, uid int64) (err error) {
  207. var (
  208. rows int64
  209. tx *sql.Tx
  210. v *archive.Video
  211. a *archive.Archive
  212. )
  213. // 1.校验视频
  214. if v, err = s.arc.VideoByCID(c, cid); err != nil {
  215. log.Error("s.arc.VideoByCID(%d) error(%v)", cid, err)
  216. return
  217. }
  218. if v == nil || v.Status == archive.VideoStatusDelete {
  219. err = fmt.Errorf("视频(cid=%d)被删除", cid)
  220. goto DELETE
  221. }
  222. // 2.校验稿件
  223. if a, err = s.arc.Archive(c, aid); err != nil {
  224. log.Error("s.arc.Archive(%d) error(%v)", aid, err)
  225. return
  226. }
  227. if a == nil || a.State == archive.StateForbidUpDelete {
  228. err = fmt.Errorf("稿件(aid=%d)被删除", aid)
  229. }
  230. DELETE:
  231. if err != nil {
  232. if tx, err = s.arc.BeginTran(c); err != nil {
  233. log.Error("s.arc.BeginTran() error(%v)", err)
  234. return
  235. }
  236. if rows, err = s.arc.TxUpTaskByID(tx, tid, map[string]interface{}{"state": archive.TypeFinished, "utime": 0}); err != nil {
  237. log.Error("s.arc.TxUpTaskByID(%d) error(%v)", tid, err)
  238. tx.Rollback()
  239. return
  240. }
  241. if rows > 0 {
  242. if _, err = s.arc.TxAddTaskHis(tx, 0 /*pool*/, archive.ActionTaskDelete /*action*/, tid /*task_id*/, cid /*cid*/, uid /*uid*/, 0 /*utime*/, archive.VideoStatusDelete /*result*/, "judge delete" /*reason*/); err != nil {
  243. log.Error("s.arc.AddTaskLog(%d) error(%v)", tid, err)
  244. tx.Rollback()
  245. return
  246. }
  247. }
  248. return tx.Commit()
  249. }
  250. return
  251. }
  252. // CheckOwner 检查任务状态修改权限
  253. func (s *Service) CheckOwner(c context.Context, tid, uid int64) (err error) {
  254. var role int8
  255. var rows int64
  256. task, err := s.arc.TaskByID(c, tid)
  257. if task == nil || err != nil {
  258. log.Error("s.arc.TaskByID(%d) error(%v)", tid, err)
  259. return
  260. }
  261. if err = s.judge(c, task.ID, task.Aid, task.Cid, uid); err != nil {
  262. log.Error("s.judge(%+v) error(%v)", task, err)
  263. return
  264. }
  265. if role, err = s.mng.GetUserRole(c, uid); err != nil || role == 0 {
  266. err = fmt.Errorf("非法用户(%d)", uid)
  267. return
  268. }
  269. if task.State == archive.TypeDelay || task.State == archive.TypeSpecial {
  270. return
  271. }
  272. if !s.CheckOnline(c, uid) {
  273. err = fmt.Errorf("请先签到(%d)", uid)
  274. return
  275. }
  276. if role == manager.TaskLeader {
  277. return
  278. }
  279. if task.UID != uid {
  280. err = fmt.Errorf("没有权限处理该任务")
  281. return
  282. }
  283. // 普通用户处理超时了,将任务释放掉
  284. if task.State == archive.TypeDispatched && time.Since(task.GTime.TimeValue()).Minutes() > 10.0 {
  285. var tx *sql.Tx
  286. if tx, err = s.arc.BeginTran(c); err != nil {
  287. log.Error("s.arc.BeginTran() error(%v)", err)
  288. return
  289. }
  290. if rows, err = s.arc.TxUpTaskByID(tx, tid, map[string]interface{}{"state": archive.TypeRealTime, "uid": 0, "gtime": "0000-00-00 00:00:00"}); err != nil {
  291. log.Error("s.arc.TxUpTaskByID(%d) error(%v)", tid, err)
  292. tx.Rollback()
  293. return
  294. }
  295. if rows > 0 {
  296. if _, err = s.arc.TxAddTaskHis(tx, 0 /*pool*/, archive.ActionRelease /*action*/, tid /*task_id*/, 0 /*cid*/, uid /*uid*/, 0 /*utime*/, 0 /*result*/, "timeout release" /*reason*/); err != nil {
  297. log.Error("s.arc.AddTaskLog(%d) error(%v)", tid, err)
  298. tx.Rollback()
  299. return
  300. }
  301. }
  302. return tx.Commit()
  303. }
  304. return
  305. }