1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- package service
- import (
- "context"
- "go-common/app/job/main/passport-sns/conf"
- "go-common/app/job/main/passport-sns/dao"
- "go-common/app/job/main/passport-sns/model"
- "go-common/library/queue/databus"
- "go-common/library/queue/databus/databusutil"
- "go-common/library/sync/pipeline/fanout"
- )
- // Service service.
- type Service struct {
- c *conf.Config
- d *dao.Dao
- snsLogConsumer *databus.Databus
- asoBinLogConsumer *databus.Databus
- group *databusutil.Group
- snsChan []chan *model.AsoAccountSns
- checkChan []chan *model.AsoAccountSns
- cache *fanout.Fanout
- }
- // New new a service instance.
- func New(c *conf.Config) (s *Service) {
- s = &Service{
- c: c,
- d: dao.New(c),
- snsLogConsumer: databus.New(c.DataBus.SnsLogSub),
- snsChan: make([]chan *model.AsoAccountSns, c.SyncConf.ChanNum),
- checkChan: make([]chan *model.AsoAccountSns, c.SyncConf.ChanNum),
- cache: fanout.New("cache", fanout.Worker(10), fanout.Buffer(10240)),
- }
- go s.snsLogConsume()
- if c.SyncConf.IncSwitch {
- s.asoBinLogConsumer = databus.New(c.DataBus.AsoBinLogSub)
- s.group = databusutil.NewGroup(
- c.DatabusUtil,
- s.asoBinLogConsumer.Messages(),
- )
- s.asoBinLogConsume()
- }
- if c.SyncConf.FullSwitch {
- for i := 0; i < c.SyncConf.ChanNum; i++ {
- ch := make(chan *model.AsoAccountSns, c.SyncConf.ChanSize)
- s.snsChan[i] = ch
- go s.fullSyncSnsConsume(ch)
- }
- go s.fullSyncSns()
- }
- if c.SyncConf.CheckSwitch {
- for i := 0; i < c.SyncConf.ChanNum; i++ {
- ch := make(chan *model.AsoAccountSns, c.SyncConf.ChanSize)
- s.checkChan[i] = ch
- go s.checkConsume(ch)
- }
- go s.checkAll()
- }
- return
- }
- // Ping check server ok.
- func (s *Service) Ping(c context.Context) (err error) {
- return s.d.Ping(c)
- }
- // Close close service, including databus and outer service.
- func (s *Service) Close() (err error) {
- s.d.Close()
- return
- }
- func parsePlatformStr(platform int) string {
- switch platform {
- case model.PlatformQQ:
- return model.PlatformQQStr
- case model.PlatformWEIBO:
- return model.PlatformWEIBOStr
- }
- return ""
- }
|