service.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "go-common/app/interface/main/mcn/tool/worker"
  7. "go-common/app/job/main/up/conf"
  8. "go-common/app/job/main/up/dao/account"
  9. "go-common/app/job/main/up/dao/email"
  10. "go-common/app/job/main/up/dao/upcrm"
  11. archive "go-common/app/service/main/archive/api"
  12. upGRPCv1 "go-common/app/service/main/up/api/v1"
  13. "go-common/library/queue/databus"
  14. "github.com/robfig/cron"
  15. "go-common/app/admin/main/up/util/databusutil"
  16. )
  17. // Service struct
  18. type Service struct {
  19. c *conf.Config
  20. maildao *email.Dao
  21. crmdb *upcrm.Dao
  22. acc *account.Dao
  23. arcRPC archive.ArchiveClient
  24. cron *cron.Cron
  25. worker *worker.Pool
  26. wg sync.WaitGroup
  27. archiveNotifyT *databus.Databus
  28. archiveT *databus.Databus
  29. closeCh chan struct{}
  30. upRPC upGRPCv1.UpClient
  31. databusHandler *databusutil.DatabusHandler
  32. }
  33. // New init
  34. func New(c *conf.Config) (s *Service) {
  35. s = &Service{
  36. c: c,
  37. cron: cron.New(),
  38. crmdb: upcrm.New(c),
  39. acc: account.New(c),
  40. maildao: email.New(c),
  41. worker: worker.New(&worker.Conf{
  42. WorkerProcMax: 10,
  43. QueueSize: 1024,
  44. WorkerNumber: 4}),
  45. archiveNotifyT: databus.New(c.DatabusConf.ArchiveNotify),
  46. archiveT: databus.New(c.DatabusConf.Archive),
  47. closeCh: make(chan struct{}),
  48. databusHandler: databusutil.NewDatabusHandler(),
  49. }
  50. var err error
  51. s.arcRPC, err = archive.NewClient(c.GRPCClient.Archive)
  52. if err != nil {
  53. panic(err)
  54. }
  55. if err = s.initEmailTemplate(); err != nil {
  56. panic(err)
  57. }
  58. if s.upRPC, err = upGRPCv1.NewClient(c.GRPCClient.Up); err != nil {
  59. panic(err)
  60. }
  61. s.createJobs()
  62. s.databusHandler.GoWatch(s.archiveNotifyT, s.handleArchiveNotifyT)
  63. s.databusHandler.GoWatch(s.archiveT, s.handleArchiveT)
  64. return s
  65. }
  66. func (s *Service) createJobs() {
  67. s.cron.AddFunc(conf.Conf.Job.UpCheckDateDueTaskTime, cronWrap(s.CheckDateDueJob))
  68. s.cron.AddFunc(conf.Conf.Job.TaskScheduleTime, cronWrap(s.CheckTaskJob))
  69. s.cron.AddFunc(conf.Conf.Job.CheckStateJobTime, cronWrap(s.CheckStateJob))
  70. s.cron.AddFunc(conf.Conf.Job.UpdateUpTidJobTime, cronWrap(s.UpdateUpTidJob))
  71. s.cron.Start()
  72. }
  73. func cronWrap(f func(tm time.Time)) func() {
  74. return func() {
  75. f(time.Now())
  76. }
  77. }
  78. // Ping Service
  79. func (s *Service) Ping(c context.Context) (err error) {
  80. return s.crmdb.Ping(c)
  81. }
  82. // Close Service
  83. func (s *Service) Close() {
  84. s.databusHandler.Close()
  85. s.wg.Wait()
  86. s.crmdb.Close()
  87. }