123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- package service
- import (
- "context"
- "go-common/app/admin/main/aegis/model/common"
- "go-common/app/admin/main/aegis/model/task"
- "go-common/library/log"
- "go-common/library/sync/errgroup"
- )
- // On 上线
- func (s *Service) On(c context.Context, opt *common.BaseOptions) (err error) {
- if err = s.mc.ConsumerOn(c, opt); err != nil {
- log.Error("s.mc.ConsumerOn mc错误(%v)", err)
- err = nil
- }
- if err = s.mysql.ConsumerOn(c, opt); err != nil {
- return
- }
- go s.sendTaskConsumerLog(context.TODO(), "on", opt)
- return
- }
- // Off 离线
- func (s *Service) Off(c context.Context, opt *common.BaseOptions) (err error) {
- if err = s.mc.ConsumerOff(c, opt); err != nil {
- log.Error("s.mc.ConsumerOff mc错误(%v)", err)
- err = nil
- }
- if err = s.mysql.ConsumerOff(c, opt); err != nil {
- return
- }
- go s.sendTaskConsumerLog(context.TODO(), "off", opt)
- return
- }
- // KickOut 踢出
- func (s *Service) KickOut(c context.Context, opt *common.BaseOptions, kickuid int64) (err error) {
- opt.UID = kickuid
- unames, _ := s.http.GetUnames(c, []int64{kickuid})
- opt.Uname = unames[kickuid]
- return s.Off(c, opt)
- }
- // Watcher 监控管理
- func (s *Service) Watcher(c context.Context, bizid, flowid int64, role int8) (watchers []*task.WatchItem, err error) {
- // 从数据库拿出24小时内有变化的或者依然在线的
- var wis []*task.WatchItem
- if wis, err = s.mysql.ConsumerStat(c, bizid, flowid); err != nil {
- return
- }
- bopt := &common.BaseOptions{
- BusinessID: bizid,
- FlowID: flowid,
- }
- var (
- onuids, offuids []int64
- inxmap = make(map[int64]int)
- )
- for _, item := range wis {
- bopt.UID = item.UID
- bopt.Uname = ""
- uname, urole, err := s.GetRole(c, bopt)
- // 组员列表展示 组员 + 角色获取失败的 + 非任务角色(例如平台管理员)
- if err != nil || role != urole || len(uname) == 0 {
- if role == task.TaskRoleLeader || urole == task.TaskRoleLeader {
- continue
- }
- }
- isOn, _ := s.mc.IsConsumerOn(c, bopt)
- item.Role = urole
- item.IsOnLine = isOn
- item.Uname = uname
- if item.IsOnLine {
- onuids = append(onuids, item.UID)
- item.LastOn = item.Mtime.Format("2006-01-02 15:04:05")
- } else {
- offuids = append(offuids, item.UID)
- item.LastOff = item.Mtime.Format("2006-01-02 15:04:05")
- }
- inxmap[item.UID] = len(watchers)
- watchers = append(watchers, item)
- }
- // 补充laston 或者 lastoff
- wg, ctx := errgroup.WithContext(c)
- if len(onuids) > 0 {
- wg.Go(func() error {
- at, err := s.searchConsumerLog(ctx, bizid, flowid, []string{"off", "kickout"}, onuids, len(onuids))
- if err == nil {
- for uid, ctime := range at {
- watchers[inxmap[uid]].LastOff = ctime
- }
- }
- return err
- })
- }
- if len(offuids) > 0 {
- wg.Go(func() error {
- at, err := s.searchConsumerLog(ctx, bizid, flowid, []string{"on"}, offuids, len(offuids))
- if err == nil {
- for uid, ctime := range at {
- watchers[inxmap[uid]].LastOn = ctime
- }
- }
- return err
- })
- }
- wg.Go(func() error {
- trans := func(c context.Context, ids []int64) (map[int64][]interface{}, error) {
- return s.MemberStats(c, bizid, flowid, ids)
- }
- return s.mulIDtoName(ctx, watchers, trans, "UID", "Count", "CompleteRate", "PassRate", "AvgUT")
- })
- wg.Wait()
- return
- }
- // IsOn .
- func (s *Service) IsOn(c context.Context, opt *common.BaseOptions) bool {
- on, err := s.mc.IsConsumerOn(c, opt)
- if err != nil {
- log.Error("s.mc.ConsumerOff mc错误(%v)", err)
- if on, err = s.mysql.IsConsumerOn(c, opt); err != nil {
- log.Error("s.mysql.ConsumerOff mysql错误(%v)", err)
- }
- }
- return on
- }
|