task.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package gorm
  2. import (
  3. "context"
  4. "database/sql"
  5. "go-common/app/admin/main/aegis/model/task"
  6. "go-common/library/log"
  7. "github.com/jinzhu/gorm"
  8. )
  9. const (
  10. _submitSQL = "UPDATE task SET state=?,uid=?,utime=? WHERE id=? AND state=? AND uid=?"
  11. )
  12. // TxSubmit 提交任务
  13. func (d *Dao) TxSubmit(tx *gorm.DB, opt *task.SubmitOptions, state int8) (rows int64, err error) {
  14. rows = tx.Exec(_submitSQL, state, opt.UID, opt.Utime, opt.TaskID, opt.OldState, opt.OldUID).RowsAffected
  15. return
  16. }
  17. // TxCloseTasks close
  18. func (d *Dao) TxCloseTasks(tx *gorm.DB, rids []int64, uid int64) (err error) {
  19. err = tx.Table("task").Where("rid IN (?) AND state<?", rids, task.TaskStateSubmit).Update("state", task.TaskStateClosed).Update("uid", uid).Error
  20. return
  21. }
  22. // CloseTask .
  23. func (d *Dao) CloseTask(c context.Context, id int64) (err error) {
  24. return d.orm.Table("task").Where("id=?", id).Update("state", task.TaskStateClosed).Update("uid", 399).Error
  25. }
  26. // TaskByRID task by rid
  27. func (d *Dao) TaskByRID(c context.Context, rid, flowid int64) (t *task.Task, err error) {
  28. db := d.orm.Model(&task.Task{}).Where("rid = ? AND state<?", rid, task.TaskStateSubmit)
  29. if flowid > 0 {
  30. db = db.Where("flow_id=?", flowid)
  31. }
  32. t = &task.Task{}
  33. if err = db.Find(t).Error; err == gorm.ErrRecordNotFound {
  34. err = nil
  35. t = nil
  36. }
  37. return
  38. }
  39. // MaxWeight max weight
  40. func (d *Dao) MaxWeight(c context.Context, bizID, flowID int64) (max int64, err error) {
  41. if err = d.orm.Table("task").Select("max(weight)").Where("business_id = ? AND flow_id = ?", bizID, flowID).
  42. Where("state = ? OR state = ?", task.TaskStateInit, task.TaskStateDispatch).Row().Scan(&max); err != nil {
  43. max = 0
  44. err = nil
  45. }
  46. return
  47. }
  48. // UndoStat 未完成
  49. func (d *Dao) UndoStat(c context.Context, bizID, flowID, UID int64) (stat *task.UnDOStat, err error) {
  50. stat = &task.UnDOStat{}
  51. err = d.orm.Raw(`SELECT COUNT(CASE WHEN admin_id>0 AND state = 0 THEN 1 ELSE NULL END) assign,
  52. COUNT(CASE WHEN admin_id = 0 AND state = 2 THEN 1 ELSE NULL END) delay,
  53. COUNT(CASE WHEN admin_id = 0 AND state = 1 THEN 1 ELSE NULL END) normal
  54. FROM task WHERE business_id=? AND flow_id=? AND uid=?`, bizID, flowID, UID).Scan(stat).Error
  55. return
  56. }
  57. // TaskStat 任务详情统计
  58. func (d *Dao) TaskStat(c context.Context, bizID, flowID, UID int64) (stat *task.Stat, err error) {
  59. stat = &task.Stat{}
  60. err = d.orm.Raw(`SELECT COUNT(CASE WHEN admin_id=0 AND state = 0 THEN 1 ELSE NULL END) normal,
  61. COUNT(CASE WHEN admin_id>0 AND state = 0 THEN 1 ELSE NULL END) assign,
  62. COUNT(CASE WHEN state=2 THEN 1 ELSE NULL END) delayTotal,
  63. COUNT(CASE WHEN uid=? AND state=2 THEN 1 ELSE NULL END) delayPersonal
  64. FROM task WHERE business_id=? AND flow_id=?`, UID, bizID, flowID).Scan(stat).Error
  65. return
  66. }
  67. // TaskListSeized 停滞任务
  68. func (d *Dao) TaskListSeized(c context.Context, opt *task.ListOptions) (ids []int64, count int64, err error) {
  69. return d.tasklist(c, "seized", opt.BusinessID, opt.FlowID, opt.UID, opt.Pn, opt.Ps)
  70. }
  71. // TaskListDelayd 延迟任务
  72. func (d *Dao) TaskListDelayd(c context.Context, opt *task.ListOptions) (ids []int64, count int64, err error) {
  73. return d.tasklist(c, "delayd", opt.BusinessID, opt.FlowID, opt.UID, opt.Pn, opt.Ps)
  74. }
  75. // TaskListAssignd 指派停滞任务
  76. func (d *Dao) TaskListAssignd(c context.Context, opt *task.ListOptions) (ids []int64, count int64, err error) {
  77. return d.tasklist(c, "assignd", opt.BusinessID, opt.FlowID, opt.UID, opt.Pn, opt.Ps)
  78. }
  79. func (d *Dao) tasklist(c context.Context, ltp string, bizID, flowID, UID int64, pn, ps int) (ids []int64, count int64, err error) {
  80. db := d.orm.Table("task").Where("business_id=? AND flow_id=?", bizID, flowID)
  81. switch ltp {
  82. case "seized":
  83. db = db.Where("state=?", task.TaskStateDispatch)
  84. case "delayd":
  85. db = db.Where("state=?", task.TaskStateDelay)
  86. case "assignd":
  87. db = db.Where("state=? AND admin_id>0", task.TaskStateDispatch)
  88. }
  89. if UID > 0 {
  90. db = db.Where("uid=?", UID)
  91. }
  92. var rows *sql.Rows
  93. rows, err = db.Count(&count).Select("id").Order("weight DESC").Offset((pn - 1) * ps).Limit(ps).Rows()
  94. if err != nil {
  95. log.Error("tasklist error(%v)", err)
  96. return
  97. }
  98. defer rows.Close()
  99. for rows.Next() {
  100. var id int64
  101. if err = rows.Scan(&id); err != nil {
  102. log.Error("tasklist error(%v)", err)
  103. return
  104. }
  105. ids = append(ids, id)
  106. }
  107. return
  108. }
  109. //TaskHitAuditing 检查资源是否正在审核
  110. func (d *Dao) TaskHitAuditing(c context.Context, rids []int64) (map[int64]struct{}, error) {
  111. hitids := make(map[int64]struct{})
  112. rows, err := d.orm.Table("task").Select("rid").Where("rid IN (?)", rids).
  113. Where("state = ? AND gtime!=0", task.TaskStateDispatch).Rows()
  114. if err != nil {
  115. return hitids, err
  116. }
  117. defer rows.Close()
  118. for rows.Next() {
  119. var id int64
  120. if err = rows.Scan(&id); err != nil {
  121. return hitids, err
  122. }
  123. hitids[id] = struct{}{}
  124. }
  125. return hitids, err
  126. }