service.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "go-common/app/admin/ep/marthe/conf"
  6. "go-common/app/admin/ep/marthe/dao"
  7. "go-common/app/admin/ep/marthe/model"
  8. "go-common/library/sync/pipeline/fanout"
  9. "github.com/robfig/cron"
  10. )
  11. // Service struct
  12. type Service struct {
  13. c *conf.Config
  14. dao *dao.Dao
  15. batchRunCache *fanout.Fanout
  16. tapdBugCache *fanout.Fanout
  17. taskCache *fanout.Fanout
  18. cron *cron.Cron
  19. syncWechatContactsLock sync.Mutex
  20. syncTapdBugInsertLock sync.Mutex
  21. mapTapdBugInsertLocks map[int64]*sync.Mutex
  22. syncBatchRunLock sync.Mutex
  23. mapBatchRunLocks map[int64]*sync.Mutex
  24. }
  25. // New init
  26. func New(c *conf.Config) (s *Service) {
  27. s = &Service{
  28. c: c,
  29. dao: dao.New(c),
  30. batchRunCache: fanout.New("batchRunCache", fanout.Worker(5), fanout.Buffer(10240)),
  31. tapdBugCache: fanout.New("tapdBugCache", fanout.Worker(5), fanout.Buffer(10240)),
  32. taskCache: fanout.New("taskCache", fanout.Worker(5), fanout.Buffer(10240)),
  33. mapTapdBugInsertLocks: make(map[int64]*sync.Mutex),
  34. mapBatchRunLocks: make(map[int64]*sync.Mutex),
  35. }
  36. if c.Scheduler.Active {
  37. s.cron = cron.New()
  38. // 定时批量 跑enable version 抓bugly数据
  39. if err := s.cron.AddFunc(c.Scheduler.BatchRunEnableVersion, func() { s.BatchRunTask(model.TaskBatchRunVersions, s.BatchRunVersions) }); err != nil {
  40. panic(err)
  41. }
  42. // 定时把超过三小时为执行完毕的任务修改为失败
  43. if err := s.cron.AddFunc(c.Scheduler.DisableBatchRunOverTime, func() { s.BatchRunTask(model.TaskDisableBatchRunOverTime, s.DisableBatchRunOverTime) }); err != nil {
  44. panic(err)
  45. }
  46. // 定时更新tapd bug
  47. if err := s.cron.AddFunc(c.Scheduler.BatchRunUpdateTapdBug, func() { s.BatchRunTask(model.TaskBatchRunUpdateBugInTapd, s.BatchRunUpdateBugInTapd) }); err != nil {
  48. panic(err)
  49. }
  50. // 定时更新SyncWechatContact
  51. if err := s.cron.AddFunc(c.Scheduler.SyncWechatContact, func() { s.BatchRunTask(model.TaskSyncWechatContact, s.SyncWechatContacts) }); err != nil {
  52. panic(err)
  53. }
  54. s.cron.Start()
  55. }
  56. return s
  57. }
  58. // Ping Service
  59. func (s *Service) Ping(c context.Context) (err error) {
  60. return s.dao.Ping(c)
  61. }
  62. // Close Service
  63. func (s *Service) Close() {
  64. s.dao.Close()
  65. }
  66. // BatchRunTask Batch Run Task.
  67. func (s *Service) BatchRunTask(taskName string, task func() error) {
  68. var err error
  69. scheduleTask := &model.ScheduleTask{
  70. Name: taskName,
  71. Status: model.TaskStatusRunning,
  72. }
  73. if err = s.dao.InsertScheduleTask(scheduleTask); err != nil {
  74. return
  75. }
  76. err = task()
  77. defer func() {
  78. if err != nil {
  79. scheduleTask.Status = model.TaskStatusFailed
  80. } else {
  81. scheduleTask.Status = model.TaskStatusDone
  82. }
  83. if err = s.dao.UpdateScheduleTask(scheduleTask); err != nil {
  84. return
  85. }
  86. }()
  87. }