task_dispatch.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. package dao
  2. import (
  3. "context"
  4. xsql "database/sql"
  5. "encoding/json"
  6. "fmt"
  7. "strings"
  8. "time"
  9. "go-common/app/admin/main/videoup-task/model"
  10. "go-common/library/database/sql"
  11. "go-common/library/log"
  12. "go-common/library/xstr"
  13. )
  // SQL statements used to update, release and time out rows of task_dispatch.
  const (
  	// _upTaskByIDSQL updates one row by id; %s receives the comma-joined "col=?" SET list.
  	_upTaskByIDSQL = "UPDATE task_dispatch SET %s WHERE id=?"
  	// _upGtimeByIDSQL sets gtime on one row by id.
  	_upGtimeByIDSQL = "UPDATE task_dispatch SET gtime=? WHERE id=?"
  	// _releaseByIDSQL zeroes subject, state, uid and gtime on one row by id.
  	_releaseByIDSQL = "UPDATE task_dispatch SET subject=0,state=0,uid=0,gtime='0000-00-00 00:00:00' " +
  		"WHERE id=?"
  	// _releaseMtimeSQL does the same reset for a list of ids (%s), limited to rows with mtime<=?.
  	_releaseMtimeSQL = "UPDATE task_dispatch SET subject=0,state=0,uid=0,gtime='0000-00-00 00:00:00' " +
  		"WHERE id IN (%s) AND mtime<=?"
  	// _timeOutTaskSQL selects rows with state=1 and mtime<?, or state=0 with a non-zero uid and ctime<?.
  	_timeOutTaskSQL = "SELECT id,cid,subject,mtime FROM task_dispatch " +
  		"WHERE (state=1 AND mtime<?) OR (state=0 AND uid<>0 AND ctime<?)"
  	// _getRelTaskSQL selects the rows of one uid whose state is 0 or 1.
  	_getRelTaskSQL = "SELECT id,cid,subject,mtime,gtime FROM task_dispatch WHERE state IN (0,1) AND uid=?"
  	// _releaseSpecialSQL resets subject, state and uid on one row, but only while gtime is still
  	// zero and mtime, state and uid match the supplied values.
  	_releaseSpecialSQL = "UPDATE task_dispatch SET subject=0,state=0,uid=0 " +
  		"WHERE id=? AND gtime='0000-00-00 00:00:00' AND mtime<=? AND state=? AND uid=?"
  )
  23. // UpGtimeByID update gtime
  24. func (d *Dao) UpGtimeByID(c context.Context, id int64, gtime string) (rows int64, err error) {
  25. var res xsql.Result
  26. if res, err = d.arcDB.Exec(c, _upGtimeByIDSQL, gtime, id); err != nil {
  27. log.Error("d.arcDB.Exec(%s, %v, %d) error(%v)", _upGtimeByIDSQL, gtime, id)
  28. return
  29. }
  30. return res.RowsAffected()
  31. }
  32. // TxUpTaskByID 更新任务状态
  33. func (d *Dao) TxUpTaskByID(tx *sql.Tx, id int64, paras map[string]interface{}) (rows int64, err error) {
  34. arrSet := []string{}
  35. arrParas := []interface{}{}
  36. for k, v := range paras {
  37. arrSet = append(arrSet, k+"=?")
  38. arrParas = append(arrParas, v)
  39. }
  40. arrParas = append(arrParas, id)
  41. sqlstring := fmt.Sprintf(_upTaskByIDSQL, strings.Join(arrSet, ","))
  42. res, err := tx.Exec(sqlstring, arrParas...)
  43. if err != nil {
  44. log.Error("tx.Exec(%v %v) error(%v)", sqlstring, arrParas, err)
  45. return
  46. }
  47. return res.RowsAffected()
  48. }
  49. // TxReleaseByID 释放指定任务
  50. func (d *Dao) TxReleaseByID(tx *sql.Tx, id int64) (rows int64, err error) {
  51. res, err := tx.Exec(_releaseByIDSQL, id)
  52. if err != nil {
  53. log.Error("tx.Exec(%s, %d) error(%v)", _releaseByIDSQL, id, err)
  54. return
  55. }
  56. return res.RowsAffected()
  57. }
  58. // MulReleaseMtime 批量释放任务,加时间防止释放错误
  59. func (d *Dao) MulReleaseMtime(c context.Context, ids []int64, mtime time.Time) (rows int64, err error) {
  60. sqlstring := fmt.Sprintf(_releaseMtimeSQL, xstr.JoinInts(ids))
  61. res, err := d.arcDB.Exec(c, sqlstring, mtime)
  62. if err != nil {
  63. log.Error("tx.Exec(%s, %v) error(%v)", sqlstring, mtime, err)
  64. return
  65. }
  66. return res.RowsAffected()
  67. }
  68. // GetTimeOutTask 释放正在处理且超时的,释放指派后但长时间未审核的
  69. func (d *Dao) GetTimeOutTask(c context.Context) (rts []*model.TaskForLog, err error) {
  70. var (
  71. rows *sql.Rows
  72. )
  73. if rows, err = d.arcDB.Query(c, _timeOutTaskSQL, time.Now().Add(-10*time.Minute), time.Now().Add(-80*time.Minute)); err != nil {
  74. log.Error("d.arcDB.Query(%s) error(%v)", _timeOutTaskSQL, err)
  75. return
  76. }
  77. defer rows.Close()
  78. for rows.Next() {
  79. rt := &model.TaskForLog{}
  80. if err = rows.Scan(&rt.ID, &rt.Cid, &rt.Subject, &rt.Mtime); err != nil {
  81. log.Error("rows.Scan error(%v)", err)
  82. return
  83. }
  84. rts = append(rts, rt)
  85. }
  86. return
  87. }
  88. // GetRelTask 用户登出或者主动释放(分配给该用户的都释放)
  89. func (d *Dao) GetRelTask(c context.Context, uid int64) (rts []*model.TaskForLog, lastid int64, err error) {
  90. var (
  91. gtime time.Time
  92. rows *sql.Rows
  93. )
  94. if rows, err = d.arcDB.Query(c, _getRelTaskSQL, uid); err != nil {
  95. log.Error("d.arcDB.Query(%s, %d) error(%v)", _getRelTaskSQL, uid, err)
  96. return
  97. }
  98. defer rows.Close()
  99. for rows.Next() {
  100. rt := &model.TaskForLog{}
  101. if err = rows.Scan(&rt.ID, &rt.Cid, &rt.Subject, &rt.Mtime, &gtime); err != nil {
  102. log.Error("rows.Scan error(%v)", err)
  103. return
  104. }
  105. if gtime.IsZero() {
  106. rts = append(rts, rt)
  107. } else {
  108. lastid = rt.ID
  109. }
  110. }
  111. return
  112. }
  113. // TxReleaseSpecial 延时固定时间释放的任务,需要校验释放时的状态,时间,认领人等
  114. func (d *Dao) TxReleaseSpecial(tx *sql.Tx, mtime time.Time, state int8, taskid, uid int64) (rows int64, err error) {
  115. res, err := tx.Exec(_releaseSpecialSQL, taskid, mtime, state, uid)
  116. if err != nil {
  117. log.Error("tx.Exec(%s, %d, %v, %d, %d) error(%v)", _releaseSpecialSQL, taskid, mtime, state, uid, err)
  118. return
  119. }
  120. return res.RowsAffected()
  121. }
  // SQL statements used to dispatch, fetch and list rows of task_dispatch.
  const (
  	// _userUndoneSpecifiedSQL selects the rows of one uid with state != 2 and subject = 1.
  	_userUndoneSpecifiedSQL = "SELECT id,pool,subject,adminid,aid,cid,uid,state,ctime,mtime FROM task_dispatch " +
  		"WHERE uid = ? AND state !=2 AND subject = 1"
  	// _dispatchTaskSQL selects up to 8 state=0 rows whose uid is 0 or the given uid,
  	// ordered by weight, then subject (both descending), then id.
  	_dispatchTaskSQL = "SELECT id,cid,mtime FROM task_dispatch " +
  		"WHERE uid in (0,?) AND state = 0 ORDER BY `weight` DESC,`subject` DESC,`id` ASC limit 8"
  	// _upDispatchTaskSQL sets state=1 and uid, and zeroes gtime, on the listed ids (%s) that are still state=0.
  	_upDispatchTaskSQL = "UPDATE task_dispatch SET state=1,uid=?,gtime='0000-00-00 00:00:00' " +
  		"WHERE id IN (%s) AND state=0"
  	// _getNextTaskSQL selects the first state=1 row of one uid, using the same ordering as _dispatchTaskSQL.
  	_getNextTaskSQL = "SELECT id,pool,subject,adminid,aid,cid,uid,state,utime,ctime,mtime,dtime,gtime,weight FROM task_dispatch " +
  		"WHERE uid=? AND state = 1 ORDER BY `weight` DESC,`subject` DESC,`id` ASC limit 1"
  	// _upTaskGtimeSQL sets gtime on one row by id.
  	_upTaskGtimeSQL = "UPDATE task_dispatch SET gtime=? WHERE id=?"
  	// _listByConditionSQL is the list query template; its three %s are the WHERE
  	// condition, the ORDER BY list and the LIMIT clause, in that order.
  	_listByConditionSQL = "SELECT id,pool,subject,adminid,aid,cid,uid,state,utime,ctime,mtime,dtime,gtime,weight FROM task_dispatch " +
  		"where %s order by %s %s"
  	// _taskByIDSQL looks a task up in task_dispatch and, via UNION, in task_dispatch_done (matched on task_id).
  	_taskByIDSQL = "SELECT id,pool,subject,adminid,aid,cid,uid,state,utime,ctime,mtime,dtime,gtime,ptime,weight FROM task_dispatch WHERE id =? union " +
  		"SELECT task_id as id,pool,subject,adminid,aid,cid,uid,state,utime,ctime,mtime,dtime,gtime,ptime,weight FROM task_dispatch_done WHERE task_id=?"
  	// _getWeightDBSQL joins task_dispatch with task_dispatch_extend (left join) and archive (inner join)
  	// for the listed task ids (%s).
  	_getWeightDBSQL = "SELECT t.id,t.state,a.mid,t.ctime,t.upspecial,t.ptime,e.description FROM `task_dispatch` AS t " +
  		"LEFT JOIN `task_dispatch_extend` AS e ON t.id=e.task_id INNER JOIN archive as a ON a.id=t.aid WHERE t.id IN (%s)"
  )
  134. // UserUndoneSpecTask get undone dispatch which belongs to someone.
  135. func (d *Dao) UserUndoneSpecTask(c context.Context, uid int64) (tasks []*model.Task, err error) {
  136. rows, err := d.arcDB.Query(c, _userUndoneSpecifiedSQL, uid)
  137. if err != nil {
  138. log.Error("d.arcDB.Query() error(%v)", err)
  139. return
  140. }
  141. defer rows.Close()
  142. for rows.Next() {
  143. t := &model.Task{}
  144. if err = rows.Scan(&t.ID, &t.Pool, &t.Subject, &t.AdminID, &t.Aid, &t.Cid, &t.UID, &t.State, &t.CTime, &t.MTime); err != nil {
  145. if err == sql.ErrNoRows {
  146. err = nil
  147. return
  148. }
  149. log.Error("row.Scan(%d) error(%v)", err)
  150. return
  151. }
  152. tasks = append(tasks, t)
  153. }
  154. return
  155. }
  156. // GetDispatchTask 获取抢占到的任务(用于记录日志)
  157. func (d *Dao) GetDispatchTask(c context.Context, uid int64) (tls []*model.TaskForLog, err error) {
  158. rows, err := d.arcDB.Query(c, _dispatchTaskSQL, uid)
  159. if err != nil {
  160. log.Error("d.arcDB.Query(%s, %d) error(%v)", _dispatchTaskSQL, uid, err)
  161. return
  162. }
  163. defer rows.Close()
  164. for rows.Next() {
  165. taskLog := &model.TaskForLog{}
  166. if err = rows.Scan(&taskLog.ID, &taskLog.Cid, &taskLog.Mtime); err != nil {
  167. log.Error("rows.Scan(%s, %d) error(%v)", _dispatchTaskSQL, uid, err)
  168. return
  169. }
  170. tls = append(tls, taskLog)
  171. }
  172. return
  173. }
  174. // UpDispatchTask 抢占任务
  175. func (d *Dao) UpDispatchTask(c context.Context, uid int64, ids []int64) (rows int64, err error) {
  176. var (
  177. res xsql.Result
  178. sqlstring = fmt.Sprintf(_upDispatchTaskSQL, xstr.JoinInts(ids))
  179. )
  180. res, err = d.arcDB.Exec(c, sqlstring, uid)
  181. if err != nil {
  182. log.Error("d.arcDB.Exec(%s %d %v) error(%v)", sqlstring, uid, err)
  183. return
  184. }
  185. return res.RowsAffected()
  186. }
  187. // GetNextTask 获取一条任务
  188. func (d *Dao) GetNextTask(c context.Context, uid int64) (task *model.Task, err error) {
  189. task = new(model.Task)
  190. err = d.arcDB.QueryRow(c, _getNextTaskSQL, uid).Scan(&task.ID, &task.Pool, &task.Subject, &task.AdminID,
  191. &task.Aid, &task.Cid, &task.UID, &task.State, &task.UTime, &task.CTime, &task.MTime, &task.DTime, &task.GTime, &task.Weight)
  192. if err != nil {
  193. if err == sql.ErrNoRows {
  194. return nil, nil
  195. }
  196. log.Error("db.QueryRow(%d) error(%v)", err)
  197. return nil, err
  198. }
  199. if task.GTime.TimeValue().IsZero() {
  200. timeNow := time.Now()
  201. _, err = d.arcDB.Exec(c, _upTaskGtimeSQL, timeNow, task.ID)
  202. if err != nil {
  203. log.Error("d.arcDB.Exec(%v,%d) error(%v)", timeNow, task.ID, err)
  204. return nil, err
  205. }
  206. task.GTime = model.NewFormatTime(timeNow)
  207. }
  208. return
  209. }
  210. // TaskByID get task
  211. func (d *Dao) TaskByID(c context.Context, id int64) (task *model.Task, err error) {
  212. task = new(model.Task)
  213. err = d.arcDB.QueryRow(c, _taskByIDSQL, id, id).Scan(&task.ID, &task.Pool, &task.Subject, &task.AdminID,
  214. &task.Aid, &task.Cid, &task.UID, &task.State, &task.UTime, &task.CTime, &task.MTime, &task.DTime, &task.GTime, &task.PTime, &task.Weight)
  215. if err != nil {
  216. if err == sql.ErrNoRows {
  217. err = nil
  218. task = nil
  219. return
  220. }
  221. log.Error("db.QueryRow(%d) error(%v)", id, err)
  222. return nil, err
  223. }
  224. return
  225. }
  226. // ListByCondition 从数据库获取读取任务列表
  227. func (d *Dao) ListByCondition(c context.Context, uid int64, pn, ps int, ltype, leader int8) (tasks []*model.Task, err error) {
  228. var task *model.Task
  229. tasks = []*model.Task{}
  230. if !model.IsDispatch(ltype) {
  231. log.Error("ListByCondition listtype(%d) error", ltype)
  232. return
  233. }
  234. listSQL := d.sqlHelper(uid, pn, ps, ltype, leader)
  235. rows, err := d.arcDB.Query(c, listSQL)
  236. if err != nil {
  237. log.Error("rddb.Query(%s) error(%v)", listSQL, err)
  238. return
  239. }
  240. defer rows.Close()
  241. for rows.Next() {
  242. task = &model.Task{}
  243. err = rows.Scan(&task.ID, &task.Pool, &task.Subject, &task.AdminID,
  244. &task.Aid, &task.Cid, &task.UID, &task.State, &task.UTime, &task.CTime, &task.MTime, &task.DTime, &task.GTime, &task.Weight)
  245. if err != nil {
  246. log.Error("rows.Scan(%s) error(%v)", listSQL, err)
  247. return nil, nil
  248. }
  249. tasks = append(tasks, task)
  250. }
  251. return
  252. }
  253. func (d *Dao) sqlHelper(uid int64, pn, ps int, ltype int8, leader int8) string {
  254. var (
  255. wherecase []string
  256. ordercase []string
  257. limitStr string
  258. whereStr string
  259. orderStr string
  260. )
  261. limitStr = fmt.Sprintf("LIMIT %d,%d", (pn-1)*ps, ps)
  262. if uid != 0 && (ltype != model.TypeRealTime && leader != 1) { //实时任务或者组长不区分uid
  263. wherecase = append(wherecase, fmt.Sprintf("uid=%d", uid))
  264. }
  265. ordercase = append(ordercase, "weight desc,ctime asc")
  266. switch ltype {
  267. case model.TypeRealTime:
  268. wherecase = append(wherecase, "state=0")
  269. case model.TypeDispatched:
  270. wherecase = append(wherecase, "state=1 AND subject=0")
  271. ordercase = append(ordercase, "utime desc")
  272. case model.TypeDelay:
  273. wherecase = append(wherecase, "state=3")
  274. ordercase = append(ordercase, "dtime asc")
  275. case model.TypeReview:
  276. wherecase = append(wherecase, "state=5")
  277. ordercase = append(ordercase, "mtime asc")
  278. case model.TypeSpecialWait:
  279. wherecase = append(wherecase, "state=1 AND subject=1")
  280. ordercase = append(ordercase, "utime desc")
  281. default:
  282. wherecase = append(wherecase, "state=0")
  283. }
  284. whereStr = strings.Join(wherecase, " AND ")
  285. orderStr = strings.Join(ordercase, ",")
  286. return fmt.Sprintf(_listByConditionSQL, whereStr, orderStr, limitStr)
  287. }
  288. // GetWeightDB 从数据库读取权重配置
  289. func (d *Dao) GetWeightDB(c context.Context, ids []int64) (mcases map[int64]*model.TaskPriority, err error) {
  290. var (
  291. rows *sql.Rows
  292. desc xsql.NullString
  293. )
  294. sqlstring := fmt.Sprintf(_getWeightDBSQL, xstr.JoinInts(ids))
  295. if rows, err = d.arcDB.Query(c, sqlstring); err != nil {
  296. log.Error("d.arcDB.Query(%s) error(%v)", sqlstring, err)
  297. return
  298. }
  299. defer rows.Close()
  300. mcases = make(map[int64]*model.TaskPriority)
  301. for rows.Next() {
  302. tp := new(model.TaskPriority)
  303. if err = rows.Scan(&tp.TaskID, &tp.State, &tp.Mid, &tp.Ctime, &tp.Special, &tp.Ptime, &desc); err != nil {
  304. log.Error("rows.Scan error(%v)", err)
  305. return
  306. }
  307. if desc.Valid && len(desc.String) > 0 {
  308. if err = json.Unmarshal([]byte(desc.String), &(tp.CfItems)); err != nil {
  309. log.Error("json.Unmarshal error(%v)", err)
  310. return
  311. }
  312. }
  313. mcases[tp.TaskID] = tp
  314. }
  315. return
  316. }