task_status.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "go-common/app/job/main/growup/dao"
  7. "go-common/app/job/main/growup/dao/dataplatform"
  8. )
  9. const (
  10. // TaskAvCharge .
  11. TaskAvCharge = iota + 1
  12. // TaskCmCharge .
  13. TaskCmCharge
  14. // TaskTagRatio .
  15. TaskTagRatio
  16. // TaskBubbleMeta .
  17. TaskBubbleMeta
  18. // TaskBlacklist .
  19. TaskBlacklist
  20. // TaskCreativeIncome .
  21. TaskCreativeIncome
  22. // TaskCreativeStatis .
  23. TaskCreativeStatis
  24. // TaskBgmSync .
  25. TaskBgmSync
  26. // TaskTagIncome .
  27. TaskTagIncome
  28. // TaskCreativeCharge .
  29. TaskCreativeCharge
  30. // TaskBudget .
  31. TaskBudget
  32. // TaskSnapshotBubbleIncome .
  33. TaskSnapshotBubbleIncome
  34. )
  35. const (
  36. _taskSuccess = 1
  37. _taskFail = 2
  38. )
  39. var taskSvr *taskService
  40. type taskService struct {
  41. dao *dao.Dao
  42. dp *dataplatform.Dao
  43. }
  44. // GetTaskService get task service
  45. func GetTaskService() *taskService {
  46. return taskSvr
  47. }
  48. // TaskReady is task ready
  49. func (s *taskService) TaskReady(c context.Context, date string, typs ...int) (err error) {
  50. t, err := time.ParseInLocation("2006-01-02", date, time.Local)
  51. if err != nil {
  52. return
  53. }
  54. ok, err := s.checkBasicStatus(c, t)
  55. if err != nil {
  56. return
  57. }
  58. if !ok {
  59. err = fmt.Errorf("basic task not ready yet: %s", date)
  60. return
  61. }
  62. for _, typ := range typs {
  63. var status int
  64. status, err = s.dao.TaskStatus(c, date, typ)
  65. if err != nil {
  66. return
  67. }
  68. if status != 1 {
  69. err = fmt.Errorf("task(%d) not ready yet: %s", typ, date)
  70. return
  71. }
  72. }
  73. return
  74. }
  75. func (s *taskService) setTaskSuccess(c context.Context, typ int, date, message string) (rows int64, err error) {
  76. return s.dao.InsertTaskStatus(c, typ, _taskSuccess, date, message)
  77. }
  78. func (s *taskService) setTaskFail(c context.Context, typ int, date, message string) (rows int64, err error) {
  79. return s.dao.InsertTaskStatus(c, typ, _taskFail, date, message)
  80. }
  81. // SetTaskStatus set task status by error
  82. func (s *taskService) SetTaskStatus(c context.Context, typ int, date string, err error) (int64, error) {
  83. if err != nil {
  84. return s.setTaskFail(c, typ, date, err.Error())
  85. }
  86. return s.setTaskSuccess(c, typ, date, "success")
  87. }
  88. // checkBasicStatus check basic date status
  89. func (s *taskService) checkBasicStatus(c context.Context, date time.Time) (ok bool, err error) {
  90. return s.dp.SendBasicDataRequest(c, fmt.Sprintf("{\"select\": [], \"where\":{\"job_name\":{\"in\":[\"ucs_%s\"]}}}", date.Format("20060102")))
  91. }
  92. // UpdateTaskStatus update task status
  93. func (s *Service) UpdateTaskStatus(c context.Context, date string, typ int, status int) (err error) {
  94. _, err = s.dao.UpdateTaskStatus(c, date, typ, status)
  95. return
  96. }