task_consumer.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package service
  2. import (
  3. "context"
  4. "go-common/app/admin/main/aegis/model/common"
  5. "go-common/app/admin/main/aegis/model/task"
  6. "go-common/library/log"
  7. "go-common/library/sync/errgroup"
  8. )
  9. // On 上线
  10. func (s *Service) On(c context.Context, opt *common.BaseOptions) (err error) {
  11. if err = s.mc.ConsumerOn(c, opt); err != nil {
  12. log.Error("s.mc.ConsumerOn mc错误(%v)", err)
  13. err = nil
  14. }
  15. if err = s.mysql.ConsumerOn(c, opt); err != nil {
  16. return
  17. }
  18. go s.sendTaskConsumerLog(context.TODO(), "on", opt)
  19. return
  20. }
  21. // Off 离线
  22. func (s *Service) Off(c context.Context, opt *common.BaseOptions) (err error) {
  23. if err = s.mc.ConsumerOff(c, opt); err != nil {
  24. log.Error("s.mc.ConsumerOff mc错误(%v)", err)
  25. err = nil
  26. }
  27. if err = s.mysql.ConsumerOff(c, opt); err != nil {
  28. return
  29. }
  30. go s.sendTaskConsumerLog(context.TODO(), "off", opt)
  31. return
  32. }
  33. // KickOut 踢出
  34. func (s *Service) KickOut(c context.Context, opt *common.BaseOptions, kickuid int64) (err error) {
  35. opt.UID = kickuid
  36. unames, _ := s.http.GetUnames(c, []int64{kickuid})
  37. opt.Uname = unames[kickuid]
  38. return s.Off(c, opt)
  39. }
  40. // Watcher 监控管理
  41. func (s *Service) Watcher(c context.Context, bizid, flowid int64, role int8) (watchers []*task.WatchItem, err error) {
  42. // 从数据库拿出24小时内有变化的或者依然在线的
  43. var wis []*task.WatchItem
  44. if wis, err = s.mysql.ConsumerStat(c, bizid, flowid); err != nil {
  45. return
  46. }
  47. bopt := &common.BaseOptions{
  48. BusinessID: bizid,
  49. FlowID: flowid,
  50. }
  51. var (
  52. onuids, offuids []int64
  53. inxmap = make(map[int64]int)
  54. )
  55. for _, item := range wis {
  56. bopt.UID = item.UID
  57. bopt.Uname = ""
  58. uname, urole, err := s.GetRole(c, bopt)
  59. // 组员列表展示 组员 + 角色获取失败的 + 非任务角色(例如平台管理员)
  60. if err != nil || role != urole || len(uname) == 0 {
  61. if role == task.TaskRoleLeader || urole == task.TaskRoleLeader {
  62. continue
  63. }
  64. }
  65. isOn, _ := s.mc.IsConsumerOn(c, bopt)
  66. item.Role = urole
  67. item.IsOnLine = isOn
  68. item.Uname = uname
  69. if item.IsOnLine {
  70. onuids = append(onuids, item.UID)
  71. item.LastOn = item.Mtime.Format("2006-01-02 15:04:05")
  72. } else {
  73. offuids = append(offuids, item.UID)
  74. item.LastOff = item.Mtime.Format("2006-01-02 15:04:05")
  75. }
  76. inxmap[item.UID] = len(watchers)
  77. watchers = append(watchers, item)
  78. }
  79. // 补充laston 或者 lastoff
  80. wg, ctx := errgroup.WithContext(c)
  81. if len(onuids) > 0 {
  82. wg.Go(func() error {
  83. at, err := s.searchConsumerLog(ctx, bizid, flowid, []string{"off", "kickout"}, onuids, len(onuids))
  84. if err == nil {
  85. for uid, ctime := range at {
  86. watchers[inxmap[uid]].LastOff = ctime
  87. }
  88. }
  89. return err
  90. })
  91. }
  92. if len(offuids) > 0 {
  93. wg.Go(func() error {
  94. at, err := s.searchConsumerLog(ctx, bizid, flowid, []string{"on"}, offuids, len(offuids))
  95. if err == nil {
  96. for uid, ctime := range at {
  97. watchers[inxmap[uid]].LastOn = ctime
  98. }
  99. }
  100. return err
  101. })
  102. }
  103. wg.Go(func() error {
  104. trans := func(c context.Context, ids []int64) (map[int64][]interface{}, error) {
  105. return s.MemberStats(c, bizid, flowid, ids)
  106. }
  107. return s.mulIDtoName(ctx, watchers, trans, "UID", "Count", "CompleteRate", "PassRate", "AvgUT")
  108. })
  109. wg.Wait()
  110. return
  111. }
  112. // IsOn .
  113. func (s *Service) IsOn(c context.Context, opt *common.BaseOptions) bool {
  114. on, err := s.mc.IsConsumerOn(c, opt)
  115. if err != nil {
  116. log.Error("s.mc.ConsumerOff mc错误(%v)", err)
  117. if on, err = s.mysql.IsConsumerOn(c, opt); err != nil {
  118. log.Error("s.mysql.ConsumerOff mysql错误(%v)", err)
  119. }
  120. }
  121. return on
  122. }