// service.go (2.1 KB)

  1. package service
  2. import (
  3. "context"
  4. "go-common/app/job/main/passport-sns/conf"
  5. "go-common/app/job/main/passport-sns/dao"
  6. "go-common/app/job/main/passport-sns/model"
  7. "go-common/library/queue/databus"
  8. "go-common/library/queue/databus/databusutil"
  9. "go-common/library/sync/pipeline/fanout"
  10. )
  11. // Service service.
  12. type Service struct {
  13. c *conf.Config
  14. d *dao.Dao
  15. snsLogConsumer *databus.Databus
  16. asoBinLogConsumer *databus.Databus
  17. group *databusutil.Group
  18. snsChan []chan *model.AsoAccountSns
  19. checkChan []chan *model.AsoAccountSns
  20. cache *fanout.Fanout
  21. }
  22. // New new a service instance.
  23. func New(c *conf.Config) (s *Service) {
  24. s = &Service{
  25. c: c,
  26. d: dao.New(c),
  27. snsLogConsumer: databus.New(c.DataBus.SnsLogSub),
  28. snsChan: make([]chan *model.AsoAccountSns, c.SyncConf.ChanNum),
  29. checkChan: make([]chan *model.AsoAccountSns, c.SyncConf.ChanNum),
  30. cache: fanout.New("cache", fanout.Worker(10), fanout.Buffer(10240)),
  31. }
  32. go s.snsLogConsume()
  33. if c.SyncConf.IncSwitch {
  34. s.asoBinLogConsumer = databus.New(c.DataBus.AsoBinLogSub)
  35. s.group = databusutil.NewGroup(
  36. c.DatabusUtil,
  37. s.asoBinLogConsumer.Messages(),
  38. )
  39. s.asoBinLogConsume()
  40. }
  41. if c.SyncConf.FullSwitch {
  42. for i := 0; i < c.SyncConf.ChanNum; i++ {
  43. ch := make(chan *model.AsoAccountSns, c.SyncConf.ChanSize)
  44. s.snsChan[i] = ch
  45. go s.fullSyncSnsConsume(ch)
  46. }
  47. go s.fullSyncSns()
  48. }
  49. if c.SyncConf.CheckSwitch {
  50. for i := 0; i < c.SyncConf.ChanNum; i++ {
  51. ch := make(chan *model.AsoAccountSns, c.SyncConf.ChanSize)
  52. s.checkChan[i] = ch
  53. go s.checkConsume(ch)
  54. }
  55. go s.checkAll()
  56. }
  57. return
  58. }
  59. // Ping check server ok.
  60. func (s *Service) Ping(c context.Context) (err error) {
  61. return s.d.Ping(c)
  62. }
  63. // Close close service, including databus and outer service.
  64. func (s *Service) Close() (err error) {
  65. s.d.Close()
  66. return
  67. }
  68. func parsePlatformStr(platform int) string {
  69. switch platform {
  70. case model.PlatformQQ:
  71. return model.PlatformQQStr
  72. case model.PlatformWEIBO:
  73. return model.PlatformWEIBOStr
  74. }
  75. return ""
  76. }