task.go 7.9 KB


  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io/ioutil"
  7. "net/http"
  8. "strconv"
  9. "time"
  10. "go-common/app/job/main/dm2/model"
  11. "go-common/app/job/main/dm2/model/oplog"
  12. "go-common/library/log"
  13. )
  14. func (s *Service) taskResProc() {
  15. var (
  16. c = context.Background()
  17. tasks []*model.TaskInfo
  18. err error
  19. )
  20. ticker := time.NewTicker(time.Duration(s.conf.TaskConf.ResInterval))
  21. defer ticker.Stop()
  22. for range ticker.C {
  23. if tasks, err = s.dao.TaskInfos(c, model.TaskStateSearch); err != nil {
  24. log.Error("s.dao.TaskInfos error(%v)", err)
  25. continue
  26. }
  27. for _, task := range tasks {
  28. count, url, state, err := s.dao.TaskSearchRes(c, task)
  29. if err != nil {
  30. log.Error("s.dao.TaskSearchRes(%+v) error(%v)", task, err)
  31. continue
  32. }
  33. if state == model.TaskSearchFail {
  34. task.State = model.TaskStateFail
  35. } else if state == model.TaskSearchSuc {
  36. task.Result = url
  37. task.Count = count
  38. if task.Sub > 0 {
  39. task.State = model.TaskStateWait
  40. } else {
  41. task.State = model.TaskStateSuc
  42. }
  43. }
  44. s.dao.UpdateTask(c, task)
  45. }
  46. }
  47. }
  48. func (s *Service) taskDelProc() {
  49. var (
  50. c = context.Background()
  51. err error
  52. )
  53. ticker := time.NewTicker(time.Duration(s.conf.TaskConf.DelInterval))
  54. defer ticker.Stop()
  55. for range ticker.C {
  56. if err = s.taskSchedule(c); err != nil {
  57. log.Error("taskDelProc error(%v)", err)
  58. continue
  59. }
  60. }
  61. }
  62. func (s *Service) taskSchedule(c context.Context) (err error) {
  63. var (
  64. ok bool
  65. now = time.Now()
  66. expire = now.Add(time.Duration(s.conf.TaskConf.DelInterval))
  67. expireStr = expire.Format(time.RFC3339)
  68. oldExpireStr, oldExpireGetSetStr string
  69. oldExpire time.Time
  70. )
  71. if ok, err = s.dao.SetnxTaskJob(c, expireStr); err != nil {
  72. return
  73. }
  74. // redis中不存在
  75. if ok {
  76. if err = s.taskDelJob(c); err != nil {
  77. s.dao.DelTaskJob(c)
  78. log.Error("taskDelJob,error(%v)", err)
  79. return
  80. }
  81. return
  82. }
  83. // redis中已经存在
  84. // 判断是否过期了
  85. if oldExpireStr, err = s.dao.GetTaskJob(c); err != nil {
  86. return
  87. }
  88. if oldExpire, err = time.Parse(time.RFC3339, oldExpireStr); err != nil {
  89. return
  90. }
  91. if oldExpire.Sub(now) > 0 {
  92. return
  93. }
  94. if oldExpireGetSetStr, err = s.dao.GetSetTaskJob(c, expireStr); err != nil {
  95. return
  96. }
  97. if oldExpireGetSetStr != oldExpireStr {
  98. return
  99. }
  100. if err = s.taskDelJob(c); err != nil {
  101. s.dao.DelTaskJob(c)
  102. log.Error("taskDelJob,error(%v)", err)
  103. return
  104. }
  105. return
  106. }
  107. // TODO: operation_time && operation_rate
  108. func (s *Service) taskDelJob(c context.Context) (err error) {
  109. var (
  110. task *model.TaskInfo
  111. )
  112. if task, err = s.dao.OneTask(c); err != nil || task == nil {
  113. return
  114. }
  115. task.State = model.TaskStateDelDM
  116. s.dao.UpdateTask(c, task)
  117. var delCount int64
  118. if delCount, task.LastIndex, task.State, err = s.taskDelDM(c, task); err != nil {
  119. return
  120. }
  121. if task.State == model.TaskStateDelDM {
  122. task.State = model.TaskStateSuc
  123. }
  124. if _, err = s.dao.UptSubTask(c, task.ID, delCount, time.Now()); err != nil {
  125. return
  126. }
  127. _, err = s.dao.UpdateTask(c, task)
  128. return
  129. }
  130. func (s *Service) taskDelDM(c context.Context, eTask *model.TaskInfo) (delCount int64, lastIndex, state int32, err error) {
  131. taskDelNum := s.conf.TaskConf.DelNum
  132. taskResFieldLen := s.conf.TaskConf.ResFieldLen
  133. res, err := http.Get(eTask.Result)
  134. if err != nil {
  135. log.Error("s.taskDelDM.HttpGet(%s) error(%v)", eTask.Result, err)
  136. return
  137. }
  138. resp, err := ioutil.ReadAll(res.Body)
  139. if err != nil {
  140. res.Body.Close()
  141. log.Error("s.taskDelDM.ioutilRead error(%v)", err)
  142. return
  143. }
  144. res.Body.Close()
  145. lines := bytes.Split(resp, []byte("\n"))
  146. total := len(lines)
  147. n := (total-1)/taskDelNum + 1
  148. for i := int(eTask.LastIndex); i < n; i++ {
  149. var (
  150. task *model.TaskInfo
  151. subTask *model.SubTask
  152. )
  153. start := i * taskDelNum
  154. end := (i + 1) * taskDelNum
  155. if end > total {
  156. end = total
  157. }
  158. OidDMid := make(map[int64][]int64)
  159. for _, line := range lines[start:end] {
  160. var dmid, oid int64
  161. fields := bytes.Split(line, []byte("\001"))
  162. if len(fields) < taskResFieldLen {
  163. log.Error("fields lenth too small:%d", len(fields))
  164. continue
  165. }
  166. if dmid, err = strconv.ParseInt(string(fields[0]), 10, 64); err != nil {
  167. log.Error("ParseInt(%s) error(%v)", string(fields[0]), err)
  168. continue
  169. }
  170. if oid, err = strconv.ParseInt(string(fields[1]), 10, 64); err != nil {
  171. log.Error("ParseInt(%s) error(%v)", string(fields[1]), err)
  172. continue
  173. }
  174. OidDMid[oid] = append(OidDMid[oid], dmid)
  175. }
  176. for oid, dmids := range OidDMid {
  177. var affected int64
  178. if affected, err = s.dao.DelDMs(c, oid, dmids, model.StateTaskDel); err != nil {
  179. log.Error("dm task(id:%d) del dm(oid:%d,dmids:%v) error(%v)", eTask.ID, oid, dmids, err)
  180. continue
  181. }
  182. if affected > 0 {
  183. s.OpLog(c, oid, 0, time.Now().Unix(), int(model.SubTypeVideo), dmids, "status", "", strconv.FormatInt(int64(model.StateTaskDel), 10), "弹幕任务删除", oplog.SourceManager, oplog.OperatorSystem)
  184. delCount += affected
  185. if _, err = s.dao.UptSubjectCount(c, model.SubTypeVideo, oid, affected); err != nil {
  186. log.Error("dm task update count(oid:%d,affected:%d) error(%v)", oid, affected, err)
  187. }
  188. }
  189. time.Sleep(50 * time.Millisecond)
  190. }
  191. if len(OidDMid) > 0 {
  192. log.Warn("dm task(id:%d) del dm(oid,dmids:%+v)", eTask.ID, OidDMid)
  193. }
  194. lastIndex = int32(i + 1)
  195. task, err = s.dao.OneTask(c)
  196. if err == nil && task != nil && task.ID != eTask.ID && task.Priority > eTask.Priority {
  197. state = model.TaskStateWait
  198. return
  199. }
  200. if eTask, err = s.dao.TaskInfoByID(c, eTask.ID); err != nil || task == nil {
  201. continue
  202. }
  203. state = eTask.State
  204. if state != model.TaskStateDelDM {
  205. return
  206. }
  207. if subTask, err = s.dao.SubTask(c, eTask.ID); err != nil || subTask == nil {
  208. continue
  209. }
  210. tCount := subTask.Tcount + delCount
  211. if tCount >= s.conf.TaskConf.DelLimit && subTask.Tcount < s.conf.TaskConf.DelLimit {
  212. log.Warn("task(id:%d) del dm reach limit(count:%d)", eTask.ID, tCount)
  213. s.sendWechatWorkMsg(c, eTask, tCount)
  214. state = model.TaskStatePause
  215. return
  216. }
  217. }
  218. return
  219. }
  220. func (s *Service) sendWechatWorkMsg(c context.Context, task *model.TaskInfo, count int64) (err error) {
  221. content := fmt.Sprintf(model.TaskNoticeContent, task.ID, task.Title, count)
  222. users := s.conf.TaskConf.MsgCC
  223. users = append(users, task.Creator, task.Reviewer)
  224. return s.dao.SendWechatWorkMsg(c, content, model.TaskNoticeTitle, users)
  225. }
  226. // OpLog put a new infoc format operation log into the channel
  227. func (s *Service) OpLog(c context.Context, cid, operator, OperationTime int64, typ int, dmids []int64, subject, originVal, currentVal, remark string, source oplog.Source, operatorType oplog.OperatorType) (err error) {
  228. infoLog := new(oplog.Infoc)
  229. infoLog.Oid = cid
  230. infoLog.Type = typ
  231. infoLog.DMIds = dmids
  232. infoLog.Subject = subject
  233. infoLog.OriginVal = originVal
  234. infoLog.CurrentVal = currentVal
  235. infoLog.OperationTime = strconv.FormatInt(OperationTime, 10)
  236. infoLog.Source = source
  237. infoLog.OperatorType = operatorType
  238. infoLog.Operator = operator
  239. infoLog.Remark = remark
  240. select {
  241. case s.opsLogCh <- infoLog:
  242. default:
  243. err = fmt.Errorf("opsLogCh full")
  244. log.Error("opsLogCh full (%v)", infoLog)
  245. }
  246. return
  247. }
  248. func (s *Service) oplogproc() {
  249. for opLog := range s.opsLogCh {
  250. if len(opLog.Subject) == 0 || len(opLog.CurrentVal) == 0 || opLog.Source <= 0 ||
  251. opLog.Operator < 0 || opLog.OperatorType <= 0 {
  252. log.Warn("oplogproc() it is an illegal log, warn(%v, %v, %v)", opLog.Subject, opLog.Subject, opLog.CurrentVal)
  253. continue
  254. } else {
  255. for _, dmid := range opLog.DMIds {
  256. if dmid > 0 {
  257. s.dmOperationLogSvc.Info(opLog.Subject, strconv.FormatInt(opLog.Oid, 10), strconv.Itoa(opLog.Type),
  258. strconv.FormatInt(dmid, 10), opLog.Source.String(), opLog.OriginVal,
  259. opLog.CurrentVal, strconv.FormatInt(opLog.Operator, 10), opLog.OperatorType.String(),
  260. opLog.OperationTime, opLog.Remark)
  261. } else {
  262. log.Warn("oplogproc() it is an illegal log, for dmid value, warn(%d, %+v)", dmid, opLog)
  263. }
  264. }
  265. }
  266. }
  267. }