|
- package service
- import (
- "context"
- "encoding/json"
- "fmt"
- "time"
- "go-common/app/admin/ep/saga/model"
- "go-common/app/admin/ep/saga/service/utils"
- "go-common/library/cache/redis"
- "go-common/library/log"
- "github.com/xanzy/go-gitlab"
- )
- // QueryProjectJob ...
- func (s *Service) QueryProjectJob(c context.Context, req *model.ProjectJobRequest) (resp *model.ProjectJobResp, err error) {
- var (
- layout = "2006-01-02"
- queryCacheKey string
- jobs []*model.ProjectJob
- since time.Time
- util time.Time
- )
- resp = &model.ProjectJobResp{ProjectID: req.ProjectID, QueryDescription: "最近一月的Jobs日常", State: req.Scope, DataInfo: []*model.DateJobInfo{}}
- year, month, day := time.Now().Date()
- util = time.Date(year, month, day-1, 0, 0, 0, 0, time.Local)
- since = util.AddDate(0, -1, 0)
- //query from redis first
- queryCacheKey = fmt.Sprintf("saga_admin_job_%d_%s_%s_%s_%d", req.ProjectID, req.Branch, req.Scope, req.Machine, req.StatisticsType)
- if err = s.dao.ItemRedis(c, queryCacheKey, &resp); err != redis.ErrNil {
- return
- }
- if resp.TotalItem, jobs, err = s.queryProjectJobByTime(c, req.ProjectID, since, util); err != nil {
- return
- }
- //init map key
- pendingTime := make(map[string][]float64)
- runningTime := make(map[string][]float64)
- for i := 1; ; i++ {
- day := since.AddDate(0, 0, i)
- if day.After(util) {
- break
- }
- dayStr := day.Format(layout)
- pendingTime[dayStr] = []float64{}
- runningTime[dayStr] = []float64{}
- resp.DataInfo = append(resp.DataInfo, &model.DateJobInfo{Date: dayStr, SlowestPendingJob: []*model.ProjectJob{}})
- }
- //根据查询条件进行过滤
- newJobs := jobs[:0]
- if req.Branch != "" {
- for _, job := range jobs {
- if job.Branch == req.Branch {
- newJobs = append(newJobs, job)
- }
- }
- jobs = newJobs
- newJobs = jobs[:0]
- }
- if req.User != "" {
- for _, job := range jobs {
- if job.User == req.User {
- newJobs = append(newJobs, job)
- }
- }
- jobs = newJobs
- newJobs = jobs[:0]
- }
- if req.Machine != "" {
- for _, job := range jobs {
- if job.Machine == req.Machine {
- newJobs = append(newJobs, job)
- }
- }
- jobs = newJobs
- }
- //统计pending running 时间
- for _, job := range jobs {
- var (
- jobInfo *model.DateJobInfo
- )
- jobDate := job.CreatedAt.Format(layout)
- for _, j := range resp.DataInfo {
- if j.Date == jobDate {
- jobInfo = j
- break
- }
- }
- jobInfo.JobTotal++
- if job.Status == req.Scope {
- jobInfo.StatusNum++
- if job.StartedAt != nil && job.CreatedAt != nil {
- pending := job.StartedAt.Sub(*job.CreatedAt).Seconds()
- pendingTime[jobDate] = append(pendingTime[jobDate], pending)
- if pending >= 300 {
- jobInfo.SlowestPendingJob = append(jobInfo.SlowestPendingJob, job)
- }
- }
- if job.Status == "success" {
- running := job.FinishedAt.Sub(*job.StartedAt).Seconds()
- runningTime[jobDate] = append(runningTime[jobDate], running)
- }
- }
- }
- for k, v := range runningTime {
- var (
- jobInfo *model.DateJobInfo
- )
- for _, j := range resp.DataInfo {
- if j.Date == k {
- jobInfo = j
- break
- }
- }
- jobInfo.PendingTime = utils.CalAverageTime(req.StatisticsType, v)
- jobInfo.RunningTime = utils.CalAverageTime(req.StatisticsType, pendingTime[k])
- }
- // set data to redis
- err = s.dao.SetItemRedis(c, queryCacheKey, resp, model.ExpiredOneDay)
- return
- }
- // queryProjectJobByTime ...
- func (s *Service) queryProjectJobByTime(c context.Context, projectID int, since, util time.Time) (count int, result []*model.ProjectJob, err error) {
- var (
- resp *gitlab.Response
- jobs []gitlab.Job
- overTime bool
- )
- if _, resp, err = s.gitlab.ListProjectJobs(projectID, 1); err != nil {
- return
- }
- if resp.TotalItems <= 0 {
- return
- }
- for page := 1; ; page++ {
- if jobs, resp, err = s.gitlab.ListProjectJobs(projectID, page); err != nil {
- return
- }
- for _, job := range jobs {
- if job.CreatedAt == nil {
- continue
- }
- if job.CreatedAt.After(since) && job.CreatedAt.Before(util) {
- count++
- jobInfo := &model.ProjectJob{
- Status: job.Status,
- Branch: job.Ref,
- Machine: job.Runner.Description,
- User: job.User.Name,
- CreatedAt: job.CreatedAt,
- StartedAt: job.StartedAt,
- FinishedAt: job.FinishedAt}
- result = append(result, jobInfo)
- }
- if job.CreatedAt.Before(since) {
- overTime = true
- }
- }
- if overTime {
- break
- }
- if resp.NextPage == 0 {
- break
- }
- }
- return
- }
- // QueryProjectJobNew ...
- func (s *Service) QueryProjectJobNew(c context.Context, req *model.ProjectJobRequest) (resp *model.ProjectJobResp, err error) {
- var (
- layout = "2006-01-02"
- fmtLayout = `%d-%d-%d 00:00:00`
- jobs []*model.StatisticsJobs
- )
- resp = &model.ProjectJobResp{ProjectID: req.ProjectID, QueryDescription: "最近一月的Jobs日常", State: req.Scope, DataInfo: []*model.DateJobInfo{}}
- year, month, day := time.Now().Date()
- until := time.Date(year, month, day-1, 0, 0, 0, 0, time.Local)
- since := until.AddDate(0, -1, 0)
- sinceStr := fmt.Sprintf(fmtLayout, since.Year(), since.Month(), since.Day())
- untilStr := fmt.Sprintf(fmtLayout, until.Year(), until.Month(), until.Day())
- if resp.TotalItem, jobs, err = s.dao.QueryJobsByTime(req.ProjectID, req, sinceStr, untilStr); err != nil {
- return
- }
- //init map key
- pendingTime := make(map[string][]float64)
- runningTime := make(map[string][]float64)
- for i := 1; ; i++ {
- day := since.AddDate(0, 0, i)
- if day.After(until) {
- break
- }
- dayStr := day.Format(layout)
- pendingTime[dayStr] = []float64{}
- runningTime[dayStr] = []float64{}
- resp.DataInfo = append(resp.DataInfo, &model.DateJobInfo{Date: dayStr, SlowestPendingJob: []*model.ProjectJob{}})
- }
- //统计pending running 时间
- for _, job := range jobs {
- var (
- jobInfo *model.DateJobInfo
- )
- jobDate := job.CreatedAt.Format(layout)
- for _, j := range resp.DataInfo {
- if j.Date == jobDate {
- jobInfo = j
- break
- }
- }
- if jobInfo == nil {
- continue
- }
- jobInfo.JobTotal++
- if job.Status == req.Scope {
- jobInfo.StatusNum++
- if job.StartedAt != nil && job.CreatedAt != nil {
- pending := job.StartedAt.Sub(*job.CreatedAt).Seconds()
- pendingTime[jobDate] = append(pendingTime[jobDate], pending)
- if pending >= 300 {
- jo := &model.ProjectJob{
- Status: job.Status,
- User: job.UserName,
- Branch: job.Ref,
- Machine: job.RunnerDescription,
- CreatedAt: job.CreatedAt,
- StartedAt: job.StartedAt,
- FinishedAt: job.FinishedAt,
- }
- jobInfo.SlowestPendingJob = append(jobInfo.SlowestPendingJob, jo)
- }
- }
- if job.Status == "success" {
- running := job.FinishedAt.Sub(*job.StartedAt).Seconds()
- runningTime[jobDate] = append(runningTime[jobDate], running)
- }
- }
- }
- for k, v := range runningTime {
- var (
- jobInfo *model.DateJobInfo
- )
- for _, j := range resp.DataInfo {
- if j.Date == k {
- jobInfo = j
- break
- }
- }
- jobInfo.PendingTime = utils.CalAverageTime(req.StatisticsType, v)
- jobInfo.RunningTime = utils.CalAverageTime(req.StatisticsType, pendingTime[k])
- }
- return
- }
- /*-------------------------------------- sync job ----------------------------------------*/
- // SyncProjectJobs ...
- func (s *Service) SyncProjectJobs(projectID int) (result *model.SyncResult, err error) {
- var (
- //syncAllTime = conf.Conf.Property.SyncData.SyncAllTime
- syncAllTime = false
- since *time.Time
- until *time.Time
- projectInfo *model.ProjectInfo
- )
- if projectInfo, err = s.dao.ProjectInfoByID(projectID); err != nil {
- return
- }
- if !syncAllTime {
- since, until = utils.CalSyncTime()
- log.Info("sync project(%d) job time since: %v, until: %v", projectID, since, until)
- if result, err = s.SyncProjectJobsByTime(projectID, projectInfo.Name, *since, *until); err != nil {
- return
- }
- } else {
- if result, err = s.SyncProjectJobsNormal(projectID, projectInfo.Name); err != nil {
- return
- }
- }
- return
- }
- // SyncProjectJobsNormal ...
- func (s *Service) SyncProjectJobsNormal(projectID int, projectName string) (result *model.SyncResult, err error) {
- var (
- jobs []gitlab.Job
- resp *gitlab.Response
- )
- result = &model.SyncResult{}
- for page := 1; ; page++ {
- result.TotalPage++
- if jobs, resp, err = s.gitlab.ListProjectJobs(projectID, page); err != nil {
- return
- }
- for _, job := range jobs {
- if err = s.structureDatabasejob(projectID, projectName, job); err != nil {
- log.Error("job Save Database err: projectID(%d), JobID(%d)", projectID, job.ID)
- err = nil
- errData := &model.FailData{
- ChildID: job.ID,
- }
- result.FailData = append(result.FailData, errData)
- continue
- }
- result.TotalNum++
- }
- if resp.NextPage == 0 {
- break
- }
- }
- return
- }
- // SyncProjectJobsByTime ...
- func (s *Service) SyncProjectJobsByTime(projectID int, projectName string, since, until time.Time) (result *model.SyncResult, err error) {
- var (
- jobs []gitlab.Job
- resp *gitlab.Response
- startQuery bool
- )
- result = &model.SyncResult{}
- if _, resp, err = s.gitlab.ListProjectJobs(projectID, 1); err != nil {
- return
- }
- page := 1
- for page <= resp.TotalPages {
- result.TotalPage++
- if !startQuery {
- if jobs, _, err = s.gitlab.ListProjectJobs(projectID, page); err != nil {
- return
- }
- if page == 1 && len(jobs) <= 0 {
- return
- }
- if jobs[0].CreatedAt.After(until) {
- page++
- continue
- } else {
- startQuery = true
- page--
- continue
- }
- }
- if jobs, _, err = s.gitlab.ListProjectJobs(projectID, page); err != nil {
- return
- }
- for _, job := range jobs {
- createTime := job.CreatedAt
- if createTime.After(since) && createTime.Before(until) {
- if err = s.structureDatabasejob(projectID, projectName, job); err != nil {
- log.Error("job Save Database err: projectID(%d), JobID(%d)", projectID, job.ID)
- err = nil
- errData := &model.FailData{
- ChildID: job.ID,
- }
- result.FailData = append(result.FailData, errData)
- continue
- }
- result.TotalNum++
- }
- if createTime.Before(since) {
- return
- }
- }
- page++
- }
- return
- }
- // structureDatabasejob ...
- func (s *Service) structureDatabasejob(projectID int, projectName string, job gitlab.Job) (err error) {
- var (
- jobArtifactsFile string
- jobCommitID string
- )
- jobArtifactsFileByte, _ := json.Marshal(job.ArtifactsFile)
- jobArtifactsFile = string(jobArtifactsFileByte)
- if job.Commit != nil {
- jobCommitID = job.Commit.ID
- }
- jobDB := &model.StatisticsJobs{
- ProjectID: projectID,
- ProjectName: projectName,
- CommitID: jobCommitID,
- CreatedAt: job.CreatedAt,
- Coverage: job.Coverage,
- ArtifactsFile: jobArtifactsFile,
- FinishedAt: job.FinishedAt,
- JobID: job.ID,
- Name: job.Name,
- Ref: job.Ref,
- RunnerID: job.Runner.ID,
- RunnerDescription: job.Runner.Description,
- Stage: job.Stage,
- StartedAt: job.StartedAt,
- Status: job.Status,
- Tag: job.Tag,
- UserID: job.User.ID,
- UserName: job.User.Name,
- WebURL: job.WebURL,
- }
- return s.SaveDatabasejob(jobDB)
- }
- // SaveDatabasejob ...
- func (s *Service) SaveDatabasejob(jobDB *model.StatisticsJobs) (err error) {
- var total int
- if total, err = s.dao.HasJob(jobDB.ProjectID, jobDB.JobID); err != nil {
- log.Error("SaveDatabaseJob HasJob(%+v)", err)
- return
- }
- // found only one, so update
- if total == 1 {
- if err = s.dao.UpdateJob(jobDB.ProjectID, jobDB.JobID, jobDB); err != nil {
- log.Error("SaveDatabaseJob UpdateJob(%+v)", err)
- return
- }
- return
- } else if total > 1 {
- // found repeated row, this situation will not exist under normal
- log.Warn("SaveDatabasejob job has more rows(%d)", total)
- return
- }
- // insert row now
- if err = s.dao.CreateJob(jobDB); err != nil {
- log.Error("SaveDatabaseJob CreateJob(%+v)", err)
- return
- }
- return
- }
|