task_state.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package archive
  2. import (
  3. "context"
  4. xsql "database/sql"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "go-common/app/admin/main/videoup/model/archive"
  9. "go-common/library/database/sql"
  10. "go-common/library/log"
  11. "go-common/library/xstr"
  12. )
  13. const (
  14. _upTaskByIDSQL = "UPDATE task_dispatch SET %s WHERE id=?"
  15. _upGtimeByIDSQL = "UPDATE task_dispatch SET gtime=? WHERE id=?"
  16. _releaseByIDSQL = "UPDATE task_dispatch SET subject=0,state=0,uid=0,gtime='0000-00-00 00:00:00' WHERE id=?"
  17. _releaseMtimeSQL = "UPDATE task_dispatch SET subject=0,state=0,uid=0,gtime='0000-00-00 00:00:00' WHERE id IN (%s) AND mtime<=?"
  18. _timeOutTaskSQL = "SELECT id,cid,subject,mtime FROM task_dispatch WHERE (state=1 AND mtime<?) OR (state=0 AND uid<>0 AND ctime<?)"
  19. _getRelTaskSQL = "SELECT id,cid,subject,mtime,gtime FROM task_dispatch WHERE state IN (0,1) AND uid=?"
  20. _releaseSpecialSQL = "UPDATE task_dispatch SET subject=0,state=0,uid=0 WHERE id=? AND gtime='0000-00-00 00:00:00' AND mtime<=? AND state=? AND uid=?"
  21. )
  22. // UpGtimeByID update gtime
  23. func (d *Dao) UpGtimeByID(c context.Context, id int64, gtime string) (rows int64, err error) {
  24. var res xsql.Result
  25. if res, err = d.db.Exec(c, _upGtimeByIDSQL, gtime, id); err != nil {
  26. log.Error("d.db.Exec(%s, %v, %d) error(%v)", _upGtimeByIDSQL, gtime, id)
  27. return
  28. }
  29. return res.RowsAffected()
  30. }
  31. // TxUpTaskByID 更新任务状态
  32. func (d *Dao) TxUpTaskByID(tx *sql.Tx, id int64, paras map[string]interface{}) (rows int64, err error) {
  33. arrSet := []string{}
  34. arrParas := []interface{}{}
  35. for k, v := range paras {
  36. arrSet = append(arrSet, k+"=?")
  37. arrParas = append(arrParas, v)
  38. }
  39. arrParas = append(arrParas, id)
  40. sqlstring := fmt.Sprintf(_upTaskByIDSQL, strings.Join(arrSet, ","))
  41. res, err := tx.Exec(sqlstring, arrParas...)
  42. if err != nil {
  43. log.Error("tx.Exec(%v %v) error(%v)", sqlstring, arrParas, err)
  44. return
  45. }
  46. return res.RowsAffected()
  47. }
  48. // TxReleaseByID 释放指定任务
  49. func (d *Dao) TxReleaseByID(tx *sql.Tx, id int64) (rows int64, err error) {
  50. res, err := tx.Exec(_releaseByIDSQL, id)
  51. if err != nil {
  52. log.Error("tx.Exec(%s, %d) error(%v)", _releaseByIDSQL, id, err)
  53. return
  54. }
  55. return res.RowsAffected()
  56. }
  57. // MulReleaseMtime 批量释放任务,加时间防止释放错误
  58. func (d *Dao) MulReleaseMtime(c context.Context, ids []int64, mtime time.Time) (rows int64, err error) {
  59. sqlstring := fmt.Sprintf(_releaseMtimeSQL, xstr.JoinInts(ids))
  60. res, err := d.db.Exec(c, sqlstring, mtime)
  61. if err != nil {
  62. log.Error("tx.Exec(%s, %v) error(%v)", sqlstring, mtime, err)
  63. return
  64. }
  65. return res.RowsAffected()
  66. }
  67. // GetTimeOutTask 释放正在处理且超时的,释放指派后但长时间未审核的
  68. func (d *Dao) GetTimeOutTask(c context.Context) (rts []*archive.TaskForLog, err error) {
  69. var (
  70. rows *sql.Rows
  71. )
  72. if rows, err = d.rddb.Query(c, _timeOutTaskSQL, time.Now().Add(-10*time.Minute), time.Now().Add(-80*time.Minute)); err != nil {
  73. log.Error("d.rddb.Query(%s) error(%v)", _timeOutTaskSQL, err)
  74. return
  75. }
  76. defer rows.Close()
  77. for rows.Next() {
  78. rt := &archive.TaskForLog{}
  79. if err = rows.Scan(&rt.ID, &rt.Cid, &rt.Subject, &rt.Mtime); err != nil {
  80. log.Error("rows.Scan error(%v)", err)
  81. return
  82. }
  83. rts = append(rts, rt)
  84. }
  85. return
  86. }
  87. // GetRelTask 用户登出或者主动释放(分配给该用户的都释放)
  88. func (d *Dao) GetRelTask(c context.Context, uid int64) (rts []*archive.TaskForLog, lastid int64, err error) {
  89. var (
  90. gtime time.Time
  91. rows *sql.Rows
  92. )
  93. if rows, err = d.rddb.Query(c, _getRelTaskSQL, uid); err != nil {
  94. log.Error("d.rddb.Query(%s, %d) error(%v)", _getRelTaskSQL, uid, err)
  95. return
  96. }
  97. defer rows.Close()
  98. for rows.Next() {
  99. rt := &archive.TaskForLog{}
  100. if err = rows.Scan(&rt.ID, &rt.Cid, &rt.Subject, &rt.Mtime, &gtime); err != nil {
  101. log.Error("rows.Scan error(%v)", err)
  102. return
  103. }
  104. if gtime.IsZero() {
  105. rts = append(rts, rt)
  106. } else {
  107. lastid = rt.ID
  108. }
  109. }
  110. return
  111. }
  112. // TxReleaseSpecial 延时固定时间释放的任务,需要校验释放时的状态,时间,认领人等
  113. func (d *Dao) TxReleaseSpecial(tx *sql.Tx, mtime time.Time, state int8, taskid, uid int64) (rows int64, err error) {
  114. res, err := tx.Exec(_releaseSpecialSQL, taskid, mtime, state, uid)
  115. if err != nil {
  116. log.Error("tx.Exec(%s, %d, %v, %d, %d) error(%v)", _releaseSpecialSQL, taskid, mtime, state, uid, err)
  117. return
  118. }
  119. return res.RowsAffected()
  120. }