123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- package service
- import (
- "context"
- "strconv"
- "strings"
- "time"
- "go-common/app/job/main/aegis/model"
- )
- func (s *Service) taskProc() {
- for {
- // 检索超时任务,进行释放
- s.dao.TaskRelease(context.Background(), time.Now().Add(-10*time.Minute))
- // 检索过期登陆用户,进行踢出
- s.checkKickOut(context.Background())
- time.Sleep(10 * time.Minute)
- s.syncReport(context.Background())
- s.taskClear()
- }
- }
- func (s *Service) checkKickOut(c context.Context) {
- s.ccMux.RLock()
- defer s.ccMux.RUnlock()
- for bizfwid, uidm := range s.consumerCache {
- for uid := range uidm {
- pos := strings.Index(bizfwid, "-")
- bizid, _ := strconv.Atoi(bizfwid[:pos])
- flowid, _ := strconv.Atoi(bizfwid[pos+1:])
- if on, err := s.dao.IsConsumerOn(c, bizid, flowid, uid); err == nil && !on {
- delete(s.consumerCache[bizfwid], uid)
- s.KickOut(c, int64(bizid), int64(flowid), uid)
- }
- }
- }
- }
- // KickOut 踢出过期用户并释放任务
- func (s *Service) KickOut(c context.Context, bizid, flowid, uid int64) {
- // 1. 踢出用户
- s.dao.KickOutConsumer(c, int64(bizid), int64(flowid), uid)
- s.sendTaskLog(c, &model.Task{BusinessID: bizid, FlowID: flowid}, model.LogTypeTaskConsumer, "kickout", uid, "")
- // 2. 释放任务
- s.dao.ReleaseByConsumer(c, bizid, flowid, uid)
- }
- func (s *Service) taskClear() {
- mt := time.Now().Add(-3 * 24 * time.Hour)
- for {
- rows, err := s.dao.TaskClear(context.Background(), mt, 1000)
- if err != nil || rows == 0 {
- break
- }
- time.Sleep(time.Second)
- }
- }
|