task_dispatch_extend.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package archive
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "time"
  7. "go-common/app/job/main/videoup-report/model/task"
  8. xsql "go-common/library/database/sql"
  9. "go-common/library/log"
  10. )
  11. const (
  12. _getTaskWeight = "SELECT t.id,t.state,a.mid,t.ctime,t.upspecial,t.ptime,e.description FROM `task_dispatch` AS t " +
  13. "LEFT JOIN `task_dispatch_extend` AS e ON t.id=e.task_id INNER JOIN archive as a ON a.id=t.aid WHERE t.state=0 AND t.id>? LIMIT 1000"
  14. _inDispatchExtendSQL = "INSERT INTO task_dispatch_extend(task_id,description) VALUE (?,?)"
  15. _delTaskExtendSQL = "DELETE FROM task_dispatch_extend WHERE mtime < ? LIMIT ?"
  16. )
  17. // GetTaskWeight 从数据库读取权重配置
  18. func (d *Dao) GetTaskWeight(c context.Context, lastid int64) (mcases map[int64]*task.WeightParams, err error) {
  19. var (
  20. rows *xsql.Rows
  21. desc sql.NullString
  22. )
  23. if rows, err = d.db.Query(c, _getTaskWeight, lastid); err != nil {
  24. log.Error("d.db.Query(%s, %d) error(%v)", _getTaskWeight, lastid, err)
  25. return
  26. }
  27. defer rows.Close()
  28. mcases = make(map[int64]*task.WeightParams)
  29. for rows.Next() {
  30. tp := new(task.WeightParams)
  31. if err = rows.Scan(&tp.TaskID, &tp.State, &tp.Mid, &tp.Ctime, &tp.Special, &tp.Ptime, &desc); err != nil {
  32. log.Error("rows.Scan error(%v)", err)
  33. return
  34. }
  35. if desc.Valid && len(desc.String) > 0 {
  36. arr := []*task.ConfigItem{}
  37. if err = json.Unmarshal([]byte(desc.String), &arr); err != nil {
  38. arr = nil
  39. log.Error("json.Unmarshal error(%v)", err)
  40. continue
  41. }
  42. tp.CfItems = arr
  43. }
  44. mcases[tp.TaskID] = tp
  45. }
  46. return
  47. }
  48. // InDispatchExtend 扩展表,记录权重配置信息
  49. func (d *Dao) InDispatchExtend(c context.Context, taskid int64, desc string) (lastid int64, err error) {
  50. res, err := d.db.Exec(c, _inDispatchExtendSQL, taskid, desc)
  51. if err != nil {
  52. log.Error("tx.Exec(%s, %d, %v) error(%v)", _inDispatchExtendSQL, taskid, desc, err)
  53. return
  54. }
  55. return res.LastInsertId()
  56. }
  57. // DelTaskExtend del task_dispatch_extend
  58. func (d *Dao) DelTaskExtend(c context.Context, before time.Time, limit int64) (rows int64, err error) {
  59. res, err := d.db.Exec(c, _delTaskExtendSQL, before.Format("2006-01-02 15:04:05"), limit)
  60. if err != nil {
  61. log.Error("d.db.Exec(%s, %s, %d) error(%v)", _delTaskExtendSQL, before, limit, err)
  62. return
  63. }
  64. return res.RowsAffected()
  65. }