task_dispatch.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package archive
  2. import (
  3. "context"
  4. xsql "database/sql"
  5. "encoding/json"
  6. "fmt"
  7. "strings"
  8. "time"
  9. "go-common/app/admin/main/videoup/model/archive"
  10. "go-common/app/admin/main/videoup/model/utils"
  11. "go-common/library/database/sql"
  12. "go-common/library/log"
  13. "go-common/library/xstr"
  14. )
  // SQL statements issued by the task_dispatch DAO methods in this file.
  // Statements that contain %s are templates and are completed with fmt.Sprintf
  // before being executed.
  const (
  	// _userUndoneSpecifiedSQL selects the rows of one uid with state != 2 and subject = 1.
  	_userUndoneSpecifiedSQL = "SELECT id,pool,subject,adminid,aid,cid,uid,state,ctime,mtime FROM task_dispatch WHERE uid = ? AND state !=2 AND subject = 1"

  	// _dispatchTaskSQL selects at most 8 rows with state = 0 whose uid is 0 or the
  	// given uid, ordered by weight desc, subject desc, id asc.
  	_dispatchTaskSQL = "SELECT id,cid,mtime FROM task_dispatch WHERE uid in (0,?) AND state = 0 ORDER BY `weight` DESC,`subject` DESC,`id` ASC limit 8"

  	// _upDispatchTaskSQL sets state=1 and uid and resets gtime on the listed ids
  	// that still have state=0; %s is the comma separated id list.
  	_upDispatchTaskSQL = "UPDATE task_dispatch SET state=1,uid=?,gtime='0000-00-00 00:00:00' WHERE id IN (%s) AND state=0"

  	// _getNextTaskSQL selects the first row of a uid with state = 1, ordered by
  	// weight desc, subject desc, id asc.
  	_getNextTaskSQL = "SELECT id,pool,subject,adminid,aid,cid,uid,state,utime,ctime,mtime,dtime,gtime,weight FROM task_dispatch WHERE uid=? AND state = 1 ORDER BY `weight` DESC,`subject` DESC,`id` ASC limit 1"

  	// _upTaskGtimeSQL writes gtime for a single row.
  	_upTaskGtimeSQL = "UPDATE task_dispatch SET gtime=? WHERE id=?"

  	// _listByConditionSQL is the list template; the three %s are the WHERE
  	// expression, the ORDER BY expression and the trailing LIMIT clause.
  	_listByConditionSQL = "SELECT id,pool,subject,adminid,aid,cid,uid,state,utime,ctime,mtime,dtime,gtime,weight FROM task_dispatch where %s order by %s %s"

  	// _taskByIDSQL looks an id up in task_dispatch and, via union, in
  	// task_dispatch_done (matched on task_id).
  	_taskByIDSQL = "SELECT id,pool,subject,adminid,aid,cid,uid,state,utime,ctime,mtime,dtime,gtime,ptime,weight FROM task_dispatch WHERE id =?" +
  		" union " +
  		"SELECT task_id as id,pool,subject,adminid,aid,cid,uid,state,utime,ctime,mtime,dtime,gtime,ptime,weight FROM task_dispatch_done WHERE task_id=?"

  	// _getWeightDBSQL joins task_dispatch with task_dispatch_extend (left join)
  	// and archive (inner join); %s is the comma separated id list.
  	_getWeightDBSQL = "SELECT t.id,t.state,a.mid,t.ctime,t.upspecial,t.ptime,e.description FROM `task_dispatch` AS t" +
  		" LEFT JOIN `task_dispatch_extend` AS e ON t.id=e.task_id" +
  		" INNER JOIN archive as a ON a.id=t.aid" +
  		" WHERE t.id IN (%s)"

  	// _taskDispatchByIDSQL loads a single task_dispatch row by id.
  	_taskDispatchByIDSQL = "SELECT id,subject,aid,cid,uid,state,ctime,utime,mtime,dtime,gtime FROM task_dispatch WHERE id=?"
  )
  28. // UserUndoneSpecTask get undone dispatch which belongs to someone.
  29. func (d *Dao) UserUndoneSpecTask(c context.Context, uid int64) (tasks []*archive.Task, err error) {
  30. rows, err := d.db.Query(c, _userUndoneSpecifiedSQL, uid)
  31. if err != nil {
  32. log.Error("d.db.Query() error(%v)", err)
  33. return
  34. }
  35. defer rows.Close()
  36. for rows.Next() {
  37. t := &archive.Task{}
  38. if err = rows.Scan(&t.ID, &t.Pool, &t.Subject, &t.AdminID, &t.Aid, &t.Cid, &t.UID, &t.State, &t.CTime, &t.MTime); err != nil {
  39. if err == sql.ErrNoRows {
  40. err = nil
  41. return
  42. }
  43. log.Error("row.Scan(%d) error(%v)", err)
  44. return
  45. }
  46. tasks = append(tasks, t)
  47. }
  48. return
  49. }
  50. // GetDispatchTask 获取抢占到的任务(用于记录日志)
  51. func (d *Dao) GetDispatchTask(c context.Context, uid int64) (tls []*archive.TaskForLog, err error) {
  52. rows, err := d.rddb.Query(c, _dispatchTaskSQL, uid)
  53. if err != nil {
  54. log.Error("d.rddb.Query(%s, %d) error(%v)", _dispatchTaskSQL, uid, err)
  55. return
  56. }
  57. defer rows.Close()
  58. for rows.Next() {
  59. taskLog := &archive.TaskForLog{}
  60. if err = rows.Scan(&taskLog.ID, &taskLog.Cid, &taskLog.Mtime); err != nil {
  61. log.Error("rows.Scan(%s, %d) error(%v)", _dispatchTaskSQL, uid, err)
  62. return
  63. }
  64. tls = append(tls, taskLog)
  65. }
  66. return
  67. }
  68. // UpDispatchTask 抢占任务
  69. func (d *Dao) UpDispatchTask(c context.Context, uid int64, ids []int64) (rows int64, err error) {
  70. var (
  71. res xsql.Result
  72. sqlstring = fmt.Sprintf(_upDispatchTaskSQL, xstr.JoinInts(ids))
  73. )
  74. res, err = d.db.Exec(c, sqlstring, uid)
  75. if err != nil {
  76. log.Error("d.db.Exec(%s %d %v) error(%v)", sqlstring, uid, err)
  77. return
  78. }
  79. return res.RowsAffected()
  80. }
  81. // GetNextTask 获取一条任务
  82. func (d *Dao) GetNextTask(c context.Context, uid int64) (task *archive.Task, err error) {
  83. task = new(archive.Task)
  84. err = d.rddb.QueryRow(c, _getNextTaskSQL, uid).Scan(&task.ID, &task.Pool, &task.Subject, &task.AdminID,
  85. &task.Aid, &task.Cid, &task.UID, &task.State, &task.UTime, &task.CTime, &task.MTime, &task.DTime, &task.GTime, &task.Weight)
  86. if err != nil {
  87. if err == sql.ErrNoRows {
  88. return nil, nil
  89. }
  90. log.Error("db.QueryRow(%d) error(%v)", err)
  91. return nil, err
  92. }
  93. if task.GTime.TimeValue().IsZero() {
  94. timeNow := time.Now()
  95. _, err = d.db.Exec(c, _upTaskGtimeSQL, timeNow, task.ID)
  96. if err != nil {
  97. log.Error("d.db.Exec(%v,%d) error(%v)", timeNow, task.ID, err)
  98. return nil, err
  99. }
  100. task.GTime = utils.NewFormatTime(timeNow)
  101. }
  102. return
  103. }
  104. // TaskByID get task
  105. func (d *Dao) TaskByID(c context.Context, id int64) (task *archive.Task, err error) {
  106. task = new(archive.Task)
  107. err = d.rddb.QueryRow(c, _taskByIDSQL, id, id).Scan(&task.ID, &task.Pool, &task.Subject, &task.AdminID,
  108. &task.Aid, &task.Cid, &task.UID, &task.State, &task.UTime, &task.CTime, &task.MTime, &task.DTime, &task.GTime, &task.PTime, &task.Weight)
  109. if err != nil {
  110. if err == sql.ErrNoRows {
  111. return nil, nil
  112. }
  113. log.Error("db.QueryRow(%d) error(%v)", err)
  114. return nil, err
  115. }
  116. return
  117. }
  118. // ListByCondition 从数据库获取读取任务列表
  119. func (d *Dao) ListByCondition(c context.Context, uid int64, pn, ps int, ltype, leader int8) (tasks []*archive.Task, err error) {
  120. var task *archive.Task
  121. tasks = []*archive.Task{}
  122. if !archive.IsDispatch(ltype) {
  123. log.Error("ListByCondition listtype(%d) error", ltype)
  124. return
  125. }
  126. listSQL := d.sqlHelper(uid, pn, ps, ltype, leader)
  127. rows, err := d.rddb.Query(c, listSQL)
  128. if err != nil {
  129. log.Error("rddb.Query(%s) error(%v)", listSQL, err)
  130. return
  131. }
  132. defer rows.Close()
  133. for rows.Next() {
  134. task = &archive.Task{}
  135. err = rows.Scan(&task.ID, &task.Pool, &task.Subject, &task.AdminID,
  136. &task.Aid, &task.Cid, &task.UID, &task.State, &task.UTime, &task.CTime, &task.MTime, &task.DTime, &task.GTime, &task.Weight)
  137. if err != nil {
  138. log.Error("rows.Scan(%s) error(%v)", listSQL, err)
  139. return nil, nil
  140. }
  141. tasks = append(tasks, task)
  142. }
  143. return
  144. }
  145. func (d *Dao) sqlHelper(uid int64, pn, ps int, ltype int8, leader int8) string {
  146. var (
  147. wherecase []string
  148. ordercase []string
  149. limitStr string
  150. whereStr string
  151. orderStr string
  152. )
  153. limitStr = fmt.Sprintf("LIMIT %d,%d", (pn-1)*ps, ps)
  154. if uid != 0 && (ltype != archive.TypeRealTime && leader != 1) { //实时任务或者组长不区分uid
  155. wherecase = append(wherecase, fmt.Sprintf("uid=%d", uid))
  156. }
  157. ordercase = append(ordercase, "weight desc,ctime asc")
  158. switch ltype {
  159. case archive.TypeRealTime:
  160. wherecase = append(wherecase, "state=0")
  161. case archive.TypeDispatched:
  162. wherecase = append(wherecase, "state=1 AND subject=0")
  163. ordercase = append(ordercase, "utime desc")
  164. case archive.TypeDelay:
  165. wherecase = append(wherecase, "state=3")
  166. ordercase = append(ordercase, "dtime asc")
  167. case archive.TypeSpecial:
  168. wherecase = append(wherecase, "state=5 AND subject=1")
  169. ordercase = append(ordercase, "mtime asc")
  170. case archive.TypeSpecialWait:
  171. wherecase = append(wherecase, "state=1 AND subject=1")
  172. ordercase = append(ordercase, "utime desc")
  173. default:
  174. wherecase = append(wherecase, "state=0")
  175. }
  176. whereStr = strings.Join(wherecase, " AND ")
  177. orderStr = strings.Join(ordercase, ",")
  178. return fmt.Sprintf(_listByConditionSQL, whereStr, orderStr, limitStr)
  179. }
  180. // GetWeightDB 从数据库读取权重配置
  181. func (d *Dao) GetWeightDB(c context.Context, ids []int64) (mcases map[int64]*archive.TaskPriority, err error) {
  182. var (
  183. rows *sql.Rows
  184. desc xsql.NullString
  185. )
  186. sqlstring := fmt.Sprintf(_getWeightDBSQL, xstr.JoinInts(ids))
  187. if rows, err = d.db.Query(c, sqlstring); err != nil {
  188. log.Error("d.db.Query(%s) error(%v)", sqlstring, err)
  189. return
  190. }
  191. defer rows.Close()
  192. mcases = make(map[int64]*archive.TaskPriority)
  193. for rows.Next() {
  194. tp := new(archive.TaskPriority)
  195. if err = rows.Scan(&tp.TaskID, &tp.State, &tp.Mid, &tp.Ctime, &tp.Special, &tp.Ptime, &desc); err != nil {
  196. log.Error("rows.Scan error(%v)", err)
  197. return
  198. }
  199. if desc.Valid && len(desc.String) > 0 {
  200. if err = json.Unmarshal([]byte(desc.String), &(tp.CfItems)); err != nil {
  201. log.Error("json.Unmarshal error(%v)", err)
  202. return
  203. }
  204. }
  205. mcases[tp.TaskID] = tp
  206. }
  207. return
  208. }
  209. //TaskDispatchByID task by id
  210. func (d *Dao) TaskDispatchByID(c context.Context, id int64) (tk *archive.Task, err error) {
  211. tk = &archive.Task{}
  212. if err = d.rddb.QueryRow(c, _taskDispatchByIDSQL, id).Scan(&tk.ID, &tk.Subject, &tk.Aid, &tk.Cid, &tk.UID, &tk.State, &tk.CTime, &tk.UTime, &tk.MTime, &tk.DTime, &tk.GTime); err != nil {
  213. if err == sql.ErrNoRows {
  214. err = nil
  215. } else {
  216. log.Error("TaskDispatchByID rows.Scan error(%v) id(%d)", err, id)
  217. }
  218. }
  219. return
  220. }