merge.go 16 KB


  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "runtime/debug"
  6. "go-common/app/tool/saga/model"
  7. "go-common/app/tool/saga/service/notification"
  8. "go-common/library/log"
  9. ggitlab "github.com/xanzy/go-gitlab"
  10. )
  11. func (c *Command) runTryMerge(ctx context.Context, event *model.HookComment, repo *model.Repo) (err error) {
  12. var (
  13. ok bool
  14. canMerge bool
  15. projID = int(event.MergeRequest.SourceProjectID)
  16. mrIID = int(event.MergeRequest.IID)
  17. wip = event.MergeRequest.WorkInProgress
  18. noteID int
  19. taskInfo = &model.TaskInfo{
  20. Event: event,
  21. Repo: repo,
  22. }
  23. )
  24. log.Info("runTryMerge start ... MRIID: %d, Repo Config: %+v", mrIID, repo.Config)
  25. if ok, err = c.dao.ExistMRIID(ctx, mrIID); err != nil || ok {
  26. return
  27. }
  28. if noteID, err = c.gitlab.CreateMRNote(projID, mrIID, "<pre>SAGA 开始执行,请大佬稍后......</pre>"); err != nil {
  29. return
  30. }
  31. taskInfo.NoteID = noteID
  32. // 1, check wip
  33. if wip {
  34. c.gitlab.UpdateMRNote(projID, mrIID, noteID, "<pre>警告:当前MR处于WIP状态,请待开发结束后再merge!</pre>")
  35. return
  36. }
  37. // 2, check labels
  38. if ok, err = c.checkLabels(projID, mrIID, noteID, repo); err != nil || !ok {
  39. return
  40. }
  41. // 3, check merge status
  42. if canMerge, err = c.checkMergeStatus(projID, mrIID, noteID); err != nil || !canMerge {
  43. return
  44. }
  45. // 4, check pipeline status
  46. if repo.Config.RelatePipeline {
  47. if repo.Config.DelayMerge {
  48. if ok, _, err = c.checkPipeline(projID, mrIID, noteID, 0, model.QueryProcessing); err != nil || !ok {
  49. return
  50. }
  51. } else {
  52. if ok, _, err = c.checkPipeline(projID, mrIID, noteID, 0, model.QuerySuccess); err != nil || !ok {
  53. return
  54. }
  55. }
  56. }
  57. // 5, check path auth
  58. if ok, err = c.checkAllPathAuth(taskInfo); err != nil || !ok {
  59. return
  60. }
  61. // 6, show current mr queue info
  62. c.showMRQueueInfo(ctx, taskInfo)
  63. if err = c.dao.PushMergeTask(ctx, model.TaskStatusWaiting, taskInfo); err != nil {
  64. return
  65. }
  66. if err = c.dao.AddMRIID(ctx, mrIID, int(repo.Config.LockTimeout)); err != nil {
  67. return
  68. }
  69. log.Info("runTryMerge merge task 已加入 waiting 任务列队中... MRIID: %d", mrIID)
  70. return
  71. }
  72. func (c *Command) execMergeTask(taskInfo *model.TaskInfo) (err error) {
  73. var (
  74. ctx = context.TODO()
  75. projID = int(taskInfo.Event.MergeRequest.SourceProjectID)
  76. mrIID = int(taskInfo.Event.MergeRequest.IID)
  77. sourceBranch = taskInfo.Event.MergeRequest.SourceBranch
  78. pipeline = &ggitlab.Pipeline{}
  79. noteID = taskInfo.NoteID
  80. mergeInfo = &model.MergeInfo{
  81. ProjID: projID,
  82. MRIID: mrIID,
  83. URL: taskInfo.Event.ObjectAttributes.URL,
  84. AuthBranches: taskInfo.Repo.Config.AuthBranches,
  85. SourceBranch: taskInfo.Event.MergeRequest.SourceBranch,
  86. TargetBranch: taskInfo.Event.MergeRequest.TargetBranch,
  87. AuthorID: int(taskInfo.Event.MergeRequest.AuthorID),
  88. UserName: taskInfo.Event.User.UserName,
  89. MinReviewer: taskInfo.Repo.Config.MinReviewer,
  90. LockTimeout: taskInfo.Repo.Config.LockTimeout,
  91. Title: taskInfo.Event.MergeRequest.Title,
  92. Description: taskInfo.Event.MergeRequest.Description,
  93. }
  94. )
  95. mergeInfo.NoteID = noteID
  96. // 从等待任务列队移除
  97. if err = c.dao.DeleteMergeTask(ctx, model.TaskStatusWaiting, taskInfo); err != nil {
  98. return
  99. }
  100. // 加入到正在执行任务列队
  101. if err = c.dao.PushMergeTask(ctx, model.TaskStatusRunning, taskInfo); err != nil {
  102. return
  103. }
  104. if taskInfo.Repo.Config.RelatePipeline {
  105. if taskInfo.Repo.Config.DelayMerge {
  106. if err = c.HookDelayMerge(projID, sourceBranch, mergeInfo); err != nil {
  107. return
  108. }
  109. return
  110. }
  111. if err = c.gitlab.UpdateMRNote(projID, mrIID, noteID, "<pre>SAGA 提示:为了保证合进主干后能正常编译,正在重跑pipeline,等待时间取决于pipeline运行时间!请耐心等待!</pre>"); err != nil {
  112. return
  113. }
  114. if pipeline, err = c.retryPipeline(taskInfo.Event); err != nil {
  115. return
  116. }
  117. mergeInfo.PipelineID = pipeline.ID
  118. if err = c.dao.SetMergeInfo(ctx, projID, sourceBranch, mergeInfo); err != nil {
  119. return
  120. }
  121. } else {
  122. if err = c.HookMerge(projID, sourceBranch, mergeInfo); err != nil {
  123. return
  124. }
  125. }
  126. return
  127. }
  128. func (c *Command) retryPipeline(event *model.HookComment) (pipeline *ggitlab.Pipeline, err error) {
  129. var (
  130. trigger *ggitlab.PipelineTrigger
  131. triggers []*ggitlab.PipelineTrigger
  132. projID = int(event.MergeRequest.SourceProjectID)
  133. sourceBranch = event.MergeRequest.SourceBranch
  134. )
  135. if triggers, err = c.gitlab.Triggers(projID); err != nil {
  136. return
  137. }
  138. if len(triggers) == 0 {
  139. log.Info("No triggers were found for project %d, try to create it now.", projID)
  140. if trigger, err = c.gitlab.CreateTrigger(projID); err != nil {
  141. return
  142. }
  143. triggers = []*ggitlab.PipelineTrigger{trigger}
  144. }
  145. trigger = triggers[0]
  146. if trigger.Owner == nil || trigger.Owner.ID == 0 {
  147. log.Info("Legacy trigger (without owner), take ownership now.")
  148. if trigger, err = c.gitlab.TakeOwnership(projID, trigger.ID); err != nil {
  149. return
  150. }
  151. }
  152. if pipeline, err = c.gitlab.TriggerPipeline(projID, sourceBranch, trigger.Token); err != nil {
  153. return
  154. }
  155. return
  156. }
  157. // HookPipeline ...
  158. func (c *Command) HookPipeline(projID int, branch string, pipelineID int) (err error) {
  159. var (
  160. ok bool
  161. canMerge bool
  162. mergeInfo *model.MergeInfo
  163. )
  164. defer func() {
  165. if x := recover(); x != nil {
  166. log.Error("HookPipeline: %+v %s", x, debug.Stack())
  167. }
  168. }()
  169. if ok, mergeInfo, err = c.dao.MergeInfo(context.TODO(), projID, branch); err != nil || !ok {
  170. return
  171. }
  172. log.Info("HookPipeline projID: %d, MRIID: %d, branch: %s, pipelineId: %d", projID, mergeInfo.MRIID, branch, mergeInfo.PipelineID)
  173. if pipelineID < mergeInfo.PipelineID {
  174. return
  175. }
  176. defer func() {
  177. if err = c.resetMergeStatus(projID, mergeInfo.MRIID, branch, true); err != nil {
  178. log.Error("resetMergeStatus MRIID: %d, error: %+v", mergeInfo.MRIID, err)
  179. }
  180. }()
  181. // 1, check pipeline id
  182. if ok, _, err = c.checkPipeline(projID, mergeInfo.MRIID, mergeInfo.NoteID, mergeInfo.PipelineID, model.QueryID); err != nil || !ok {
  183. return
  184. }
  185. // 2, check pipeline status
  186. if ok, _, err = c.checkPipeline(projID, mergeInfo.MRIID, mergeInfo.NoteID, 0, model.QuerySuccess); err != nil || !ok {
  187. return
  188. }
  189. // 3, check merge status
  190. if canMerge, err = c.checkMergeStatus(projID, mergeInfo.MRIID, mergeInfo.NoteID); err != nil || !canMerge {
  191. return
  192. }
  193. log.Info("HookPipeline acceptMerge ... MRIID: %d", mergeInfo.MRIID)
  194. if ok, err = c.acceptMerge(mergeInfo); err != nil || !ok {
  195. return
  196. }
  197. return
  198. }
  199. // HookMerge ...
  200. func (c *Command) HookMerge(projID int, branch string, mergeInfo *model.MergeInfo) (err error) {
  201. var (
  202. ok bool
  203. canMerge bool
  204. )
  205. defer func() {
  206. if x := recover(); x != nil {
  207. log.Error("HookMerge: %+v %s", x, debug.Stack())
  208. }
  209. }()
  210. defer func() {
  211. if err = c.resetMergeStatus(projID, mergeInfo.MRIID, branch, true); err != nil {
  212. log.Error("resetMergeStatus MRIID: %d, error: %+v", mergeInfo.MRIID, err)
  213. }
  214. }()
  215. log.Info("HookMerge projID: %d, MRIID: %d, branch: %s", projID, mergeInfo.MRIID, branch)
  216. if canMerge, err = c.checkMergeStatus(projID, mergeInfo.MRIID, mergeInfo.NoteID); err != nil || !canMerge {
  217. return
  218. }
  219. log.Info("HookMerge acceptMerge ... MRIID: %d", mergeInfo.MRIID)
  220. if ok, err = c.acceptMerge(mergeInfo); err != nil || !ok {
  221. return
  222. }
  223. return
  224. }
  225. // HookDelayMerge ...
  226. func (c *Command) HookDelayMerge(projID int, branch string, mergeInfo *model.MergeInfo) (err error) {
  227. var (
  228. ctx = context.TODO()
  229. ok bool
  230. noteID = mergeInfo.NoteID
  231. mrIID = mergeInfo.MRIID
  232. pipelineID int
  233. status string
  234. )
  235. defer func() {
  236. if x := recover(); x != nil {
  237. log.Error("HookDelayMerge: %+v %s", x, debug.Stack())
  238. }
  239. }()
  240. //if ok, pipelineID, err = c.checkPipeline(projID, mrIID, noteID, 0, model.QuerySuccessRmNote); err != nil {
  241. //return
  242. //}
  243. if pipelineID, status, err = c.gitlab.MRPipelineStatus(projID, mrIID); err != nil {
  244. return
  245. }
  246. if status == model.PipelineSuccess || status == model.PipelineSkipped {
  247. ok = true
  248. } else if status != model.PipelineRunning && status != model.PipelinePending {
  249. comment := fmt.Sprintf("<pre>警告:pipeline状态异常,请确保pipeline状态正常后再执行merge操作!</pre>")
  250. err = c.gitlab.UpdateMRNote(projID, mrIID, noteID, comment)
  251. return
  252. }
  253. log.Info("HookDelayMerge projID: %d, MRIID: %d, branch: %s, pipeline status: %t", projID, mergeInfo.MRIID, branch, ok)
  254. if ok {
  255. if err = c.HookMerge(projID, branch, mergeInfo); err != nil {
  256. return
  257. }
  258. } else {
  259. mergeInfo.PipelineID = pipelineID
  260. if err = c.dao.SetMergeInfo(ctx, projID, branch, mergeInfo); err != nil {
  261. return
  262. }
  263. }
  264. return
  265. }
  266. func (c *Command) acceptMerge(mergeInfo *model.MergeInfo) (ok bool, err error) {
  267. var (
  268. comment string
  269. author string
  270. canMerge bool
  271. state string
  272. authorID = mergeInfo.AuthorID
  273. username = mergeInfo.UserName
  274. projID = mergeInfo.ProjID
  275. mrIID = mergeInfo.MRIID
  276. url = mergeInfo.URL
  277. sourceBranch = mergeInfo.SourceBranch
  278. targetBranch = mergeInfo.TargetBranch
  279. noteID = mergeInfo.NoteID
  280. content = mergeInfo.Title
  281. )
  282. if author, err = c.gitlab.UserName(authorID); err != nil {
  283. return
  284. }
  285. if canMerge, err = c.checkMergeStatus(projID, mrIID, noteID); err != nil {
  286. return
  287. }
  288. if !canMerge {
  289. go notification.WechatAuthor(c.dao, author, url, sourceBranch, targetBranch, comment)
  290. return
  291. }
  292. if len(mergeInfo.Description) > 0 {
  293. content = content + "\n\n" + mergeInfo.Description
  294. }
  295. mergeMSG := fmt.Sprintf("Merge branch [%s] into [%s] by [%s]\n%s", sourceBranch, targetBranch, username, content)
  296. if state, err = c.gitlab.AcceptMR(projID, mrIID, mergeMSG); err != nil || state != model.MRStateMerged {
  297. if err != nil {
  298. comment = fmt.Sprintf("<pre>[%s]尝试合并失败,当前状态不允许合并,请查看上方merge按钮旁的提示!</pre>", username)
  299. } else {
  300. comment = fmt.Sprintf("<pre>[%s]尝试合并失败,请检查当前状态或同步目标分支代码后再试!</pre>", username)
  301. }
  302. go notification.WechatAuthor(c.dao, author, url, sourceBranch, targetBranch, comment)
  303. c.gitlab.UpdateMRNote(projID, mrIID, noteID, comment)
  304. return
  305. }
  306. ok = true
  307. comment = fmt.Sprintf("<pre>[%s]尝试合并成功!</pre>", username)
  308. go notification.WechatAuthor(c.dao, author, url, sourceBranch, targetBranch, comment)
  309. c.gitlab.UpdateMRNote(projID, mrIID, noteID, comment)
  310. return
  311. }
  312. func (c *Command) resetMergeStatus(projID int, MRIID int, branch string, taskRunning bool) (err error) {
  313. var (
  314. ctx = context.TODO()
  315. )
  316. log.Info("resetMergeStatus projID: %d, MRIID: %d start", projID, MRIID)
  317. if err = c.dao.UnLock(ctx, fmt.Sprintf(model.SagaRepoLockKey, projID)); err != nil {
  318. log.Error("UnLock error: %+v", err)
  319. }
  320. if err = c.dao.DeleteMergeInfo(ctx, projID, branch); err != nil {
  321. log.Error("DeleteMergeInfo error: %+v", err)
  322. }
  323. if err = c.dao.DeleteMRIID(ctx, MRIID); err != nil {
  324. log.Error("Delete MRIID :%d, error: %+v", MRIID, err)
  325. }
  326. if taskRunning {
  327. if err = c.DeleteRunningTask(projID, MRIID); err != nil {
  328. log.Error("DeleteRunningTask: %+v", err)
  329. }
  330. }
  331. log.Info("resetMergeStatus projID: %d, MRIID: %d end!", projID, MRIID)
  332. return
  333. }
  334. // DeleteRunningTask ...
  335. func (c *Command) DeleteRunningTask(projID int, mrID int) (err error) {
  336. var (
  337. ctx = context.TODO()
  338. taskInfos []*model.TaskInfo
  339. )
  340. if _, taskInfos, err = c.dao.MergeTasks(ctx, model.TaskStatusRunning); err != nil {
  341. return
  342. }
  343. for _, taskInfo := range taskInfos {
  344. pID := int(taskInfo.Event.MergeRequest.SourceProjectID)
  345. mID := int(taskInfo.Event.MergeRequest.IID)
  346. if pID == projID && mID == mrID {
  347. // 从正在运行的任务列队中移除
  348. err = c.dao.DeleteMergeTask(ctx, model.TaskStatusRunning, taskInfo)
  349. return
  350. }
  351. }
  352. return
  353. }
  354. func (c *Command) checkMergeStatus(projID int, mrIID int, noteID int) (canMerge bool, err error) {
  355. var (
  356. wip bool
  357. state string
  358. status string
  359. comment string
  360. )
  361. if wip, state, status, err = c.gitlab.MergeStatus(projID, mrIID); err != nil {
  362. return
  363. }
  364. if wip {
  365. comment = "<pre>SAGA 尝试合并失败,当前MR是一项正在进行的工作!若已完成请先点击“Resolve WIP status”按钮处理后再+merge!</pre>"
  366. } else if state != model.MergeStateOpened {
  367. comment = "<pre>SAGA 尝试合并失败,当前MR已经关闭或者已经合并!</pre>"
  368. } else if status != model.MergeStatusOk {
  369. comment = "<pre>SAGA 尝试合并失败,请先解决合并冲突!</pre>"
  370. } else {
  371. canMerge = true
  372. }
  373. if len(comment) > 0 {
  374. c.gitlab.UpdateMRNote(projID, mrIID, noteID, comment)
  375. }
  376. return
  377. }
  378. // checkLabels ios or android need checkout label when release app stage
  379. func (c *Command) checkLabels(projID int, mrIID int, noteID int, repo *model.Repo) (ok bool, err error) {
  380. var (
  381. labels []string
  382. comment = fmt.Sprintf("<pre>警告:SAGA 无法执行+merge,发版阶段只允许合入指定label的MR!</pre>")
  383. )
  384. if len(repo.Config.AllowLabel) <= 0 {
  385. ok = true
  386. return
  387. }
  388. if labels, err = c.gitlab.MergeLabels(projID, mrIID); err != nil {
  389. return
  390. }
  391. log.Info("checkMrLabels MRIID: %d, labels: %+v", mrIID, labels)
  392. for _, label := range labels {
  393. if label == repo.Config.AllowLabel {
  394. ok = true
  395. return
  396. }
  397. }
  398. if err = c.gitlab.UpdateMRNote(projID, mrIID, noteID, comment); err != nil {
  399. return
  400. }
  401. return
  402. }
  403. func (c *Command) checkPipeline(projID int, mrIID int, noteID int, lastPipelineID int, queryStatus model.QueryStatus) (ok bool, pipelineID int, err error) {
  404. var status string
  405. if pipelineID, status, err = c.gitlab.MRPipelineStatus(projID, mrIID); err != nil {
  406. return
  407. }
  408. log.Info("checkPipeline MRIID: %d, queryStatus: %d, pipeline status: %s", mrIID, queryStatus, status)
  409. // query pipeline id index
  410. if queryStatus == model.QueryID {
  411. if pipelineID > lastPipelineID {
  412. comment := fmt.Sprintf("<pre>警告:SAGA 检测到重新提交代码了,+merge中断!请重新review代码!</pre>")
  413. err = c.gitlab.UpdateMRNote(projID, mrIID, noteID, comment)
  414. return
  415. }
  416. ok = true
  417. return
  418. }
  419. // query process status
  420. if queryStatus == model.QueryProcessing {
  421. if status == model.PipelineRunning || status == model.PipelinePending {
  422. comment := fmt.Sprintf("<pre>警告:pipeline正在运行中,暂不能立即merge,待pipeline运行通过后会自动执行merge操作!</pre>")
  423. err = c.gitlab.UpdateMRNote(projID, mrIID, noteID, comment)
  424. ok = true
  425. return
  426. } else if status == model.PipelineSuccess || status == model.PipelineSkipped {
  427. ok = true
  428. return
  429. }
  430. comment := fmt.Sprintf("<pre>警告:pipeline状态异常,请确保pipeline状态正常后再执行merge操作!</pre>")
  431. err = c.gitlab.UpdateMRNote(projID, mrIID, noteID, comment)
  432. return
  433. }
  434. // query success status
  435. if queryStatus == model.QuerySuccess {
  436. if status != model.PipelineSuccess {
  437. comment := fmt.Sprintf("<pre>警告:SAGA 无法执行+merge,pipeline还未成功,请大佬先让pipeline执行通过!</pre>")
  438. err = c.gitlab.UpdateMRNote(projID, mrIID, noteID, comment)
  439. ok = false
  440. return
  441. }
  442. }
  443. ok = true
  444. return
  445. }
  446. // showMRQueueInfo ...
  447. func (c *Command) showMRQueueInfo(ctx context.Context, taskInfo *model.TaskInfo) (err error) {
  448. var (
  449. mrIID = int(taskInfo.Event.MergeRequest.IID)
  450. projID = int(taskInfo.Event.MergeRequest.SourceProjectID)
  451. noteID = taskInfo.NoteID
  452. taskInfos []*model.TaskInfo
  453. comment string
  454. waitNum int
  455. runningNum int
  456. )
  457. if _, taskInfos, err = c.dao.MergeTasks(ctx, model.TaskStatusWaiting); err != nil {
  458. return
  459. }
  460. for _, waitTaskInfo := range taskInfos {
  461. if waitTaskInfo.Event.ProjectID == taskInfo.Event.ProjectID {
  462. waitNum++
  463. }
  464. }
  465. if _, taskInfos, err = c.dao.MergeTasks(ctx, model.TaskStatusRunning); err != nil {
  466. return
  467. }
  468. for _, runningTaskInfo := range taskInfos {
  469. if runningTaskInfo.Event.ProjectID == taskInfo.Event.ProjectID {
  470. runningNum++
  471. }
  472. }
  473. if waitNum > 0 {
  474. comment = fmt.Sprintf("<pre>SAGA 提示:当前还有 [%d] 个 MR 等待合并,请大佬耐心等待!</pre>", waitNum)
  475. c.gitlab.UpdateMRNote(projID, mrIID, noteID, comment)
  476. } else if runningNum > 0 {
  477. comment = fmt.Sprintf("<pre>SAGA 提示:当前还有 [%d] 个 MR 正在执行,请大佬耐心等待!</pre>", runningNum)
  478. c.gitlab.UpdateMRNote(projID, mrIID, noteID, comment)
  479. }
  480. return
  481. }