sync.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "go-common/app/admin/ep/saga/conf"
  7. "go-common/app/admin/ep/saga/model"
  8. "go-common/library/log"
  9. )
  10. const _baseBranch = "master"
  11. // syncalldataproc ...
  12. func (s *Service) syncweekdataproc() {
  13. //the blow object will be sync or update all time data
  14. go s.SyncMember()
  15. go s.SyncRunners()
  16. go s.SyncContacts(context.TODO())
  17. }
  18. // syncalldataproc ...
  19. func (s *Service) syncalldataproc() {
  20. if err := s.SyncBranch(); err != nil {
  21. return
  22. }
  23. // AggregateBranch base on branch, if SyncBranch err, so return
  24. s.AggregateBranch()
  25. }
  26. // syncdataproc ...
  27. func (s *Service) syncdataproc() {
  28. var (
  29. wg sync.WaitGroup
  30. done = func(f func() error) {
  31. defer wg.Done()
  32. f()
  33. }
  34. )
  35. wg.Add(2)
  36. go done(s.SyncIssues)
  37. go done(s.SyncPipelines)
  38. wg.Wait()
  39. wg.Add(1)
  40. go done(s.SyncCommit)
  41. wg.Wait()
  42. wg.Add(2)
  43. go done(s.SyncJobs)
  44. go done(s.SyncMR)
  45. wg.Wait()
  46. wg.Add(3)
  47. go done(s.SyncMRNote)
  48. go done(s.SyncMRAwardEmoji)
  49. go done(s.SyncMRDiscussion)
  50. wg.Wait()
  51. s.AggregateMR()
  52. }
  53. // SyncCommit ...
  54. func (s *Service) SyncCommit() (err error) {
  55. var (
  56. projectIDs = conf.Conf.Property.DefaultProject.ProjectIDs
  57. allPage int
  58. allNum int
  59. result *model.SyncResult
  60. )
  61. log.Info("===================== SyncCommit start ========================")
  62. for _, projectID := range projectIDs {
  63. if result, err = s.SyncProjectCommit(projectID); err != nil {
  64. log.Error("SyncCommit projectID(%d), err(%+v)", projectID, err)
  65. go s.WechatFailData(model.DataTypeCommit, projectID, result, err)
  66. return
  67. }
  68. log.Info(">>>>>>>>> SyncCommit projectID(%d) complete Page(%d), Num(%d)", projectID, result.TotalPage, result.TotalNum)
  69. if result != nil && len(result.FailData) > 0 {
  70. go s.WechatFailData(model.DataTypeCommit, projectID, result, nil)
  71. }
  72. allPage = allPage + result.TotalPage
  73. allNum = allNum + result.TotalNum
  74. }
  75. log.Info("===================== SyncCommit finished totalPage(%d), totalNum(%d)========================", allPage, allNum)
  76. return
  77. }
  78. // SyncMR ...
  79. func (s *Service) SyncMR() (err error) {
  80. var (
  81. projectIDs = conf.Conf.Property.DefaultProject.ProjectIDs
  82. allPage int
  83. allNum int
  84. result *model.SyncResult
  85. )
  86. log.Info("===================== SyncMR start ========================")
  87. for _, projectID := range projectIDs {
  88. if result, err = s.SyncProjectMR(context.TODO(), projectID); err != nil {
  89. log.Error("SyncMR projectID(%d), err(%+v)", projectID, err)
  90. go s.WechatFailData(model.DataTypeMR, projectID, result, err)
  91. return
  92. }
  93. log.Info(">>>>>>>>> SyncMR projectID(%d) complete Page(%d), Num(%d)", projectID, result.TotalPage, result.TotalNum)
  94. if result != nil && len(result.FailData) > 0 {
  95. go s.WechatFailData(model.DataTypeMR, projectID, result, nil)
  96. }
  97. allPage = allPage + result.TotalPage
  98. allNum = allNum + result.TotalNum
  99. }
  100. log.Info("===================== SyncMR finished totalPage(%d), totalNum(%d)========================", allPage, allNum)
  101. return
  102. }
  103. // AggregateMR ...
  104. func (s *Service) AggregateMR() (err error) {
  105. var (
  106. projectIDs = conf.Conf.Property.DefaultProject.ProjectIDs
  107. )
  108. log.Info("===================== AggMR start ========================")
  109. for _, projectID := range projectIDs {
  110. if err = s.AggregateProjectMR(context.TODO(), projectID); err != nil {
  111. log.Error("AggMR projectID(%d), err(%+v)", projectID, err)
  112. return
  113. }
  114. log.Info(">>>>>>>>> AggMR projectID(%d) complete", projectID)
  115. }
  116. log.Info("===================== AggMR finished ========================")
  117. return
  118. }
  119. // SyncJobs ...
  120. func (s *Service) SyncJobs() (err error) {
  121. var (
  122. projectIDs = conf.Conf.Property.DefaultProject.ProjectIDs
  123. allPage int
  124. allNum int
  125. result *model.SyncResult
  126. )
  127. log.Info("===================== SyncJobs start ========================")
  128. for _, projectID := range projectIDs {
  129. if result, err = s.SyncProjectJobs(projectID); err != nil {
  130. log.Error("SyncJobs projectID(%d), err(%+v)", projectID, err)
  131. go s.WechatFailData(model.DataTypeJob, projectID, result, err)
  132. return
  133. }
  134. log.Info(">>>>>>>>> SyncJobs projectID(%d) complete Page(%d), Num(%d)", projectID, result.TotalPage, result.TotalNum)
  135. if result != nil && len(result.FailData) > 0 {
  136. go s.WechatFailData(model.DataTypeJob, projectID, result, nil)
  137. }
  138. allPage = allPage + result.TotalPage
  139. allNum = allNum + result.TotalNum
  140. }
  141. log.Info("===================== SyncJobs finished totalPage(%d), totalNum(%d)========================", allPage, allNum)
  142. return
  143. }
  144. // SyncPipelines ...
  145. func (s *Service) SyncPipelines() (err error) {
  146. var (
  147. projectIDs = conf.Conf.Property.DefaultProject.ProjectIDs
  148. allPage int
  149. allNum int
  150. result *model.SyncResult
  151. )
  152. log.Info("===================== SyncPipelines start ========================")
  153. for _, projectID := range projectIDs {
  154. if result, err = s.SyncProjectPipelines(projectID); err != nil {
  155. log.Error("SyncPipelines projectID(%d), err(%+v)", projectID, err)
  156. go s.WechatFailData(model.DataTypePipeline, projectID, result, err)
  157. return
  158. }
  159. log.Info(">>>>>>>>> SyncPipelines projectID(%d) complete Page(%d), Num(%d)", projectID, result.TotalPage, result.TotalNum)
  160. if result != nil && len(result.FailData) > 0 {
  161. go s.WechatFailData(model.DataTypePipeline, projectID, result, nil)
  162. }
  163. allPage = allPage + result.TotalPage
  164. allNum = allNum + result.TotalNum
  165. }
  166. log.Info("===================== SyncPipelines finished totalPage(%d), totalNum(%d)========================", allPage, allNum)
  167. return
  168. }
  169. // SyncMRNote ...
  170. func (s *Service) SyncMRNote() (err error) {
  171. var (
  172. projectIDs = conf.Conf.Property.DefaultProject.ProjectIDs
  173. totalPage int
  174. totalNum int
  175. page int
  176. num int
  177. )
  178. log.Info("===================== SyncNote start ========================")
  179. for _, projectID := range projectIDs {
  180. if page, num, err = s.SyncProjectNotes(context.TODO(), projectID); err != nil {
  181. log.Error("SyncNotes projectID(%d), err(%+v)", projectID, err)
  182. return
  183. }
  184. log.Info(">>>>>>>>> SyncNote projectID(%d) complete Page(%d), Num(%d)", projectID, page, num)
  185. totalPage = totalPage + page
  186. totalNum = totalNum + num
  187. }
  188. log.Info("===================== SyncNote finished totalPage(%d), totalNum(%d)========================", totalPage, totalNum)
  189. return
  190. }
  191. // SyncMember ...
  192. func (s *Service) SyncMember() (err error) {
  193. var (
  194. projectIDs = conf.Conf.Property.DefaultProject.ProjectIDs
  195. totalPage int
  196. totalNum int
  197. page int
  198. num int
  199. )
  200. log.Info("===================== SyncMember start ========================")
  201. for _, projectID := range projectIDs {
  202. if page, num, err = s.SyncProjectMember(context.TODO(), projectID); err != nil {
  203. log.Error("SyncMember projectID(%d), err(%+v)", projectID, err)
  204. return
  205. }
  206. log.Info(">>>>>>>>> SyncMember projectID(%d) complete Page(%d), Num(%d)", projectID, page, num)
  207. totalPage = totalPage + page
  208. totalNum = totalNum + num
  209. }
  210. log.Info("===================== SyncMember finished totalPage(%d), totalNum(%d)========================", totalPage, totalNum)
  211. return
  212. }
  213. // SyncMRAwardEmoji ...
  214. func (s *Service) SyncMRAwardEmoji() (err error) {
  215. var (
  216. projectIDs = conf.Conf.Property.DefaultProject.ProjectIDs
  217. totalPage int
  218. totalNum int
  219. page int
  220. num int
  221. )
  222. log.Info("===================== SyncMRAwardEmoji start ========================")
  223. for _, projectID := range projectIDs {
  224. if page, num, err = s.SyncProjectAwardEmoji(context.TODO(), projectID); err != nil {
  225. log.Error("SyncMRAwardEmoji projectID(%d), err(%+v)", projectID, err)
  226. return
  227. }
  228. log.Info(">>>>>>>>> SyncMRAwardEmoji projectID(%d) complete Page(%d), Num(%d)", projectID, page, num)
  229. totalPage = totalPage + page
  230. totalNum = totalNum + num
  231. }
  232. log.Info("===================== SyncMRAwardEmoji finished totalPage(%d), totalNum(%d)========================", totalPage, totalNum)
  233. return
  234. }
  235. // SyncMRDiscussion ...
  236. func (s *Service) SyncMRDiscussion() (err error) {
  237. var (
  238. projectIDs = conf.Conf.Property.DefaultProject.ProjectIDs
  239. totalPage int
  240. totalNum int
  241. page int
  242. num int
  243. )
  244. log.Info("===================== SyncMRDiscussion start ========================")
  245. for _, projectID := range projectIDs {
  246. if page, num, err = s.SyncProjectDiscussion(context.TODO(), projectID); err != nil {
  247. log.Error("SyncMRDiscussion projectID(%d), err(%+v)", projectID, err)
  248. return
  249. }
  250. log.Info(">>>>>>>>> SyncMRDiscussion projectID(%d) complete Page(%d), Num(%d)", projectID, page, num)
  251. totalPage = totalPage + page
  252. totalNum = totalNum + num
  253. }
  254. log.Info("===================== SyncMRDiscussion finished totalPage(%d), totalNum(%d)========================", totalPage, totalNum)
  255. return
  256. }
  257. // SyncRunners ...
  258. func (s *Service) SyncRunners() (err error) {
  259. var (
  260. projectIDs = conf.Conf.Property.DefaultProject.ProjectIDs
  261. totalPage int
  262. totalNum int
  263. page int
  264. num int
  265. )
  266. log.Info("===================== SyncRunners start ========================")
  267. for _, projectID := range projectIDs {
  268. if page, num, err = s.SyncAllRunners(projectID); err != nil {
  269. log.Error("SyncRunners projectID(%d), err(%+v)", projectID, err)
  270. return
  271. }
  272. log.Info(">>>>>>>>> SyncRunners projectID(%d) complete Page(%d), Num(%d)", projectID, page, num)
  273. totalPage = totalPage + page
  274. totalNum = totalNum + num
  275. }
  276. log.Info("===================== SyncRunners finished totalPage(%d), totalNum(%d)========================", totalPage, totalNum)
  277. return
  278. }
  279. // SyncIssues ...
  280. func (s *Service) SyncIssues() (err error) {
  281. var (
  282. projectIDs = conf.Conf.Property.DefaultProject.ProjectIDs
  283. totalPage int
  284. totalNum int
  285. page int
  286. num int
  287. )
  288. log.Info("===================== SyncIssues start ========================")
  289. for _, projectID := range projectIDs {
  290. if page, num, err = s.SyncAllIssues(projectID); err != nil {
  291. log.Error("SyncIssues projectID(%d), err(%+v)", projectID, err)
  292. return
  293. }
  294. log.Info(">>>>>>>>> SyncIssues projectID(%d) complete Page(%d), Num(%d)", projectID, page, num)
  295. totalPage = totalPage + page
  296. totalNum = totalNum + num
  297. }
  298. log.Info("===================== SyncIssues finished totalPage(%d), totalNum(%d)========================", totalPage, totalNum)
  299. return
  300. }
  301. // SyncBranch ...
  302. func (s *Service) SyncBranch() (err error) {
  303. var (
  304. projectIDs = conf.Conf.Property.DefaultProject.ProjectIDs
  305. allPage int
  306. allNum int
  307. result *model.SyncResult
  308. )
  309. log.Info("===================== SyncBranch start ========================")
  310. for _, projectID := range projectIDs {
  311. if result, err = s.SyncProjectBranch(context.TODO(), projectID); err != nil {
  312. log.Error("SyncBranch projectID(%d), err(%+v)", projectID, err)
  313. go s.WechatFailData(model.DataTypeBranch, projectID, result, err)
  314. return
  315. }
  316. log.Info(">>>>>>>>> SyncBranch projectID(%d) complete Page(%d), Num(%d)", projectID, result.TotalPage, result.TotalNum)
  317. if result != nil && len(result.FailData) > 0 {
  318. go s.WechatFailData(model.DataTypeBranch, projectID, result, nil)
  319. }
  320. allPage = allPage + result.TotalPage
  321. allNum = allNum + result.TotalNum
  322. }
  323. log.Info("===================== SyncBranch finished totalPage(%d), totalNum(%d)========================", allPage, allNum)
  324. return
  325. }
  326. // AggregateBranch ...
  327. func (s *Service) AggregateBranch() (err error) {
  328. var projectIDs = conf.Conf.Property.DefaultProject.ProjectIDs
  329. log.Info("===================== SyncAggregateBranch start ========================")
  330. for _, projectID := range projectIDs {
  331. if err = s.AggregateProjectBranch(context.TODO(), projectID, _baseBranch); err != nil {
  332. log.Error("SyncAggregateBranch projectID(%d), err(%+v)", projectID, err)
  333. return
  334. }
  335. }
  336. log.Info("===================== SyncAggregateBranch finished ========================")
  337. return
  338. }
  339. // WechatFailData ...
  340. func (s *Service) WechatFailData(dataType string, projectID int, result *model.SyncResult, error error) (err error) {
  341. var (
  342. ctx = context.Background()
  343. users = conf.Conf.Property.SyncData.WechatUser
  344. content string
  345. )
  346. title := "[SAGA-ADMIN] SYNC ERROR !!!"
  347. subject := fmt.Sprintf("%s\n\nDataType ( %s )\nProjectID ( %d )", title, dataType, projectID)
  348. if error != nil {
  349. return s.wechat.PushMsg(ctx, users, fmt.Sprintf("%s\n\n%s", subject, error.Error()))
  350. }
  351. if result == nil || len(result.FailData) <= 0 {
  352. return
  353. }
  354. content = "LIST : \n"
  355. for _, data := range result.FailData {
  356. if dataType == model.DataTypeJob || dataType == model.DataTypePipeline || dataType == model.DataTypeMR {
  357. content += fmt.Sprintf("%d, ", data.ChildID)
  358. } else if dataType == model.DataTypeCommit || dataType == model.DataTypeBranch {
  359. content += fmt.Sprintf("%s\n", data.ChildIDStr)
  360. }
  361. }
  362. return s.wechat.PushMsg(ctx, users, fmt.Sprintf("%s\n\n%s", subject, content))
  363. }