123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- package service
- import (
- "context"
- "sync"
- "time"
- "go-common/app/interface/main/mcn/tool/worker"
- "go-common/app/job/main/up/conf"
- "go-common/app/job/main/up/dao/account"
- "go-common/app/job/main/up/dao/email"
- "go-common/app/job/main/up/dao/upcrm"
- archive "go-common/app/service/main/archive/api"
- upGRPCv1 "go-common/app/service/main/up/api/v1"
- "go-common/library/queue/databus"
- "github.com/robfig/cron"
- "go-common/app/admin/main/up/util/databusutil"
- )
- // Service struct
- type Service struct {
- c *conf.Config
- maildao *email.Dao
- crmdb *upcrm.Dao
- acc *account.Dao
- arcRPC archive.ArchiveClient
- cron *cron.Cron
- worker *worker.Pool
- wg sync.WaitGroup
- archiveNotifyT *databus.Databus
- archiveT *databus.Databus
- closeCh chan struct{}
- upRPC upGRPCv1.UpClient
- databusHandler *databusutil.DatabusHandler
- }
- // New init
- func New(c *conf.Config) (s *Service) {
- s = &Service{
- c: c,
- cron: cron.New(),
- crmdb: upcrm.New(c),
- acc: account.New(c),
- maildao: email.New(c),
- worker: worker.New(&worker.Conf{
- WorkerProcMax: 10,
- QueueSize: 1024,
- WorkerNumber: 4}),
- archiveNotifyT: databus.New(c.DatabusConf.ArchiveNotify),
- archiveT: databus.New(c.DatabusConf.Archive),
- closeCh: make(chan struct{}),
- databusHandler: databusutil.NewDatabusHandler(),
- }
- var err error
- s.arcRPC, err = archive.NewClient(c.GRPCClient.Archive)
- if err != nil {
- panic(err)
- }
- if err = s.initEmailTemplate(); err != nil {
- panic(err)
- }
- if s.upRPC, err = upGRPCv1.NewClient(c.GRPCClient.Up); err != nil {
- panic(err)
- }
- s.createJobs()
- s.databusHandler.GoWatch(s.archiveNotifyT, s.handleArchiveNotifyT)
- s.databusHandler.GoWatch(s.archiveT, s.handleArchiveT)
- return s
- }
- func (s *Service) createJobs() {
- s.cron.AddFunc(conf.Conf.Job.UpCheckDateDueTaskTime, cronWrap(s.CheckDateDueJob))
- s.cron.AddFunc(conf.Conf.Job.TaskScheduleTime, cronWrap(s.CheckTaskJob))
- s.cron.AddFunc(conf.Conf.Job.CheckStateJobTime, cronWrap(s.CheckStateJob))
- s.cron.AddFunc(conf.Conf.Job.UpdateUpTidJobTime, cronWrap(s.UpdateUpTidJob))
- s.cron.Start()
- }
- func cronWrap(f func(tm time.Time)) func() {
- return func() {
- f(time.Now())
- }
- }
- // Ping Service
- func (s *Service) Ping(c context.Context) (err error) {
- return s.crmdb.Ping(c)
- }
- // Close Service
- func (s *Service) Close() {
- s.databusHandler.Close()
- s.wg.Wait()
- s.crmdb.Close()
- }
|