123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438 |
- package service
- import (
- "context"
- "errors"
- "time"
- "go-common/app/admin/main/aegis/model/common"
- taskmod "go-common/app/admin/main/aegis/model/task"
- "go-common/library/cache/redis"
- "go-common/library/ecode"
- "go-common/library/log"
- "github.com/jinzhu/gorm"
- )
- //ERROR
- var (
- ErrTaskMiss = errors.New("task miss")
- )
- // NextTask 下一个任务
- func (s *Service) NextTask(c context.Context, opt *taskmod.NextOptions) (tasks []*taskmod.Task, count int64, err error) {
- log.Info("task-NextTask opt(%+v)", opt)
- if count, err = s.countPersonalTask(c, &opt.BaseOptions, opt.NoCache); err != nil {
- return
- }
- if count < opt.DispatchCount {
- if count, err = s.syncSeize(c, opt); err != nil {
- return
- }
- }
- /* 去掉异步抢占
- else if count < opt.SeizeCount {
- s.asyncSeize(opt)
- }
- */
- if count == 0 {
- return
- }
- return s.dispatch(c, opt)
- }
- // ListTasks 实时列表,停滞列表,延迟列表
- func (s *Service) ListTasks(c context.Context, opt *taskmod.ListOptions) (tasks []*taskmod.Task, count int64, err error) {
- switch opt.State {
- case 1: // 实时任务,从redis取出,在数据库校验
- tasks, count, err = s.listUnseized(c, opt)
- case 2: // 停滞任务,组员的直接从redis取。组长的从数据库取id,redis取任务
- tasks, count, err = s.listMyTasks(c, "seized", opt)
- case 3: // 延迟任务,组员的直接从redis取。组长的从数据库取id,redis取任务
- tasks, count, err = s.listMyTasks(c, "delayd", opt)
- case 4: // 指派停滞任务,从数据库取id,redis取任务
- tasks, count, err = s.listMyTasks(c, "assignd", opt)
- default: // 所有未完成任务
- }
- if err != nil {
- tasks, count, err = s.mysql.ListTasks(c, opt)
- }
- opt.Total = int(count)
- return
- }
- // Task 直接读取某个任务
- func (s *Service) Task(c context.Context, tid int64) (task *taskmod.Task, err error) {
- return s.mysql.TaskFromDB(c, tid)
- }
- // TxSubmitTask 提交任务
- func (s *Service) TxSubmitTask(c context.Context, ormTx *gorm.DB, opt *common.BaseOptions, state int8) (ostate int8, otaskid, ouid int64, err error) {
- var (
- t *taskmod.Task
- rows int64
- )
- // 根据rid,flowid检索最新的未完成任务
- if t, err = s.gorm.TaskByRID(c, opt.RID, opt.FlowID); err != nil || t == nil || t.ID == 0 {
- log.Warn("TaskByRID(%d,%d) miss(%v)", opt.RID, opt.FlowID, err)
- t, err = s.gorm.TaskByRID(c, opt.RID, 0)
- }
- // TODO 先默认一个资源只在一个flow下分发,解决目前存在flow状态与task状态不同步
- if err != nil || t == nil || t.ID == 0 {
- log.Warn("s.gorm.TaskByRID(%d,%d) miss(%v)", opt.RID, 0, err)
- err = nil
- return
- }
- ostate = t.State
- ouid = t.UID
- otaskid = t.ID
- var utime uint64
- if !t.Gtime.Time().IsZero() {
- utime = uint64(time.Since(t.Gtime.Time()).Seconds())
- }
- subopt := &taskmod.SubmitOptions{
- BaseOptions: *opt,
- TaskID: t.ID,
- OldUID: t.UID,
- Utime: utime,
- OldState: t.State,
- }
- // 1. 改数据库
- if rows, err = s.gorm.TxSubmit(ormTx, subopt, state); err != nil {
- return
- }
- if rows != 1 {
- err = ecode.NothingFound
- log.Error("Submit (%v) error(%v)", opt, err)
- return
- }
- return
- }
- func (s *Service) submitTaskCache(c context.Context, opt *common.BaseOptions, ostate int8, taskid, ouid int64) {
- log.Info("SubmitTaskCache opt(%+v) ostate(%d) taskid(%d) ouid(%d)", opt, ostate, taskid, ouid)
- optc := &common.BaseOptions{
- BusinessID: opt.BusinessID,
- FlowID: opt.FlowID,
- UID: ouid,
- }
- if ostate == taskmod.TaskStateDelay {
- s.redis.RemoveDelayTask(c, optc, taskid)
- return
- }
- s.redis.RemovePersonalTask(c, optc, taskid)
- }
- // Delay 延迟任务
- func (s *Service) Delay(c context.Context, opt *taskmod.DelayOptions) (err error) {
- var (
- taskmod *taskmod.Task
- rows int64
- )
- if taskmod, err = s.mysql.TaskFromDB(c, opt.TaskID); err != nil || taskmod == nil {
- return
- }
- if !s.checkDelayOption(c, opt, taskmod) {
- log.Error("checkDelayOption error opt(%+v) taskmod(%+v)", opt, taskmod)
- return ecode.AegisTaskFinish
- }
- if rows, err = s.mysql.Delay(c, opt); err != nil {
- return
- }
- if rows != 1 {
- err = ecode.AegisTaskFinish
- log.Error("Submit (%v) error(%v)", opt, err)
- return
- }
- if err = s.redis.RemovePersonalTask(c, &opt.BaseOptions, opt.TaskID); err != nil {
- return
- }
- s.redis.PushDelayTask(c, &opt.BaseOptions, opt.TaskID)
- return
- }
- // Release 释放任务
- func (s *Service) Release(c context.Context, opt *common.BaseOptions, delay bool) (rows int64, err error) {
- if rows, err = s.mysql.Release(c, opt, delay); err != nil {
- return
- }
- //err = s.redis.Release(c, opt, delay)
- return
- }
- // MaxWeight 当前最高权重
- func (s *Service) MaxWeight(c context.Context, opt *common.BaseOptions) (max int64, err error) {
- return s.gorm.MaxWeight(c, opt.BusinessID, opt.FlowID)
- }
- // UnDoStat undo stat
- func (s *Service) UnDoStat(c context.Context, opt *common.BaseOptions) (stat *taskmod.UnDOStat, err error) {
- return s.gorm.UndoStat(c, opt.BusinessID, opt.FlowID, opt.UID)
- }
- // TaskStat task stat
- func (s *Service) TaskStat(c context.Context, opt *common.BaseOptions) (stat *taskmod.Stat, err error) {
- return s.gorm.TaskStat(c, opt.BusinessID, opt.FlowID, opt.UID)
- }
- func (s *Service) countPersonalTask(c context.Context, opt *common.BaseOptions, nocache bool) (count int64, err error) {
- log.Info("task-countPersonalTask opt(%+v) nocache(%v)", opt, nocache)
- defer func() { log.Info("task-countPersonalTask count(%d) err(%v)", count, err) }()
- if nocache {
- return s.mysql.CountPersonal(c, opt)
- }
- if count, err = s.redis.CountPersonalTask(c, opt); err != nil {
- // redis 挂了
- if count, err = s.mysql.CountPersonal(c, opt); err != nil {
- return
- }
- }
- return
- }
- func (s *Service) syncSeize(c context.Context, opt *taskmod.NextOptions) (count int64, err error) {
- return s.seize(c, opt)
- }
- func (s *Service) seize(c context.Context, opt *taskmod.NextOptions) (count int64, err error) {
- log.Info("task-seize opt(%+v)", opt)
- defer func() { log.Info("task-seize count(%d) err(%v)", count, err) }()
- var (
- hitids, missids []int64
- others map[int64]int64
- )
- // TODO: 抢占任务要根据用户是否在线,处理任务指派
- if opt.NoCache {
- hitids, err = s.mysql.QueryForSeize(c, opt.BusinessID, opt.FlowID, opt.UID, opt.SeizeCount)
- } else {
- hitids, missids, others, err = s.redis.SeizeTask(c, opt.BusinessID, opt.FlowID, opt.UID, opt.SeizeCount)
- if err != nil {
- hitids, err = s.mysql.QueryForSeize(c, opt.BusinessID, opt.FlowID, opt.UID, opt.SeizeCount)
- }
- }
- if err != nil {
- return
- }
- log.Info("seize uid(%d) hitids(%v), missids(%v), others(%v)", opt.UID, hitids, missids, others)
- if !opt.NoCache && len(missids) > 0 {
- log.Error("seize uid(%d) missids(%v)", opt.UID, missids)
- for _, id := range missids {
- if err = s.syncTask(c, id); err != nil {
- s.redis.RemovePublicTask(c, &opt.BaseOptions, id)
- }
- }
- }
- if len(hitids) > 0 {
- log.Info("seize uid(%d) hitids(%v)", opt.UID, hitids)
- mhits := make(map[int64]int64)
- for _, id := range hitids {
- mhits[id] = opt.UID
- }
- if count, err = s.mysql.Seize(c, mhits); err != nil || count == 0 {
- return
- }
- return
- }
- return
- }
- func (s *Service) dispatch(c context.Context, opt *taskmod.NextOptions) (tasks []*taskmod.Task, count int64, err error) {
- log.Info("task-dispatch opt(%+v)", opt)
- defer func() { log.Info("task-dispatch tasks(%+v) count(%d) err(%v)", tasks, count, err) }()
- listopt := &taskmod.ListOptions{
- BaseOptions: opt.BaseOptions,
- Pager: common.Pager{
- Pn: 1,
- Ps: int(opt.DispatchCount),
- }}
- tasks, count, err = s.calibur(c, listopt, s.redis.RangePersonalTask, s.mysql.DispatchByID, s.redis.RemovePersonalTask)
- if err != nil {
- tasks, count, err = s.mysql.DBDispatch(c, opt)
- }
- return
- }
- func (s *Service) syncTask(c context.Context, taskID int64) (err error) {
- var task *taskmod.Task
- if task, err = s.mysql.TaskFromDB(c, taskID); err != nil || task == nil {
- return ErrTaskMiss
- }
- var option = &common.BaseOptions{
- BusinessID: task.BusinessID,
- FlowID: task.FlowID,
- UID: task.UID,
- }
- s.redis.SetTask(c, task)
- switch task.State {
- case taskmod.TaskStateInit:
- s.redis.PushPublicTask(c, task)
- case taskmod.TaskStateDispatch:
- s.redis.RemovePublicTask(c, option, task.ID)
- s.redis.PushPersonalTask(c, option, task.ID)
- case taskmod.TaskStateDelay:
- s.redis.RemovePublicTask(c, option, task.ID)
- s.redis.PushDelayTask(c, option, task.ID)
- default:
- s.redis.RemovePublicTask(c, option, task.ID)
- }
- return
- }
- func (s *Service) listUnseized(c context.Context, opt *taskmod.ListOptions) (tasks []*taskmod.Task, count int64, err error) {
- return s.calibur(c, opt, s.redis.RangePublicTask, s.mysql.ListCheckUnSeized, s.redis.RemovePublicTask)
- }
- func (s *Service) listMyTasks(c context.Context, ltp string, opt *taskmod.ListOptions) (tasks []*taskmod.Task, count int64, err error) {
- if !opt.BisLeader {
- if ltp == "delayd" {
- return s.calibur(c, opt, s.redis.RangeDealyTask, s.mysql.ListCheckDelay, s.redis.RemoveDelayTask)
- }
- if ltp == "seized" {
- return s.calibur(c, opt, s.redis.RangePersonalTask, s.mysql.ListCheckSeized, s.redis.RemovePersonalTask)
- }
- }
- if opt.BisLeader {
- opt.UID = 0
- }
- var ids []int64
- switch ltp {
- case "delayd":
- ids, count, err = s.gorm.TaskListDelayd(c, opt)
- case "seized":
- ids, count, err = s.gorm.TaskListSeized(c, opt)
- case "assignd":
- ids, count, err = s.gorm.TaskListAssignd(c, opt)
- }
- if err != nil || len(ids) == 0 {
- return
- }
- if tasks, err = s.redis.GetTask(c, ids); err != nil {
- err = redis.ErrNil
- }
- return
- }
- func (s *Service) calibur(c context.Context, opt *taskmod.ListOptions, rfunc taskmod.RangeFunc, lfunc taskmod.ListFuncDB, remove taskmod.RemoveFunc) (taskmods []*taskmod.Task, count int64, err error) {
- var (
- hitids, missids []int64
- missmap map[int64]struct{}
- mtaskmods map[int64]*taskmod.Task
- )
- mtaskmods, count, hitids, missids, err = rfunc(c, opt)
- log.Info("calibur(%+v) rfunc count(%d) hitids(%v) missids(%v)", opt, count, hitids, missids)
- if err != nil {
- return
- }
- if len(missids) > 0 {
- for _, id := range missids {
- if err = s.syncTask(c, id); err != nil {
- log.Error("syncTask error(%v)", err)
- remove(c, &opt.BaseOptions, id)
- }
- }
- }
- if len(hitids) > 0 {
- if missmap, err = lfunc(c, mtaskmods, hitids, opt.UID); err != nil {
- log.Error("calibur lfunc error(%v)", err)
- return
- }
- if len(missmap) > 0 {
- log.Info("calibur personal任务移除%v", missmap)
- for id := range missmap {
- remove(c, &opt.BaseOptions, id)
- }
- }
- }
- for _, id := range hitids {
- if _, ok := missmap[id]; ok && opt.Action != "release" {
- delete(mtaskmods, id)
- } else {
- taskmods = append(taskmods, mtaskmods[id])
- }
- }
- return
- }
- /*
- func (s *Service) checkSubmitOption(c context.Context, opt *taskmod.SubmitOptions, task *taskmod.Task) bool {
- opt.OldState = task.State
- opt.OldUID = task.UID
- // 1. 组员只能处理自己的延迟任务
- if task.State == taskmod.TaskStateDelay {
- if opt.BisLeader {
- return true
- }
- if task.UID != opt.UID {
- return false
- }
- }
- if task.State == taskmod.TaskStateDispatch && opt.UID == task.UID {
- opt.Utime = uint64(time.Since(task.Gtime.Time()).Seconds())
- return true
- }
- return false
- }
- */
- func (s *Service) checkDelayOption(c context.Context, opt *taskmod.DelayOptions, task *taskmod.Task) bool {
- if task.State == taskmod.TaskStateDispatch && task.UID == opt.UID {
- return true
- }
- return false
- }
- func (s *Service) syncUpCache(c context.Context) (err error) {
- if s.Debug() == "local" {
- return
- }
- upGroup := make(map[int64]*common.Group)
- upgs, err := s.rpc.UpGroups(c)
- if err != nil || upgs == nil {
- return
- }
- for gid, upg := range upgs {
- if _, ok := upGroup[gid]; !ok {
- upGroup[gid] = &common.Group{
- ID: gid,
- Name: upg.Name,
- Note: upg.Note,
- Tag: upg.Tag,
- FontColor: upg.FontColor,
- BgColor: upg.BgColor,
- }
- log.Info("groupCache upg(%+v) upGroup(%+v)", upg, upGroup[gid])
- }
- }
- s.groupCache = upGroup
- log.Info("groupCache(%+v)", s.groupCache)
- return
- }
|