sign_task_job.go 8.5 KB


  1. package service
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/admin/main/up/util"
  6. "go-common/app/job/main/up/conf"
  7. "go-common/app/job/main/up/dao/upcrm"
  8. "go-common/app/job/main/up/model/signmodel"
  9. "go-common/app/job/main/up/model/upcrmmodel"
  10. v1 "go-common/app/service/main/archive/api"
  11. "go-common/library/log"
  12. xtime "go-common/library/time"
  13. "github.com/jinzhu/gorm"
  14. )
  15. //CheckTaskJob check task job
  16. func (s *Service) CheckTaskJob(tm time.Time) {
  17. // 今天计算昨天的数据
  18. var yesterday = tm.AddDate(0, 0, -1)
  19. log.Info("start to run CheckTaskJob, date=%s, yesterday=%s", tm, yesterday)
  20. s.CheckTaskFinish(yesterday)
  21. log.Info("finish run CheckTaskJob, date=%s", tm)
  22. }
  23. //Archive data
  24. type Archive struct {
  25. ID int64 `gorm:"column:id"`
  26. }
  27. //CheckTaskFinish check task finish, calculate datas in (-,date]
  28. func (s *Service) CheckTaskFinish(date time.Time) {
  29. // 1.查找所有有效的合同id, begin_date <= date && end _date >= date
  30. // 2.找到所有合同id对应的任务id,
  31. // 3.根据任务类型,日、周、月、累计,计算任务周期[a,b)
  32. // 4.计算完成数量
  33. var crmdb = s.crmdb.GetDb()
  34. var dateStr = date.Format(upcrmmodel.TimeFmtDate)
  35. var offset = 0
  36. var limit = 200
  37. var actualSize = limit
  38. log.Info("start to check task state")
  39. archiveDb, err := gorm.Open("mysql", conf.Conf.ArchiveOrm.DSN)
  40. s.crmdb.StartTask(upcrmmodel.TaskTypeSignTaskCalculate, date)
  41. archiveDb.LogMode(true)
  42. if err != nil {
  43. log.Error("connect archive db fail")
  44. return
  45. }
  46. defer archiveDb.Close()
  47. defer func() {
  48. if err == nil {
  49. s.crmdb.FinishTask(upcrmmodel.TaskTypeSignTaskCalculate, date, upcrmmodel.TaskStateFinish)
  50. } else {
  51. s.crmdb.FinishTask(upcrmmodel.TaskTypeSignTaskCalculate, date, upcrmmodel.TaskStateError)
  52. }
  53. }()
  54. var taskTotalCount = 0
  55. for actualSize == limit {
  56. var signUps []*signmodel.SignUp
  57. var signUpMap = make(map[uint32]*signmodel.SignUp)
  58. // 1
  59. err = crmdb.Table(signmodel.TableNameSignUp).
  60. Select("id, begin_date, end_date").
  61. Where("begin_date <= ? and end_date >= ?", dateStr, dateStr).
  62. Offset(offset).
  63. Limit(limit).
  64. Find(&signUps).Error
  65. offset += limit
  66. if err != nil && err != gorm.ErrRecordNotFound {
  67. log.Error("err get signs, err=%+v", err)
  68. return
  69. }
  70. actualSize = len(signUps)
  71. var signIDs []uint32
  72. for _, v := range signUps {
  73. signUpMap[v.ID] = v
  74. signIDs = append(signIDs, v.ID)
  75. }
  76. // 2
  77. var taskList []*signmodel.SignTask
  78. err = crmdb.Where("sign_id in (?) and state != ? and generate_date<?", signIDs, signmodel.SignTaskStateDelete, dateStr).
  79. Find(&taskList).Error
  80. if err != nil {
  81. log.Error("err get tasks, err=%+v", err)
  82. return
  83. }
  84. // 3
  85. for _, task := range taskList {
  86. taskTotalCount++
  87. if task.Mid == 0 {
  88. log.Error("task's mid is zero, please check! task id=%d", task.ID)
  89. continue
  90. }
  91. var signInfo, ok = signUpMap[task.SignID]
  92. if !ok {
  93. log.Error("sign not found, err=%v", err)
  94. continue
  95. }
  96. // 4 计算数量
  97. err = s.checkSingleTask(task, signInfo, date, archiveDb)
  98. if err != nil {
  99. log.Error("check task err, task_id=%d, err=%v", task.ID, err)
  100. continue
  101. }
  102. }
  103. log.Info("finish to check task state, task total num=%d", taskTotalCount)
  104. }
  105. }
  106. // get task history, if not exist, then will create it
  107. func (s *Service) getOrCreateTaskHistory(task *signmodel.SignTask, generateDate time.Time) (res *signmodel.SignTaskHistory, err error) {
  108. var crmdb = s.crmdb.GetDb()
  109. res = new(signmodel.SignTaskHistory)
  110. err = crmdb.Select("*").Where("task_template_id=? and generate_date=?", task.ID, generateDate).
  111. Find(&res).Error
  112. // 创建一条,如果没找到的话
  113. if err == gorm.ErrRecordNotFound {
  114. res = &signmodel.SignTaskHistory{
  115. Mid: task.Mid,
  116. SignID: task.SignID,
  117. TaskTemplateID: task.ID,
  118. TaskType: task.TaskType,
  119. TaskCondition: task.TaskCondition,
  120. Attribute: task.Attribute,
  121. GenerateDate: xtime.Time(generateDate.Unix()),
  122. State: signmodel.SignTaskStateRunning,
  123. }
  124. err = crmdb.Save(&res).Error
  125. if err != nil {
  126. log.Error("create task history fail, err=%v, task=%v", err, task)
  127. return
  128. }
  129. }
  130. return
  131. }
  132. // check task state
  133. func (s *Service) checkSingleTask(task *signmodel.SignTask, signInfo *signmodel.SignUp, date time.Time, archiveDb *gorm.DB) (err error) {
  134. var taskBegin, taskEnd time.Time
  135. if task.TaskType == signmodel.TaskTypeAccumulate {
  136. taskBegin = signInfo.BeginDate.Time()
  137. taskEnd = signInfo.EndDate.Time()
  138. } else {
  139. taskBegin, taskEnd = upcrm.GetTaskDuration(date, task.TaskType)
  140. }
  141. if task.Mid == 0 {
  142. log.Error("task's mid is zero, please check! task id=%d", task.ID)
  143. return
  144. }
  145. // get task history
  146. // 如果是累计任务,这里的taskBegin要设置为0
  147. var tBegin = taskBegin
  148. if task.TaskType == signmodel.TaskTypeAccumulate {
  149. tBegin = time.Time{}
  150. }
  151. taskHistory, err := s.getOrCreateTaskHistory(task, tBegin)
  152. if err != nil {
  153. log.Error("get task history fail, task=%+v, err=%+v", task, err)
  154. return
  155. }
  156. var dateStr = date.Format(upcrmmodel.TimeFmtDate)
  157. switch {
  158. default:
  159. var crmdb = s.crmdb.GetDb()
  160. // 4.去稿件库中查找对应的稿件数量
  161. var archiveCount = 0
  162. var archiveList []*Archive
  163. err = archiveDb.Table("archive").
  164. Where("mid = ? and ctime>= ? and ctime <? and (state >= 0 or state = -6)",
  165. task.Mid, taskBegin, taskEnd).
  166. Select("id").
  167. Find(&archiveList).
  168. Error
  169. if err != nil && err != gorm.ErrRecordNotFound {
  170. log.Error("check archive count fail, taskid=%+v err=%+v", task, err)
  171. break
  172. }
  173. var finalResult []int64
  174. // 5.任务完成度统计时,
  175. // 若录入为不包含商单,
  176. // 则任务完成数=新增稿件数-减去(绿洲/商单报备)稿件数+请假任务数;
  177. // 若录入为包含商单,
  178. // 则任务完成数=新增稿件数+请假任务数;
  179. // 如果没有archive,就直接返回
  180. if len(archiveList) != 0 {
  181. // 需要判断商单
  182. if task.IsAttrSet(signmodel.SignTaskAttrBitBusiness) {
  183. var ids []int64
  184. for _, v := range archiveList {
  185. ids = append(ids, v.ID)
  186. }
  187. ids = util.Unique(ids)
  188. // 查询archive服务
  189. archiveResult, e := s.arcRPC.Arcs(context.Background(), &v1.ArcsRequest{Aids: ids})
  190. if e != nil {
  191. err = e
  192. log.Error("get archive result err, err=%+v", err)
  193. break
  194. }
  195. for _, v := range archiveList {
  196. a, ok := archiveResult.Arcs[v.ID]
  197. // 是商单的要排除
  198. if !ok || a.OrderID > 0 {
  199. continue
  200. }
  201. finalResult = append(finalResult, v.ID)
  202. }
  203. } else {
  204. for _, v := range archiveList {
  205. finalResult = append(finalResult, v.ID)
  206. }
  207. }
  208. archiveCount = len(finalResult)
  209. }
  210. // 请假任务数
  211. var absence signmodel.SignTaskAbsence
  212. err = crmdb.Select("sum(absence_count) as absence_count").
  213. Where("task_history_id=? and state!=?", taskHistory.ID, signmodel.SignTaskAbsenceStateDelete).
  214. Find(&absence).Error
  215. if err != nil {
  216. log.Error("get task absence fail, task history=%+v", taskHistory)
  217. return
  218. }
  219. archiveCount += int(absence.AbsenceCount)
  220. log.Info("task count=%d, archive=%d, absence=%d, task=%+v", archiveCount, len(finalResult), absence.AbsenceCount, taskHistory)
  221. // 更新task history的数量
  222. task.TaskCounter = int32(archiveCount)
  223. var tx = crmdb.Begin()
  224. defer func() {
  225. if r := recover(); r != nil || err != nil {
  226. log.Error("roll back task update, task=%+v, r=%+v | err=%+v", task, r, err)
  227. tx.Rollback()
  228. }
  229. }()
  230. err = tx.Table(signmodel.TableNameSignTask).Where("id=?", task.ID).
  231. Updates(map[string]interface{}{
  232. "generate_date": dateStr,
  233. }).Error
  234. if err != nil {
  235. log.Error("update sign task fail, task=%+v, err=%+v", task, err)
  236. return
  237. }
  238. // update history
  239. var state = signmodel.SignTaskStateRunning
  240. if archiveCount >= int(task.TaskCondition) {
  241. state = signmodel.SignTaskStateFinish
  242. }
  243. err = tx.Table(signmodel.TableNameSignTaskHistory).Where("id=?", taskHistory.ID).
  244. Updates(map[string]interface{}{
  245. "task_counter": task.TaskCounter,
  246. "task_condition": task.TaskCondition,
  247. "state": state,
  248. "attribute": task.Attribute,
  249. "task_type": task.TaskType,
  250. }).Error
  251. if err != nil {
  252. log.Error("update sign task history fail, task=%+v, err=%+v", task, err)
  253. return
  254. }
  255. // update sign
  256. err = tx.Table(signmodel.TableNameSignUp).Where("id=?", task.SignID).
  257. Updates(map[string]interface{}{
  258. "task_state": state,
  259. }).Error
  260. if err != nil {
  261. log.Error("update sign up fail, task=%+v, err=%+v", task, err)
  262. return
  263. }
  264. err = tx.Commit().Error
  265. if err != nil {
  266. log.Error("commit err, err=%+v", err)
  267. }
  268. }
  269. return
  270. }