job.go 12 KB


  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "time"
  7. "go-common/app/admin/ep/saga/model"
  8. "go-common/app/admin/ep/saga/service/utils"
  9. "go-common/library/cache/redis"
  10. "go-common/library/log"
  11. "github.com/xanzy/go-gitlab"
  12. )
  13. // QueryProjectJob ...
  14. func (s *Service) QueryProjectJob(c context.Context, req *model.ProjectJobRequest) (resp *model.ProjectJobResp, err error) {
  15. var (
  16. layout = "2006-01-02"
  17. queryCacheKey string
  18. jobs []*model.ProjectJob
  19. since time.Time
  20. util time.Time
  21. )
  22. resp = &model.ProjectJobResp{ProjectID: req.ProjectID, QueryDescription: "最近一月的Jobs日常", State: req.Scope, DataInfo: []*model.DateJobInfo{}}
  23. year, month, day := time.Now().Date()
  24. util = time.Date(year, month, day-1, 0, 0, 0, 0, time.Local)
  25. since = util.AddDate(0, -1, 0)
  26. //query from redis first
  27. queryCacheKey = fmt.Sprintf("saga_admin_job_%d_%s_%s_%s_%d", req.ProjectID, req.Branch, req.Scope, req.Machine, req.StatisticsType)
  28. if err = s.dao.ItemRedis(c, queryCacheKey, &resp); err != redis.ErrNil {
  29. return
  30. }
  31. if resp.TotalItem, jobs, err = s.queryProjectJobByTime(c, req.ProjectID, since, util); err != nil {
  32. return
  33. }
  34. //init map key
  35. pendingTime := make(map[string][]float64)
  36. runningTime := make(map[string][]float64)
  37. for i := 1; ; i++ {
  38. day := since.AddDate(0, 0, i)
  39. if day.After(util) {
  40. break
  41. }
  42. dayStr := day.Format(layout)
  43. pendingTime[dayStr] = []float64{}
  44. runningTime[dayStr] = []float64{}
  45. resp.DataInfo = append(resp.DataInfo, &model.DateJobInfo{Date: dayStr, SlowestPendingJob: []*model.ProjectJob{}})
  46. }
  47. //根据查询条件进行过滤
  48. newJobs := jobs[:0]
  49. if req.Branch != "" {
  50. for _, job := range jobs {
  51. if job.Branch == req.Branch {
  52. newJobs = append(newJobs, job)
  53. }
  54. }
  55. jobs = newJobs
  56. newJobs = jobs[:0]
  57. }
  58. if req.User != "" {
  59. for _, job := range jobs {
  60. if job.User == req.User {
  61. newJobs = append(newJobs, job)
  62. }
  63. }
  64. jobs = newJobs
  65. newJobs = jobs[:0]
  66. }
  67. if req.Machine != "" {
  68. for _, job := range jobs {
  69. if job.Machine == req.Machine {
  70. newJobs = append(newJobs, job)
  71. }
  72. }
  73. jobs = newJobs
  74. }
  75. //统计pending running 时间
  76. for _, job := range jobs {
  77. var (
  78. jobInfo *model.DateJobInfo
  79. )
  80. jobDate := job.CreatedAt.Format(layout)
  81. for _, j := range resp.DataInfo {
  82. if j.Date == jobDate {
  83. jobInfo = j
  84. break
  85. }
  86. }
  87. jobInfo.JobTotal++
  88. if job.Status == req.Scope {
  89. jobInfo.StatusNum++
  90. if job.StartedAt != nil && job.CreatedAt != nil {
  91. pending := job.StartedAt.Sub(*job.CreatedAt).Seconds()
  92. pendingTime[jobDate] = append(pendingTime[jobDate], pending)
  93. if pending >= 300 {
  94. jobInfo.SlowestPendingJob = append(jobInfo.SlowestPendingJob, job)
  95. }
  96. }
  97. if job.Status == "success" {
  98. running := job.FinishedAt.Sub(*job.StartedAt).Seconds()
  99. runningTime[jobDate] = append(runningTime[jobDate], running)
  100. }
  101. }
  102. }
  103. for k, v := range runningTime {
  104. var (
  105. jobInfo *model.DateJobInfo
  106. )
  107. for _, j := range resp.DataInfo {
  108. if j.Date == k {
  109. jobInfo = j
  110. break
  111. }
  112. }
  113. jobInfo.PendingTime = utils.CalAverageTime(req.StatisticsType, v)
  114. jobInfo.RunningTime = utils.CalAverageTime(req.StatisticsType, pendingTime[k])
  115. }
  116. // set data to redis
  117. err = s.dao.SetItemRedis(c, queryCacheKey, resp, model.ExpiredOneDay)
  118. return
  119. }
  120. // queryProjectJobByTime ...
  121. func (s *Service) queryProjectJobByTime(c context.Context, projectID int, since, util time.Time) (count int, result []*model.ProjectJob, err error) {
  122. var (
  123. resp *gitlab.Response
  124. jobs []gitlab.Job
  125. overTime bool
  126. )
  127. if _, resp, err = s.gitlab.ListProjectJobs(projectID, 1); err != nil {
  128. return
  129. }
  130. if resp.TotalItems <= 0 {
  131. return
  132. }
  133. for page := 1; ; page++ {
  134. if jobs, resp, err = s.gitlab.ListProjectJobs(projectID, page); err != nil {
  135. return
  136. }
  137. for _, job := range jobs {
  138. if job.CreatedAt == nil {
  139. continue
  140. }
  141. if job.CreatedAt.After(since) && job.CreatedAt.Before(util) {
  142. count++
  143. jobInfo := &model.ProjectJob{
  144. Status: job.Status,
  145. Branch: job.Ref,
  146. Machine: job.Runner.Description,
  147. User: job.User.Name,
  148. CreatedAt: job.CreatedAt,
  149. StartedAt: job.StartedAt,
  150. FinishedAt: job.FinishedAt}
  151. result = append(result, jobInfo)
  152. }
  153. if job.CreatedAt.Before(since) {
  154. overTime = true
  155. }
  156. }
  157. if overTime {
  158. break
  159. }
  160. if resp.NextPage == 0 {
  161. break
  162. }
  163. }
  164. return
  165. }
  166. // QueryProjectJobNew ...
  167. func (s *Service) QueryProjectJobNew(c context.Context, req *model.ProjectJobRequest) (resp *model.ProjectJobResp, err error) {
  168. var (
  169. layout = "2006-01-02"
  170. fmtLayout = `%d-%d-%d 00:00:00`
  171. jobs []*model.StatisticsJobs
  172. )
  173. resp = &model.ProjectJobResp{ProjectID: req.ProjectID, QueryDescription: "最近一月的Jobs日常", State: req.Scope, DataInfo: []*model.DateJobInfo{}}
  174. year, month, day := time.Now().Date()
  175. until := time.Date(year, month, day-1, 0, 0, 0, 0, time.Local)
  176. since := until.AddDate(0, -1, 0)
  177. sinceStr := fmt.Sprintf(fmtLayout, since.Year(), since.Month(), since.Day())
  178. untilStr := fmt.Sprintf(fmtLayout, until.Year(), until.Month(), until.Day())
  179. if resp.TotalItem, jobs, err = s.dao.QueryJobsByTime(req.ProjectID, req, sinceStr, untilStr); err != nil {
  180. return
  181. }
  182. //init map key
  183. pendingTime := make(map[string][]float64)
  184. runningTime := make(map[string][]float64)
  185. for i := 1; ; i++ {
  186. day := since.AddDate(0, 0, i)
  187. if day.After(until) {
  188. break
  189. }
  190. dayStr := day.Format(layout)
  191. pendingTime[dayStr] = []float64{}
  192. runningTime[dayStr] = []float64{}
  193. resp.DataInfo = append(resp.DataInfo, &model.DateJobInfo{Date: dayStr, SlowestPendingJob: []*model.ProjectJob{}})
  194. }
  195. //统计pending running 时间
  196. for _, job := range jobs {
  197. var (
  198. jobInfo *model.DateJobInfo
  199. )
  200. jobDate := job.CreatedAt.Format(layout)
  201. for _, j := range resp.DataInfo {
  202. if j.Date == jobDate {
  203. jobInfo = j
  204. break
  205. }
  206. }
  207. if jobInfo == nil {
  208. continue
  209. }
  210. jobInfo.JobTotal++
  211. if job.Status == req.Scope {
  212. jobInfo.StatusNum++
  213. if job.StartedAt != nil && job.CreatedAt != nil {
  214. pending := job.StartedAt.Sub(*job.CreatedAt).Seconds()
  215. pendingTime[jobDate] = append(pendingTime[jobDate], pending)
  216. if pending >= 300 {
  217. jo := &model.ProjectJob{
  218. Status: job.Status,
  219. User: job.UserName,
  220. Branch: job.Ref,
  221. Machine: job.RunnerDescription,
  222. CreatedAt: job.CreatedAt,
  223. StartedAt: job.StartedAt,
  224. FinishedAt: job.FinishedAt,
  225. }
  226. jobInfo.SlowestPendingJob = append(jobInfo.SlowestPendingJob, jo)
  227. }
  228. }
  229. if job.Status == "success" {
  230. running := job.FinishedAt.Sub(*job.StartedAt).Seconds()
  231. runningTime[jobDate] = append(runningTime[jobDate], running)
  232. }
  233. }
  234. }
  235. for k, v := range runningTime {
  236. var (
  237. jobInfo *model.DateJobInfo
  238. )
  239. for _, j := range resp.DataInfo {
  240. if j.Date == k {
  241. jobInfo = j
  242. break
  243. }
  244. }
  245. jobInfo.PendingTime = utils.CalAverageTime(req.StatisticsType, v)
  246. jobInfo.RunningTime = utils.CalAverageTime(req.StatisticsType, pendingTime[k])
  247. }
  248. return
  249. }
  250. /*-------------------------------------- sync job ----------------------------------------*/
  251. // SyncProjectJobs ...
  252. func (s *Service) SyncProjectJobs(projectID int) (result *model.SyncResult, err error) {
  253. var (
  254. //syncAllTime = conf.Conf.Property.SyncData.SyncAllTime
  255. syncAllTime = false
  256. since *time.Time
  257. until *time.Time
  258. projectInfo *model.ProjectInfo
  259. )
  260. if projectInfo, err = s.dao.ProjectInfoByID(projectID); err != nil {
  261. return
  262. }
  263. if !syncAllTime {
  264. since, until = utils.CalSyncTime()
  265. log.Info("sync project(%d) job time since: %v, until: %v", projectID, since, until)
  266. if result, err = s.SyncProjectJobsByTime(projectID, projectInfo.Name, *since, *until); err != nil {
  267. return
  268. }
  269. } else {
  270. if result, err = s.SyncProjectJobsNormal(projectID, projectInfo.Name); err != nil {
  271. return
  272. }
  273. }
  274. return
  275. }
  276. // SyncProjectJobsNormal ...
  277. func (s *Service) SyncProjectJobsNormal(projectID int, projectName string) (result *model.SyncResult, err error) {
  278. var (
  279. jobs []gitlab.Job
  280. resp *gitlab.Response
  281. )
  282. result = &model.SyncResult{}
  283. for page := 1; ; page++ {
  284. result.TotalPage++
  285. if jobs, resp, err = s.gitlab.ListProjectJobs(projectID, page); err != nil {
  286. return
  287. }
  288. for _, job := range jobs {
  289. if err = s.structureDatabasejob(projectID, projectName, job); err != nil {
  290. log.Error("job Save Database err: projectID(%d), JobID(%d)", projectID, job.ID)
  291. err = nil
  292. errData := &model.FailData{
  293. ChildID: job.ID,
  294. }
  295. result.FailData = append(result.FailData, errData)
  296. continue
  297. }
  298. result.TotalNum++
  299. }
  300. if resp.NextPage == 0 {
  301. break
  302. }
  303. }
  304. return
  305. }
  306. // SyncProjectJobsByTime ...
  307. func (s *Service) SyncProjectJobsByTime(projectID int, projectName string, since, until time.Time) (result *model.SyncResult, err error) {
  308. var (
  309. jobs []gitlab.Job
  310. resp *gitlab.Response
  311. startQuery bool
  312. )
  313. result = &model.SyncResult{}
  314. if _, resp, err = s.gitlab.ListProjectJobs(projectID, 1); err != nil {
  315. return
  316. }
  317. page := 1
  318. for page <= resp.TotalPages {
  319. result.TotalPage++
  320. if !startQuery {
  321. if jobs, _, err = s.gitlab.ListProjectJobs(projectID, page); err != nil {
  322. return
  323. }
  324. if page == 1 && len(jobs) <= 0 {
  325. return
  326. }
  327. if jobs[0].CreatedAt.After(until) {
  328. page++
  329. continue
  330. } else {
  331. startQuery = true
  332. page--
  333. continue
  334. }
  335. }
  336. if jobs, _, err = s.gitlab.ListProjectJobs(projectID, page); err != nil {
  337. return
  338. }
  339. for _, job := range jobs {
  340. createTime := job.CreatedAt
  341. if createTime.After(since) && createTime.Before(until) {
  342. if err = s.structureDatabasejob(projectID, projectName, job); err != nil {
  343. log.Error("job Save Database err: projectID(%d), JobID(%d)", projectID, job.ID)
  344. err = nil
  345. errData := &model.FailData{
  346. ChildID: job.ID,
  347. }
  348. result.FailData = append(result.FailData, errData)
  349. continue
  350. }
  351. result.TotalNum++
  352. }
  353. if createTime.Before(since) {
  354. return
  355. }
  356. }
  357. page++
  358. }
  359. return
  360. }
  361. // structureDatabasejob ...
  362. func (s *Service) structureDatabasejob(projectID int, projectName string, job gitlab.Job) (err error) {
  363. var (
  364. jobArtifactsFile string
  365. jobCommitID string
  366. )
  367. jobArtifactsFileByte, _ := json.Marshal(job.ArtifactsFile)
  368. jobArtifactsFile = string(jobArtifactsFileByte)
  369. if job.Commit != nil {
  370. jobCommitID = job.Commit.ID
  371. }
  372. jobDB := &model.StatisticsJobs{
  373. ProjectID: projectID,
  374. ProjectName: projectName,
  375. CommitID: jobCommitID,
  376. CreatedAt: job.CreatedAt,
  377. Coverage: job.Coverage,
  378. ArtifactsFile: jobArtifactsFile,
  379. FinishedAt: job.FinishedAt,
  380. JobID: job.ID,
  381. Name: job.Name,
  382. Ref: job.Ref,
  383. RunnerID: job.Runner.ID,
  384. RunnerDescription: job.Runner.Description,
  385. Stage: job.Stage,
  386. StartedAt: job.StartedAt,
  387. Status: job.Status,
  388. Tag: job.Tag,
  389. UserID: job.User.ID,
  390. UserName: job.User.Name,
  391. WebURL: job.WebURL,
  392. }
  393. return s.SaveDatabasejob(jobDB)
  394. }
  395. // SaveDatabasejob ...
  396. func (s *Service) SaveDatabasejob(jobDB *model.StatisticsJobs) (err error) {
  397. var total int
  398. if total, err = s.dao.HasJob(jobDB.ProjectID, jobDB.JobID); err != nil {
  399. log.Error("SaveDatabaseJob HasJob(%+v)", err)
  400. return
  401. }
  402. // found only one, so update
  403. if total == 1 {
  404. if err = s.dao.UpdateJob(jobDB.ProjectID, jobDB.JobID, jobDB); err != nil {
  405. log.Error("SaveDatabaseJob UpdateJob(%+v)", err)
  406. return
  407. }
  408. return
  409. } else if total > 1 {
  410. // found repeated row, this situation will not exist under normal
  411. log.Warn("SaveDatabasejob job has more rows(%d)", total)
  412. return
  413. }
  414. // insert row now
  415. if err = s.dao.CreateJob(jobDB); err != nil {
  416. log.Error("SaveDatabaseJob CreateJob(%+v)", err)
  417. return
  418. }
  419. return
  420. }