// task_job.go

  1. package service
  2. import (
  3. "context"
  4. "strconv"
  5. "strings"
  6. "time"
  7. "go-common/app/job/main/aegis/model"
  8. )
  9. func (s *Service) taskProc() {
  10. for {
  11. // 检索超时任务,进行释放
  12. s.dao.TaskRelease(context.Background(), time.Now().Add(-10*time.Minute))
  13. // 检索过期登陆用户,进行踢出
  14. s.checkKickOut(context.Background())
  15. time.Sleep(10 * time.Minute)
  16. s.syncReport(context.Background())
  17. s.taskClear()
  18. }
  19. }
  20. func (s *Service) checkKickOut(c context.Context) {
  21. s.ccMux.RLock()
  22. defer s.ccMux.RUnlock()
  23. for bizfwid, uidm := range s.consumerCache {
  24. for uid := range uidm {
  25. pos := strings.Index(bizfwid, "-")
  26. bizid, _ := strconv.Atoi(bizfwid[:pos])
  27. flowid, _ := strconv.Atoi(bizfwid[pos+1:])
  28. if on, err := s.dao.IsConsumerOn(c, bizid, flowid, uid); err == nil && !on {
  29. delete(s.consumerCache[bizfwid], uid)
  30. s.KickOut(c, int64(bizid), int64(flowid), uid)
  31. }
  32. }
  33. }
  34. }
  35. // KickOut 踢出过期用户并释放任务
  36. func (s *Service) KickOut(c context.Context, bizid, flowid, uid int64) {
  37. // 1. 踢出用户
  38. s.dao.KickOutConsumer(c, int64(bizid), int64(flowid), uid)
  39. s.sendTaskLog(c, &model.Task{BusinessID: bizid, FlowID: flowid}, model.LogTypeTaskConsumer, "kickout", uid, "")
  40. // 2. 释放任务
  41. s.dao.ReleaseByConsumer(c, bizid, flowid, uid)
  42. }
  43. func (s *Service) taskClear() {
  44. mt := time.Now().Add(-3 * 24 * time.Hour)
  45. for {
  46. rows, err := s.dao.TaskClear(context.Background(), mt, 1000)
  47. if err != nil || rows == 0 {
  48. break
  49. }
  50. time.Sleep(time.Second)
  51. }
  52. }