task_dispatch.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package archive
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/job/main/videoup-report/model/task"
  6. "go-common/library/database/sql"
  7. "go-common/library/log"
  8. )
  9. const (
  10. _dispatchSQL = "SELECT id,state FROM task_dispatch WHERE aid=? AND cid=? ORDER BY id DESC"
  11. _inDispatchSQL = "INSERT INTO task_dispatch(pool,subject,adminid,aid,cid,uid,state,conf_id,conf_state,conf_weight,upspecial,cftime,ptime) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?)"
  12. _delDispatchSQL = "UPDATE task_dispatch SET state=? WHERE aid=? AND cid=? AND state!=?"
  13. _delDispatchByAidSQL = "UPDATE task_dispatch SET state=? WHERE aid=? AND state IN (?,?,?)"
  14. _delDispatchByTimeSQL = "DELETE FROM task_dispatch WHERE mtime>=? AND mtime<=? AND state in (2,6)"
  15. _taskIDforWeightSQL = "SELECT id FROM task_dispatch WHERE state=0 AND id>? ORDER BY id ASC limit 1000"
  16. _upTaskWeightSQL = "UPDATE task_dispatch set weight=?,uptime=now() where id=? and state=0"
  17. _upSpecialSQL = "UPDATE task_dispatch SET upspecial=? WHERE id=?"
  18. )
  19. // DispatchState get dipatch state.
  20. func (d *Dao) DispatchState(c context.Context, aid, cid int64) (id int64, state int8, err error) {
  21. row := d.db.QueryRow(c, _dispatchSQL, aid, cid)
  22. if err = row.Scan(&id, &state); err != nil {
  23. if err == sql.ErrNoRows {
  24. err = nil
  25. return
  26. }
  27. log.Error("row.Scan(%d) error(%v)", err)
  28. return
  29. }
  30. return
  31. }
  32. // AddDispatch add task dispatch
  33. func (d *Dao) AddDispatch(c context.Context, t *task.Task) (lastID int64, err error) {
  34. res, err := d.db.Exec(c, _inDispatchSQL, t.Pool, t.Subject, t.AdminID, t.Aid, t.Cid, t.UID, t.State,
  35. t.ConfigID, t.ConfigState, t.ConfigWeight, t.UPSpecial, t.CFtime, t.Ptime)
  36. if err != nil {
  37. log.Error("d.db.Exec(%s) error(%v)", _inDispatchSQL, err)
  38. return
  39. }
  40. return res.LastInsertId()
  41. }
  42. // DelDispatch del dispatch.
  43. func (d *Dao) DelDispatch(c context.Context, aid, cid int64) (rows int64, err error) {
  44. res, err := d.db.Exec(c, _delDispatchSQL, task.StateForTaskUserDeleted, aid, cid, task.StateForTaskUserDeleted)
  45. if err != nil {
  46. log.Error("d.db.Exec(%s, %d, %d) error(%v)", _delDispatchSQL, aid, cid, err)
  47. return
  48. }
  49. return res.RowsAffected()
  50. }
  51. // DelDispatchByAid del dispatch by aid.
  52. func (d *Dao) DelDispatchByAid(c context.Context, aid int64) (rows int64, err error) {
  53. res, err := d.db.Exec(c, _delDispatchByAidSQL, task.StateForTaskUserDeleted, aid, task.StateForTaskDefault, task.StateForTaskWork, task.StateForTaskDelay)
  54. if err != nil {
  55. log.Error("d.db.Exec(%s, %d, %d) error(%v)", _delDispatchByAidSQL, aid, err)
  56. return
  57. }
  58. return res.RowsAffected()
  59. }
  60. // TxDelDispatchByTime del dispatch by time segment
  61. func (d *Dao) TxDelDispatchByTime(c context.Context, tx *sql.Tx, startTime, endTime time.Time) (rows int64, err error) {
  62. res, err := tx.Exec(_delDispatchByTimeSQL, startTime.Format("2006-01-02 15:04:05"), endTime.Format("2006-01-02 15:04:05"))
  63. if err != nil {
  64. log.Error("tx.Exec(%s, %s, %s) error(%v)", _delDispatchByTimeSQL, startTime.Format("2006-01-02 15:04:05"), endTime.Format("2006-01-02 15:04:05"), err)
  65. return
  66. }
  67. return res.RowsAffected()
  68. }
  69. // TaskIDforWeight 获取需要更新权重的任务id(用于给redis批量读取)
  70. func (d *Dao) TaskIDforWeight(c context.Context, lastid int64) (ids []int64, last int64, err error) {
  71. rows, err := d.db.Query(c, _taskIDforWeightSQL, lastid) //获取一批待审核任务
  72. if err != nil {
  73. log.Error("d.db.Query(%s, %d) error(%v)", _taskIDforWeightSQL, lastid, err)
  74. return
  75. }
  76. defer rows.Close()
  77. for rows.Next() {
  78. var id int64
  79. if err = rows.Scan(&id); err != nil {
  80. log.Error("rows.Scan error(%v)", err)
  81. return
  82. }
  83. ids = append(ids, id)
  84. last = id
  85. }
  86. return
  87. }
  88. // UpTaskWeight 更新单条权重
  89. func (d *Dao) UpTaskWeight(c context.Context, taskid int64, weight int64) (rows int64, err error) {
  90. res, err := d.db.Exec(c, _upTaskWeightSQL, weight, taskid)
  91. if err != nil {
  92. log.Error("d.db.Exec(%s,%d,%d) error(%v)", _upTaskWeightSQL, weight, taskid, err)
  93. return
  94. }
  95. return res.RowsAffected()
  96. }
  97. // SetUpSpecial 更新单条权重
  98. func (d *Dao) SetUpSpecial(c context.Context, taskid int64, special int8) (rows int64, err error) {
  99. res, err := d.db.Exec(c, _upSpecialSQL, special, taskid)
  100. if err != nil {
  101. log.Error("d.db.Exec(%s,%d,%d) error(%v)", _upSpecialSQL, special, taskid, err)
  102. return
  103. }
  104. return res.RowsAffected()
  105. }