bugly_batch_run.go 9.4 KB


  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "go-common/app/admin/ep/marthe/model"
  8. "go-common/library/ecode"
  9. "go-common/library/log"
  10. "github.com/satori/go.uuid"
  11. )
  12. // RunVersions Run Versions.
  13. func (s *Service) RunVersions(buglyVersionID int64) (rep map[string]interface{}, err error) {
  14. lock := s.getBatchRunLock(buglyVersionID)
  15. lock.Lock()
  16. defer lock.Unlock()
  17. var (
  18. buglyRunVersions []*model.BuglyVersion
  19. uid string
  20. isEnableRun bool
  21. )
  22. // 获取可以跑的版本号list
  23. if buglyRunVersions, err = s.GetBatchRunVersions(); err != nil {
  24. log.Error("GetBatchRunVersions err(%v) ", err)
  25. return
  26. }
  27. for _, buglyRunVersion := range buglyRunVersions {
  28. if buglyRunVersion.ID == buglyVersionID {
  29. isEnableRun = true
  30. uid = uuid.NewV4().String()
  31. // 插入batchrun表
  32. buglyBatchRun := &model.BuglyBatchRun{
  33. BuglyVersionID: buglyRunVersion.ID,
  34. Version: buglyRunVersion.Version,
  35. BatchID: uid,
  36. Status: model.BuglyBatchRunStatusRunning,
  37. }
  38. if err = s.dao.InsertBuglyBatchRun(buglyBatchRun); err != nil {
  39. return
  40. }
  41. s.batchRunCache.Do(context.Background(), func(ctx context.Context) {
  42. if err = s.runPerVersion(uid, buglyRunVersion); err != nil {
  43. log.Error("runPerVersion uid(%s) version(%s) err(%v) ", uid, buglyRunVersion.Version, err)
  44. }
  45. defer func() {
  46. // batch run 并且更新batch run表
  47. batchStatus := model.BuglyBatchRunStatusDone
  48. errMsg := ""
  49. endTime := time.Now()
  50. if err != nil {
  51. batchStatus = model.BuglyBatchRunStatusFailed
  52. errMsg = err.Error()
  53. }
  54. updateBuglyBatchRun := &model.BuglyBatchRun{
  55. ID: buglyBatchRun.ID,
  56. Status: batchStatus,
  57. ErrorMsg: errMsg,
  58. EndTime: endTime,
  59. }
  60. if err = s.dao.UpdateBuglyBatchRun(updateBuglyBatchRun); err != nil {
  61. log.Error("runPerVersion UpdateBuglyBatchRun uid(%s) version(%s) err(%v) ", uid, buglyRunVersion.Version, err)
  62. }
  63. }()
  64. })
  65. break
  66. }
  67. }
  68. rep = make(map[string]interface{})
  69. rep["uid"] = uid
  70. rep["enable_to_run"] = isEnableRun
  71. if !isEnableRun {
  72. err = ecode.MartheTaskInRunning
  73. }
  74. return
  75. }
  76. // BatchRunVersions Batch Run Versions
  77. func (s *Service) BatchRunVersions() (err error) {
  78. var (
  79. buglyRunVersions []*model.BuglyVersion
  80. uid = uuid.NewV4().String()
  81. )
  82. // 获取可以跑的版本号list
  83. if buglyRunVersions, err = s.dao.FindEnableAndReadyVersions(); err != nil {
  84. log.Error("GetBatchRunVersions err(%v) ", err)
  85. return
  86. }
  87. for _, buglyRunVersion := range buglyRunVersions {
  88. if _, err = s.RunVersions(buglyRunVersion.ID); err != nil {
  89. log.Error("runPerVersion uid(%s) version(%s) err(%v) ", uid, buglyRunVersion.Version, err)
  90. }
  91. }
  92. return
  93. }
  94. func (s *Service) runPerVersion(uid string, buglyRunVersion *model.BuglyVersion) (err error) {
  95. var (
  96. buglyRet *model.BugRet
  97. buglyCookie *model.BuglyCookie
  98. requestPageCnt int
  99. c = context.Background()
  100. lastRunTime time.Time
  101. buglyBatchRun *model.BuglyBatchRun
  102. buglyProject *model.BuglyProject
  103. )
  104. log.Info("start run version: [%s] batchId: [%s]", buglyRunVersion.Version, uid)
  105. if buglyProject, err = s.dao.QueryBuglyProject(buglyRunVersion.BuglyProjectID); err != nil {
  106. return
  107. }
  108. if buglyProject.ID == 0 {
  109. err = ecode.NothingFound
  110. return
  111. }
  112. //get issue total count
  113. bugIssueRequest := &model.BugIssueRequest{
  114. ProjectID: buglyProject.ProjectID,
  115. PlatformID: buglyProject.PlatformID,
  116. Version: buglyRunVersion.Version,
  117. ExceptionType: buglyProject.ExceptionType,
  118. StartNum: 0,
  119. Rows: 1,
  120. }
  121. //get last run time
  122. if buglyBatchRun, err = s.dao.QueryLastSuccessBatchRunByVersion(buglyRunVersion.Version); err != nil {
  123. return
  124. }
  125. if buglyBatchRun.ID > 0 {
  126. lastRunTime = buglyBatchRun.CTime
  127. } else {
  128. loc, _ := time.LoadLocation("Local")
  129. if lastRunTime, err = time.ParseInLocation(model.TimeLayout, model.TimeLayout, loc); err != nil {
  130. return
  131. }
  132. }
  133. //get enable cookie
  134. if buglyCookie, err = s.GetEnableCookie(); err != nil {
  135. return
  136. }
  137. // if cookie, update cookie as expired
  138. defer func() {
  139. if err != nil && err == ecode.MartheCookieExpired {
  140. s.DisableCookie(c, buglyCookie.ID)
  141. }
  142. }()
  143. if buglyRet, err = s.dao.BuglyIssueAndRetry(c, buglyCookie, bugIssueRequest); err != nil || len(buglyRet.BugIssues) < 1 {
  144. return
  145. }
  146. //获取issue count 和 page 上限
  147. requestPageCnt = int(buglyRet.NumFound/s.c.Bugly.IssuePageSize) + 1
  148. requestPageCntMax := s.c.Bugly.IssueCountUpper / s.c.Bugly.IssuePageSize
  149. if requestPageCnt > requestPageCntMax {
  150. requestPageCnt = requestPageCntMax
  151. }
  152. // update or add issue
  153. for i := 0; i < requestPageCnt; i++ {
  154. innerBreak := false
  155. bugIssueRequest.StartNum = s.c.Bugly.IssuePageSize * i
  156. bugIssueRequest.Rows = s.c.Bugly.IssuePageSize
  157. var ret *model.BugRet
  158. if ret, err = s.dao.BuglyIssueAndRetry(c, buglyCookie, bugIssueRequest); err != nil {
  159. return
  160. }
  161. loc, _ := time.LoadLocation("Local")
  162. issueLink := "/v2/crash-reporting/errors/%s/%s/report?pid=%s&searchType=detail&version=%s&start=0&date=all"
  163. for _, issueDto := range ret.BugIssues {
  164. var (
  165. issueTime time.Time
  166. bugIssueDetail *model.BugIssueDetail
  167. tagStr string
  168. bugDetail = "no detail"
  169. )
  170. tmpTime := []rune(issueDto.LastTime)
  171. issueTime, _ = time.ParseInLocation(model.TimeLayout, string(tmpTime[:len(tmpTime)-4]), loc)
  172. //issue时间早于库里面最新时间的,跳出
  173. if issueTime.Before(lastRunTime) {
  174. innerBreak = true
  175. break
  176. }
  177. var tmpBuglyIssue *model.BuglyIssue
  178. if tmpBuglyIssue, err = s.dao.GetBuglyIssue(issueDto.IssueID, issueDto.Version); err != nil {
  179. log.Error("d.GetSaveIssues url(%s) err(%v)", "GetSaveIssues", err)
  180. continue
  181. }
  182. for _, bugTag := range issueDto.Tags {
  183. tagStr = tagStr + bugTag.TagName + ","
  184. }
  185. if tmpBuglyIssue.ID != 0 {
  186. //update
  187. issueRecord := &model.BuglyIssue{
  188. IssueNo: issueDto.IssueID,
  189. Title: issueDto.Title,
  190. LastTime: issueTime,
  191. HappenTimes: issueDto.Count,
  192. UserTimes: issueDto.UserCount,
  193. Version: issueDto.Version,
  194. Tags: tagStr,
  195. }
  196. s.dao.UpdateBuglyIssue(issueRecord)
  197. } else {
  198. //create
  199. if bugIssueDetail, err = s.dao.BuglyIssueDetailAndRetry(c, buglyCookie, buglyProject.ProjectID, buglyProject.PlatformID, issueDto.IssueID); err == nil {
  200. bugDetail = bugIssueDetail.CallStack
  201. }
  202. issueURL := s.c.Bugly.Host + fmt.Sprintf(issueLink, buglyProject.ProjectID, issueDto.IssueID, buglyProject.PlatformID, buglyRunVersion.Version)
  203. issueRecord := &model.BuglyIssue{
  204. IssueNo: issueDto.IssueID,
  205. Title: issueDto.Title,
  206. LastTime: issueTime,
  207. HappenTimes: issueDto.Count,
  208. UserTimes: issueDto.UserCount,
  209. Version: issueDto.Version,
  210. Tags: tagStr,
  211. Detail: bugDetail,
  212. ExceptionMsg: issueDto.ExceptionMsg,
  213. KeyStack: issueDto.KeyStack,
  214. IssueLink: issueURL,
  215. ProjectID: buglyProject.ProjectID,
  216. }
  217. s.dao.InsertBuglyIssue(issueRecord)
  218. }
  219. }
  220. if innerBreak {
  221. break
  222. }
  223. }
  224. log.Info("end run version: [%s] batchId: [%s]", buglyRunVersion.Version, uid)
  225. return
  226. }
  227. // GetBatchRunVersions Get Batch Run Versions.
  228. func (s *Service) GetBatchRunVersions() (buglyRunVersions []*model.BuglyVersion, err error) {
  229. var (
  230. buglyVersions []*model.BuglyVersion
  231. buglyBatchRuns []*model.BuglyBatchRun
  232. )
  233. if buglyVersions, err = s.dao.FindEnableAndReadyVersions(); err != nil {
  234. return
  235. }
  236. if buglyBatchRuns, err = s.dao.QueryBuglyBatchRunsByStatus(model.BuglyBatchRunStatusRunning); err != nil {
  237. return
  238. }
  239. // 排除正在执行的版本
  240. for _, buglyVersion := range buglyVersions {
  241. var isVersionRun bool
  242. for _, buglyBatchRun := range buglyBatchRuns {
  243. if buglyBatchRun.Version == buglyVersion.Version {
  244. isVersionRun = true
  245. break
  246. }
  247. }
  248. if !isVersionRun {
  249. buglyRunVersions = append(buglyRunVersions, buglyVersion)
  250. }
  251. }
  252. return
  253. }
  254. // DisableBatchRunOverTime Disable Batch Run OverTime.
  255. func (s *Service) DisableBatchRunOverTime() (err error) {
  256. var (
  257. buglyBatchRuns []*model.BuglyBatchRun
  258. tapdBugRecords []*model.TapdBugRecord
  259. timeNow = time.Now()
  260. )
  261. //清 未完成 disable batch run
  262. if buglyBatchRuns, err = s.dao.QueryBuglyBatchRunsByStatus(model.BuglyBatchRunStatusRunning); err != nil {
  263. return
  264. }
  265. for _, buglyBatchRun := range buglyBatchRuns {
  266. if timeNow.Sub(buglyBatchRun.CTime).Hours() > float64(s.c.Scheduler.BatchRunOverHourTime) {
  267. updateBuglyBatchRun := &model.BuglyBatchRun{
  268. ID: buglyBatchRun.ID,
  269. Status: model.BuglyBatchRunStatusFailed,
  270. ErrorMsg: "over time",
  271. EndTime: timeNow,
  272. }
  273. if err = s.dao.UpdateBuglyBatchRun(updateBuglyBatchRun); err != nil {
  274. continue
  275. }
  276. }
  277. }
  278. // 清 未完成 disable insert tapd bug
  279. if tapdBugRecords, err = s.dao.QueryTapdBugRecordByStatus(model.InsertBugStatusRunning); err != nil {
  280. return
  281. }
  282. for _, tapdBugRecord := range tapdBugRecords {
  283. if timeNow.Sub(tapdBugRecord.CTime).Hours() > float64(s.c.Scheduler.BatchRunOverHourTime) {
  284. tapdBugRecord.Status = model.InsertBugStatusFailed
  285. if err = s.dao.UpdateTapdBugRecord(tapdBugRecord); err != nil {
  286. continue
  287. }
  288. }
  289. }
  290. return
  291. }
  292. func (s *Service) getBatchRunLock(buglyVersionId int64) (batchRunLock *sync.Mutex) {
  293. s.syncBatchRunLock.Lock()
  294. defer s.syncBatchRunLock.Unlock()
  295. var ok bool
  296. if batchRunLock, ok = s.mapBatchRunLocks[buglyVersionId]; !ok {
  297. batchRunLock = new(sync.Mutex)
  298. s.mapBatchRunLocks[buglyVersionId] = batchRunLock
  299. }
  300. return
  301. }