command.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "runtime/debug"
  6. "time"
  7. "go-common/app/tool/saga/conf"
  8. "go-common/app/tool/saga/dao"
  9. "go-common/app/tool/saga/model"
  10. "go-common/app/tool/saga/service/gitlab"
  11. "go-common/library/log"
  12. )
// Command dispatches SAGA commands to their registered handlers and runs the
// merge-task listener.
type Command struct {
	// dao is the data-access handle; in this file it is used to fetch waiting
	// merge tasks and to take the per-repository lock.
	dao *dao.Dao
	// gitlab is the GitLab handle; in this file it is used to create and
	// update merge-request notes.
	gitlab *gitlab.Gitlab
	// cmds maps a command string to the handler that Exec invokes for it.
	cmds map[string]cmdFunc
}
// cmdFunc is the signature of a command handler stored in Command.cmds. Exec
// calls it with the context, hook comment event and repo it received, and
// treats a non-nil err as a failed command.
type cmdFunc func(ctx context.Context, event *model.HookComment, repo *model.Repo) (err error)
  20. // New ...
  21. func New(dao *dao.Dao, gitlab *gitlab.Gitlab) (c *Command) {
  22. c = &Command{
  23. dao: dao,
  24. gitlab: gitlab,
  25. cmds: make(map[string]cmdFunc),
  26. }
  27. return
  28. }
  29. // Exec ...
  30. func (c *Command) Exec(ctx context.Context, cmd string, event *model.HookComment, repo *model.Repo) (err error) {
  31. var (
  32. f cmdFunc
  33. ok bool
  34. projID = int(event.MergeRequest.SourceProjectID)
  35. mrIID = int(event.MergeRequest.IID)
  36. )
  37. if f, ok = c.cmds[cmd]; !ok {
  38. return
  39. }
  40. if err = f(ctx, event, repo); err != nil {
  41. c.gitlab.CreateMRNote(projID, mrIID, fmt.Sprintf("<pre>SAGA 异常:%+v %s</pre>", err, debug.Stack()))
  42. return
  43. }
  44. return
  45. }
// register stores f as the handler for cmd, replacing any handler previously
// stored under the same key.
func (c *Command) register(cmd string, f cmdFunc) {
	c.cmds[cmd] = f
}
  49. // Registers ...
  50. func (c *Command) Registers() {
  51. c.register(model.SagaCommandPlusOne, c.runPlusOne)
  52. c.register(model.SagaCommandMerge, c.runTryMerge)
  53. c.register(model.SagaCommandMerge1, c.runTryMerge)
  54. c.register(model.SagaCommandPlusOne1, c.runPlusOne)
  55. }
  56. // ListenTask ...
  57. func (c *Command) ListenTask() {
  58. var (
  59. ctx = context.TODO()
  60. err error
  61. t *time.Timer
  62. ok bool
  63. taskInfos []*model.TaskInfo
  64. )
  65. defer func() {
  66. if x := recover(); x != nil {
  67. log.Error("ListenTask: %+v %s", x, debug.Stack())
  68. }
  69. }()
  70. t = time.NewTimer(time.Duration(conf.Conf.Property.TaskInterval))
  71. for range t.C {
  72. if _, taskInfos, err = c.dao.MergeTasks(ctx, model.TaskStatusWaiting); err != nil {
  73. log.Error("request MergeTasks: %+v", err)
  74. continue
  75. }
  76. for _, taskInfo := range taskInfos {
  77. if ok, err = c.dao.TryLock(ctx, fmt.Sprintf(model.SagaRepoLockKey, int(taskInfo.Event.ProjectID)), model.SagaLockValue, int(taskInfo.Repo.Config.LockTimeout)); err != nil {
  78. log.Error("TryLock ProjectID: %d, MRIID: %d, err: %+v", taskInfo.Event.ProjectID, taskInfo.Event.MergeRequest.IID, err)
  79. continue
  80. }
  81. if !ok {
  82. log.Info("TryLock ok: %t, ProjectID: %d, MRIID: %d", ok, taskInfo.Event.ProjectID, taskInfo.Event.MergeRequest.IID)
  83. continue
  84. }
  85. log.Info("request MRIID: %d, MergeTasks:%+v", taskInfo.Event.MergeRequest.IID, taskInfo)
  86. go func(taskInfo *model.TaskInfo) {
  87. var (
  88. projID = int(taskInfo.Event.MergeRequest.SourceProjectID)
  89. mrIID = int(taskInfo.Event.MergeRequest.IID)
  90. branch = taskInfo.Event.MergeRequest.SourceBranch
  91. )
  92. if err = c.execMergeTask(taskInfo); err != nil {
  93. c.gitlab.UpdateMRNote(projID, mrIID, taskInfo.NoteID, fmt.Sprintf("<pre>SAGA 异常:%+v</pre>", err))
  94. if err = c.resetMergeStatus(projID, mrIID, branch, false); err != nil {
  95. log.Error("resetMergeStatus error: %+v", err)
  96. }
  97. log.Error("execMergeTask: %+v", err)
  98. }
  99. }(taskInfo)
  100. }
  101. t.Reset(time.Duration(conf.Conf.Property.TaskInterval))
  102. }
  103. }