123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- package service
- import (
- "context"
- "sync"
- "go-common/app/admin/ep/marthe/conf"
- "go-common/app/admin/ep/marthe/dao"
- "go-common/app/admin/ep/marthe/model"
- "go-common/library/sync/pipeline/fanout"
- "github.com/robfig/cron"
- )
- // Service struct
- type Service struct {
- c *conf.Config
- dao *dao.Dao
- batchRunCache *fanout.Fanout
- tapdBugCache *fanout.Fanout
- taskCache *fanout.Fanout
- cron *cron.Cron
- syncWechatContactsLock sync.Mutex
- syncTapdBugInsertLock sync.Mutex
- mapTapdBugInsertLocks map[int64]*sync.Mutex
- syncBatchRunLock sync.Mutex
- mapBatchRunLocks map[int64]*sync.Mutex
- }
- // New init
- func New(c *conf.Config) (s *Service) {
- s = &Service{
- c: c,
- dao: dao.New(c),
- batchRunCache: fanout.New("batchRunCache", fanout.Worker(5), fanout.Buffer(10240)),
- tapdBugCache: fanout.New("tapdBugCache", fanout.Worker(5), fanout.Buffer(10240)),
- taskCache: fanout.New("taskCache", fanout.Worker(5), fanout.Buffer(10240)),
- mapTapdBugInsertLocks: make(map[int64]*sync.Mutex),
- mapBatchRunLocks: make(map[int64]*sync.Mutex),
- }
- if c.Scheduler.Active {
- s.cron = cron.New()
- // 定时批量 跑enable version 抓bugly数据
- if err := s.cron.AddFunc(c.Scheduler.BatchRunEnableVersion, func() { s.BatchRunTask(model.TaskBatchRunVersions, s.BatchRunVersions) }); err != nil {
- panic(err)
- }
- // 定时把超过三小时为执行完毕的任务修改为失败
- if err := s.cron.AddFunc(c.Scheduler.DisableBatchRunOverTime, func() { s.BatchRunTask(model.TaskDisableBatchRunOverTime, s.DisableBatchRunOverTime) }); err != nil {
- panic(err)
- }
- // 定时更新tapd bug
- if err := s.cron.AddFunc(c.Scheduler.BatchRunUpdateTapdBug, func() { s.BatchRunTask(model.TaskBatchRunUpdateBugInTapd, s.BatchRunUpdateBugInTapd) }); err != nil {
- panic(err)
- }
- // 定时更新SyncWechatContact
- if err := s.cron.AddFunc(c.Scheduler.SyncWechatContact, func() { s.BatchRunTask(model.TaskSyncWechatContact, s.SyncWechatContacts) }); err != nil {
- panic(err)
- }
- s.cron.Start()
- }
- return s
- }
- // Ping Service
- func (s *Service) Ping(c context.Context) (err error) {
- return s.dao.Ping(c)
- }
- // Close Service
- func (s *Service) Close() {
- s.dao.Close()
- }
- // BatchRunTask Batch Run Task.
- func (s *Service) BatchRunTask(taskName string, task func() error) {
- var err error
- scheduleTask := &model.ScheduleTask{
- Name: taskName,
- Status: model.TaskStatusRunning,
- }
- if err = s.dao.InsertScheduleTask(scheduleTask); err != nil {
- return
- }
- err = task()
- defer func() {
- if err != nil {
- scheduleTask.Status = model.TaskStatusFailed
- } else {
- scheduleTask.Status = model.TaskStatusDone
- }
- if err = s.dao.UpdateScheduleTask(scheduleTask); err != nil {
- return
- }
- }()
- }
|