task_orm.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package dao
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "time"
  7. "go-common/app/job/main/aegis/model"
  8. "go-common/library/log"
  9. "github.com/jinzhu/gorm"
  10. )
  11. const (
  12. _taskReleaseSQL = "update task SET admin_id=0,state=0,uid=0,gtime=0 where state=? AND mtime<=?"
  13. _taskClearSQL = "DELETE FROM task WHERE mtime<=? AND state>=3 LIMIT ?"
  14. )
  15. // TaskActiveConfigs list config
  16. func (d *Dao) TaskActiveConfigs(c context.Context) (configs []*model.TaskConfig, err error) {
  17. db := d.orm.Model(&model.TaskConfig{}).Where("state=?", model.ConfigStateOn)
  18. if err = db.Find(&configs).Error; err != nil {
  19. log.Error("query error(%v)", err)
  20. return
  21. }
  22. return
  23. }
  24. // TaskActiveConsumer list consumer
  25. func (d *Dao) TaskActiveConsumer(c context.Context) (consumerCache map[string]map[int64]struct{}, err error) {
  26. rows, err := d.orm.Table("task_consumer").Select("business_id,flow_id,uid").Where("state=?", model.ConsumerStateOn).Rows()
  27. if err != nil {
  28. return
  29. }
  30. defer rows.Close()
  31. consumerCache = make(map[string]map[int64]struct{})
  32. for rows.Next() {
  33. var bizID, flowID, UID int64
  34. if err = rows.Scan(&bizID, &flowID, &UID); err != nil {
  35. log.Error("rows.Scan error(%v)", err)
  36. continue
  37. }
  38. key := fmt.Sprintf("%d-%d", bizID, flowID)
  39. if _, ok := consumerCache[key]; ok {
  40. consumerCache[key][UID] = struct{}{}
  41. } else {
  42. consumerCache[key] = map[int64]struct{}{UID: {}}
  43. }
  44. }
  45. return
  46. }
  47. // KickOutConsumer 踢出用户
  48. func (d *Dao) KickOutConsumer(c context.Context, bizid, flowid, uid int64) (err error) {
  49. return d.orm.Table("task_consumer").Where("business_id=? AND flow_id=? AND uid=?", bizid, flowid, uid).
  50. Update("state", model.ConsumerStateOff).Error
  51. }
  52. // Resource .
  53. func (d *Dao) Resource(c context.Context, rid int64) (res *model.Resource, err error) {
  54. res = &model.Resource{}
  55. if err = d.orm.Where("id = ?", rid).First(res).Error; err == gorm.ErrRecordNotFound {
  56. res = nil
  57. err = nil
  58. }
  59. return
  60. }
  61. //RscState resource state
  62. func (d *Dao) RscState(c context.Context, rid int64) (state int64, err error) {
  63. err = d.orm.Table("resource_result").Select("state").Where("rid=?", rid).Row().Scan(&state)
  64. return
  65. }
  66. // TaskRelease .
  67. func (d *Dao) TaskRelease(c context.Context, mtime time.Time) (err error) {
  68. return d.orm.Exec(_taskReleaseSQL, model.TaskStateDispatch, mtime).Error
  69. }
  70. // ReleaseByConsumer .
  71. func (d *Dao) ReleaseByConsumer(c context.Context, bizid, flowid, uid int64) (err error) {
  72. return d.orm.Table("task").Where("business_id=? AND flow_id=? AND uid=? AND (state=1 or (admin_id>0 AND state=0))", bizid, flowid, uid).Update(
  73. map[string]interface{}{
  74. "uid": 0,
  75. "state": 0,
  76. "gtime": 0,
  77. "admin_id": 0,
  78. }).Error
  79. }
  80. //Report .
  81. func (d *Dao) Report(c context.Context, rt *model.Report) (err error) {
  82. return d.orm.Create(rt).Error
  83. }
  84. //TaskClear 已完成任务最多保留3天
  85. func (d *Dao) TaskClear(c context.Context, mtime time.Time, limit int64) (rows int64, err error) {
  86. db := d.orm.Exec(_taskClearSQL, mtime, limit)
  87. rows, err = db.RowsAffected, db.Error
  88. return
  89. }
  90. //CheckFlow 检查资源是否在对应流程上
  91. func (d *Dao) CheckFlow(c context.Context, rid, flowid int64) (ok bool, err error) {
  92. var id int64
  93. err = d.orm.Table("net_flow_resource").Select("id").
  94. Where("rid=? AND flow_id=? AND state!=-1", rid, flowid).Row().Scan(&id)
  95. if err != nil {
  96. if err == sql.ErrNoRows {
  97. err = nil
  98. } else {
  99. log.Error("CheckFlow(%d,%d) error(%v)", rid, flowid)
  100. }
  101. return
  102. }
  103. if id > 0 {
  104. ok = true
  105. }
  106. return
  107. }
  108. // CreateTask .
  109. func (d *Dao) CreateTask(c context.Context, task *model.Task) error {
  110. return d.orm.Table("task").Where("rid=? AND flow_id=? AND state<?", task.RID, task.FlowID, model.TaskStateSubmit).
  111. Assign(map[string]interface{}{
  112. "mid": task.MID,
  113. "fans": task.Fans,
  114. "group": task.Group,
  115. }).FirstOrCreate(task).Error
  116. }