task.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package archive
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/job/main/videoup-report/model/archive"
  6. "go-common/library/database/sql"
  7. "go-common/library/log"
  8. )
  9. const (
  10. // task
  11. _taskByMtimeSQL = "SELECT id,state,ctime,mtime FROM task_dispatch WHERE mtime>? and ptime=0"
  12. _taskDoneByMtimeSQL = "SELECT id,state,ctime,mtime FROM task_dispatch_done WHERE mtime>? and ptime=0"
  13. _taskByUntreatedSQL = "SELECT id,state,ctime,mtime FROM task_dispatch WHERE (state=0 OR state=1) and ptime=0"
  14. // task took in and sel
  15. _addTaskTookSQL = "INSERT INTO task_dispatch_took(m50,m60,m80,m90,type,ctime,mtime) VALUE(?,?,?,?,?,?,?)"
  16. _taskTooksSQL = "SELECT id,m50,m60,m80,m90,type,ctime,mtime FROM task_dispatch_took WHERE type=1 AND ctime>?"
  17. _taskTookByHalfHourSQL = "SELECT id,m50,m60,m80,m90,type,ctime,mtime FROM task_dispatch_took WHERE type=2 ORDER BY ctime DESC LIMIT 1"
  18. _taskTooksByHalfHourSQL = "SELECT id,m50,m60,m80,m90,type,ctime,mtime FROM task_dispatch_took WHERE type=2 AND ctime>=? AND ctime<=? ORDER BY ctime ASC"
  19. )
  20. // TaskByMtime gets to took the task by mtime
  21. func (d *Dao) TaskByMtime(c context.Context, stime time.Time) (tasks []*archive.Task, err error) {
  22. rows, err := d.db.Query(c, _taskByMtimeSQL, stime)
  23. if err != nil {
  24. log.Error("d.taskStmt.Query(%v) error(%v)", stime, err)
  25. return
  26. }
  27. defer rows.Close()
  28. for rows.Next() {
  29. task := &archive.Task{}
  30. if err = rows.Scan(&task.ID, &task.State, &task.Ctime, &task.Mtime); err != nil {
  31. log.Error("rows.Scan error(%v)", err)
  32. return
  33. }
  34. tasks = append(tasks, task)
  35. }
  36. return
  37. }
  38. // TaskDoneByMtime gets to took the task done by mtime
  39. func (d *Dao) TaskDoneByMtime(c context.Context, stime time.Time) (tasks []*archive.Task, err error) {
  40. rows, err := d.db.Query(c, _taskDoneByMtimeSQL, stime)
  41. if err != nil {
  42. log.Error("d.taskStmt.Query(%v) error(%v)", stime, err)
  43. return
  44. }
  45. defer rows.Close()
  46. for rows.Next() {
  47. task := &archive.Task{}
  48. if err = rows.Scan(&task.ID, &task.State, &task.Ctime, &task.Mtime); err != nil {
  49. log.Error("rows.Scan error(%v)", err)
  50. return
  51. }
  52. tasks = append(tasks, task)
  53. }
  54. return
  55. }
  56. // TaskByUntreated gets to took the task by untreated
  57. func (d *Dao) TaskByUntreated(c context.Context) (tasks []*archive.Task, err error) {
  58. rows, err := d.db.Query(c, _taskByUntreatedSQL)
  59. if err != nil {
  60. log.Error("d.taskStmt.Query error(%v)", err)
  61. return
  62. }
  63. defer rows.Close()
  64. for rows.Next() {
  65. task := &archive.Task{}
  66. if err = rows.Scan(&task.ID, &task.State, &task.Ctime, &task.Mtime); err != nil {
  67. log.Error("rows.Scan error(%v)", err)
  68. return
  69. }
  70. tasks = append(tasks, task)
  71. }
  72. return
  73. }
  74. // AddTaskTook add TaskTook
  75. func (d *Dao) AddTaskTook(c context.Context, took *archive.TaskTook) (lastID int64, err error) {
  76. res, err := d.db.Exec(c, _addTaskTookSQL, took.M50, took.M60, took.M80, took.M90, took.TypeID, took.Ctime, took.Mtime)
  77. if err != nil {
  78. log.Error("d.TaskTookAddStmt.Exec error(%v)", err)
  79. return
  80. }
  81. lastID, err = res.LastInsertId()
  82. return
  83. }
  84. // TaskTooks gets TaskTook by ctime
  85. func (d *Dao) TaskTooks(c context.Context, stime time.Time) (tooks []*archive.TaskTook, err error) {
  86. rows, err := d.db.Query(c, _taskTooksSQL, stime)
  87. if err != nil {
  88. log.Error("d.TaskTookStmt.Query() error(%v)", err)
  89. return
  90. }
  91. defer rows.Close()
  92. for rows.Next() {
  93. took := &archive.TaskTook{}
  94. if err = rows.Scan(&took.ID, &took.M50, &took.M60, &took.M80, &took.M90, &took.TypeID, &took.Ctime, &took.Mtime); err != nil {
  95. log.Error("rows.Scan error(%v)", err)
  96. return
  97. }
  98. tooks = append(tooks, took)
  99. }
  100. return
  101. }
  102. // TaskTookByHalfHour get TaskTook by half hour
  103. func (d *Dao) TaskTookByHalfHour(c context.Context) (took *archive.TaskTook, err error) {
  104. row := d.db.QueryRow(c, _taskTookByHalfHourSQL)
  105. took = &archive.TaskTook{}
  106. if err = row.Scan(&took.ID, &took.M50, &took.M60, &took.M80, &took.M90, &took.TypeID, &took.Ctime, &took.Mtime); err != nil {
  107. if err == sql.ErrNoRows {
  108. took = nil
  109. err = nil
  110. } else {
  111. log.Error("row.Scan error(%v)", err)
  112. }
  113. }
  114. return
  115. }
  116. // TaskTooksByHalfHour get TaskTooks by half hour
  117. func (d *Dao) TaskTooksByHalfHour(c context.Context, stime time.Time, etime time.Time) (tooks []*archive.TaskTook, err error) {
  118. rows, err := d.db.Query(c, _taskTooksByHalfHourSQL, stime, etime)
  119. if err != nil {
  120. log.Error("d.TaskTooksByHalfHour.Query(%v,%v) error(%v)", stime, etime, err)
  121. return
  122. }
  123. defer rows.Close()
  124. for rows.Next() {
  125. took := &archive.TaskTook{}
  126. if err = rows.Scan(&took.ID, &took.M50, &took.M60, &took.M80, &took.M90, &took.TypeID, &took.Ctime, &took.Mtime); err != nil {
  127. log.Error("rows.Scan error(%v)", err)
  128. return
  129. }
  130. tooks = append(tooks, took)
  131. }
  132. return
  133. }