task.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package service
  2. import (
  3. "context"
  4. "runtime/debug"
  5. "go-common/app/job/main/ugcpay/dao"
  6. "go-common/app/job/main/ugcpay/model"
  7. xsql "go-common/library/database/sql"
  8. "go-common/library/log"
  9. "github.com/pkg/errors"
  10. )
  11. func (s *Service) wrapDisProc(tp TaskProcess) func() {
  12. return func() {
  13. defer func() {
  14. if x := recover(); x != nil {
  15. log.Error("task : %s, panic(%+v): %s", tp.Name(), x, debug.Stack())
  16. }
  17. }()
  18. var (
  19. ok bool
  20. err error
  21. )
  22. if ok, err = s.taskCreate(tp.Name(), tp.TTL()); err != nil {
  23. log.Info("s.taskCreate err: %+v", err)
  24. return
  25. }
  26. if !ok {
  27. log.Info("task : %s end, other task is running", tp.Name())
  28. return
  29. }
  30. defer func() {
  31. if err = s.taskDone(tp.Name()); err != nil {
  32. log.Error("task : %s, taskDone error: %+v", tp.Name(), err)
  33. }
  34. }()
  35. log.Info("task : %s, task start", tp.Name())
  36. if err = tp.Run(); err != nil {
  37. log.Error("task : %s end, error: %+v", tp.Name(), err)
  38. }
  39. }
  40. }
  41. // TaskProcess .
  42. type TaskProcess interface {
  43. Run() error // 运行任务
  44. TTL() int32 // 任务的最长生命周期
  45. Name() string // 任务名称
  46. }
  47. func (s *Service) taskCreate(task string, ttl int32) (ok bool, err error) {
  48. log.Info("task create: %s, ttl: %d", task, ttl)
  49. return s.dao.AddCacheTask(context.Background(), task, ttl)
  50. }
  51. func (s *Service) taskDone(task string) (err error) {
  52. // return s.dao.DelCacheTask(context.Background(), task)
  53. return
  54. }
  55. func checkOrCreateTaskFromLog(ctx context.Context, task TaskProcess, tl *taskLog, expectFN func(context.Context) (int64, error)) (finished bool, err error) {
  56. var (
  57. taskCreated bool
  58. expect int64
  59. )
  60. if taskCreated, finished = tl.checkTask(task); finished {
  61. log.Info("%s already finished", task.Name())
  62. return
  63. }
  64. if !taskCreated {
  65. if expect, err = expectFN(ctx); err != nil {
  66. return
  67. }
  68. if _, err = tl.createTask(ctx, task, expect); err != nil {
  69. return
  70. }
  71. }
  72. return
  73. }
  74. func runTXCASTaskWithLog(ctx context.Context, task TaskProcess, tl *taskLog, biz func(context.Context, *xsql.Tx) (bool, error)) (err error) {
  75. fn := func(ctx context.Context) (affected bool, err error) {
  76. affected = true
  77. tx, err := tl.d.BeginTran(ctx)
  78. if err != nil {
  79. return
  80. }
  81. if affected, err = biz(ctx, tx); err != nil {
  82. // 业务报错,不主动rollback
  83. return
  84. }
  85. if err = tl.recordTaskSuccess(ctx, tx, task); err != nil {
  86. tx.Rollback()
  87. return
  88. }
  89. err = tx.Commit()
  90. return
  91. }
  92. if err = runCAS(ctx, fn); err != nil {
  93. tl.recordTaskFailure(ctx, task)
  94. }
  95. return
  96. }
  97. type taskLog struct {
  98. d *dao.Dao
  99. }
  100. func (t *taskLog) createTask(ctx context.Context, task TaskProcess, expect int64) (logTask *model.LogTask, err error) {
  101. logTask = &model.LogTask{
  102. Name: task.Name(),
  103. Expect: expect,
  104. State: "created",
  105. }
  106. logTask.ID, err = t.d.InsertLogTask(ctx, logTask)
  107. return
  108. }
  109. func (t *taskLog) recordTaskSuccess(ctx context.Context, tx *xsql.Tx, task TaskProcess) (err error) {
  110. _, err = t.d.TXIncrLogTaskSuccess(ctx, tx, task.Name())
  111. if err != nil {
  112. err = errors.Wrapf(err, "taskLog recordTaskSuccess: %s", task.Name())
  113. }
  114. return
  115. }
  116. func (t *taskLog) recordTaskFailure(ctx context.Context, task TaskProcess) {
  117. _, err := t.d.IncrLogTaskFailure(ctx, task.Name())
  118. if err != nil {
  119. err = errors.Wrapf(err, "taskLog recordTaskFailure: %s", task.Name())
  120. log.Error("%+v", err)
  121. }
  122. }
  123. func (t *taskLog) checkTask(task TaskProcess) (created, finished bool) {
  124. data, err := t.d.LogTask(ctx, task.Name())
  125. if err != nil {
  126. return
  127. }
  128. if data == nil {
  129. return
  130. }
  131. log.Info("checkTask: %s, data: %+v", task.Name(), data)
  132. created = true
  133. if data.State == "success" {
  134. finished = true
  135. return
  136. }
  137. if data.Expect == data.Success {
  138. finished = true
  139. }
  140. return
  141. }