service.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package service
  2. import (
  3. "context"
  4. "go-common/app/job/main/account-summary/conf"
  5. "go-common/app/job/main/account-summary/dao"
  6. "go-common/library/log"
  7. "go-common/library/queue/databus"
  8. )
  9. // Service struct
  10. type Service struct {
  11. c *conf.Config
  12. dao *dao.Dao
  13. MemberBinLog *databus.Databus
  14. BlockBinLog *databus.Databus
  15. PassportBinLog *databus.Databus
  16. RelationBinLog *databus.Databus
  17. AccountSummaryProducer *databus.Databus
  18. }
  19. // New init
  20. func New(c *conf.Config) *Service {
  21. s := &Service{
  22. c: c,
  23. dao: dao.New(c),
  24. RelationBinLog: databus.New(c.RelationBinLog),
  25. MemberBinLog: databus.New(c.MemberBinLog),
  26. BlockBinLog: databus.New(c.BlockBinLog),
  27. PassportBinLog: databus.New(c.PassportBinLog),
  28. AccountSummaryProducer: databus.New(c.AccountSummaryProducer),
  29. }
  30. s.Main()
  31. return s
  32. }
  33. // Ping Service
  34. func (s *Service) Ping(c context.Context) error {
  35. return s.dao.Ping(c)
  36. }
  37. // Close Service
  38. func (s *Service) Close() {
  39. s.dao.Close()
  40. }
  41. // Main is
  42. func (s *Service) Main() {
  43. subproc := func() {
  44. worker := s.c.AccountSummary.SubProcessWorker
  45. if worker <= 0 {
  46. worker = 1
  47. }
  48. log.Info("Starting sub process with %d workers", worker)
  49. for i := uint64(0); i < worker; i++ {
  50. go s.memberBinLogproc(context.Background())
  51. go s.blockBinLogproc(context.Background())
  52. go s.passportBinLogproc(context.Background())
  53. go s.relationBinLogproc(context.Background())
  54. }
  55. }
  56. syncrange := func() {
  57. start := s.c.AccountSummary.SyncRangeStart
  58. if start <= 0 {
  59. start = 1
  60. }
  61. end := s.c.AccountSummary.SyncRangeEnd
  62. if end <= 0 {
  63. end = 1
  64. }
  65. worker := s.c.AccountSummary.SyncRangeWorker
  66. if worker <= 0 {
  67. worker = 1
  68. }
  69. go s.syncRangeproc(context.Background(), start, end, worker)
  70. }
  71. // initial := func() {
  72. // go s.initialproc(context.Background())
  73. // }
  74. if !s.c.FeatureGate.DisableSubProcess {
  75. subproc()
  76. }
  77. if s.c.FeatureGate.SyncRange {
  78. syncrange()
  79. }
  80. // if s.c.FeatureGate.Initial {
  81. // initial()
  82. // }
  83. }