service.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package service
  2. import (
  3. "context"
  4. "go-common/app/job/main/passport-user/conf"
  5. "go-common/app/job/main/passport-user/dao"
  6. "go-common/app/job/main/passport-user/model"
  7. "go-common/library/log"
  8. "go-common/library/queue/databus"
  9. "go-common/library/queue/databus/databusutil"
  10. )
  11. const (
  12. _asoAccountTable = "aso_account"
  13. _asoAccountInfoTable = "aso_account_info"
  14. _asoAccountSnsTable = "aso_account_sns"
  15. _asoAccountRegOriginTable = "aso_account_reg_origin"
  16. _insertAction = "insert"
  17. _updateAction = "update"
  18. _deleteAction = "delete"
  19. _asoAccountInfoSharding = 30
  20. _asoAccountRegOriginSharding = 20
  21. _mySQLErrCodeDuplicateEntry = 1062
  22. _retry = 3
  23. )
  24. // Service service.
  25. type Service struct {
  26. c *conf.Config
  27. d *dao.Dao
  28. // aso binlog consumer
  29. asoBinLogConsumer *databus.Databus
  30. group *databusutil.Group
  31. // fullSync chan
  32. asoAccountChan []chan *model.OriginAccount
  33. asoAccountInfoChan []chan *model.OriginAccountInfo
  34. asoAccountRegChan []chan *model.OriginAccountReg
  35. asoAccountSnsChan []chan *model.OriginAccountSns
  36. asoTelBindLogChan []chan *model.UserTel
  37. countryMap map[int64]string
  38. aesKey []byte
  39. salt []byte
  40. //cron *cron.Cron
  41. ch chan func()
  42. }
  43. // New new a service instance.
  44. func New(c *conf.Config) (s *Service) {
  45. s = &Service{
  46. c: c,
  47. d: dao.New(c),
  48. asoAccountChan: make([]chan *model.OriginAccount, c.FullSync.AsoAccount.ChanNum),
  49. asoAccountInfoChan: make([]chan *model.OriginAccountInfo, c.FullSync.AsoAccountInfo.ChanNum),
  50. asoAccountRegChan: make([]chan *model.OriginAccountReg, c.FullSync.AsoAccountReg.ChanNum),
  51. asoAccountSnsChan: make([]chan *model.OriginAccountSns, c.FullSync.AsoAccountSns.ChanNum),
  52. asoTelBindLogChan: make([]chan *model.UserTel, c.FullSync.AsoTelBindLog.ChanNum),
  53. ch: make(chan func(), 1024000),
  54. }
  55. go s.cacheproc()
  56. if c.FullSync.AsoCountryCodeSwitch {
  57. err := s.syncAsoCountryCode()
  58. if err != nil {
  59. log.Error("fail to sync AsoCountryCode")
  60. panic(err)
  61. }
  62. }
  63. aesKey, err := s.d.AesKey(context.Background())
  64. if err != nil || aesKey == "" {
  65. log.Error("fail to get rsaKey")
  66. panic(err)
  67. }
  68. s.aesKey = []byte(aesKey)
  69. salt, err := s.d.Salt(context.Background())
  70. if err != nil || salt == "" {
  71. log.Error("fail to get salt")
  72. panic(err)
  73. }
  74. s.salt = []byte(salt)
  75. s.countryMap, err = s.d.CountryCodeMap(context.Background())
  76. if err != nil || len(s.countryMap) == 0 {
  77. log.Error("fail to get country map")
  78. panic(err)
  79. }
  80. if c.FullSync.AsoAccount.Switch {
  81. for i := 0; i < c.FullSync.AsoAccount.ChanNum; i++ {
  82. ch := make(chan *model.OriginAccount, c.FullSync.AsoAccount.ChanSize)
  83. s.asoAccountChan[i] = ch
  84. go s.asoAccountConsume(ch)
  85. }
  86. go s.getAsoAccount(c.FullSync.AsoAccount.Start, c.FullSync.AsoAccount.End, c.FullSync.AsoAccount.Count)
  87. }
  88. if c.FullSync.AsoAccountInfo.Switch {
  89. for i := 0; i < c.FullSync.AsoAccountInfo.ChanNum; i++ {
  90. ch := make(chan *model.OriginAccountInfo, c.FullSync.AsoAccountInfo.ChanSize)
  91. s.asoAccountInfoChan[i] = ch
  92. go s.asoAccountInfoConsume(ch)
  93. }
  94. go s.getAsoAccountInfo(c.FullSync.AsoAccountInfo.Start, c.FullSync.AsoAccountInfo.End, c.FullSync.AsoAccountInfo.Count)
  95. }
  96. if c.FullSync.AsoAccountReg.Switch {
  97. for i := 0; i < c.FullSync.AsoAccountReg.ChanNum; i++ {
  98. ch := make(chan *model.OriginAccountReg, c.FullSync.AsoAccountReg.ChanSize)
  99. s.asoAccountRegChan[i] = ch
  100. go s.asoAccountRegConsume(ch)
  101. }
  102. go s.getAsoAccountReg(c.FullSync.AsoAccountReg.Start, c.FullSync.AsoAccountReg.End, c.FullSync.AsoAccountReg.Count)
  103. }
  104. if c.FullSync.AsoAccountSns.Switch {
  105. for i := 0; i < c.FullSync.AsoAccountSns.ChanNum; i++ {
  106. ch := make(chan *model.OriginAccountSns, c.FullSync.AsoAccountSns.ChanSize)
  107. s.asoAccountSnsChan[i] = ch
  108. go s.asoAccountSnsConsume(ch)
  109. }
  110. go s.getAsoAccountSns(c.FullSync.AsoAccountSns.Start, c.FullSync.AsoAccountSns.End, c.FullSync.AsoAccountSns.Count)
  111. }
  112. if c.FullSync.AsoTelBindLog.Switch {
  113. for i := 0; i < c.FullSync.AsoTelBindLog.ChanNum; i++ {
  114. ch := make(chan *model.UserTel, c.FullSync.AsoTelBindLog.ChanSize)
  115. s.asoTelBindLogChan[i] = ch
  116. go s.asoTelBindLogConsume(ch)
  117. }
  118. go s.getAsoTelBindLog(c.FullSync.AsoTelBindLog.Start, c.FullSync.AsoTelBindLog.End, c.FullSync.AsoTelBindLog.Count)
  119. }
  120. if c.IncSync.Switch {
  121. s.asoBinLogConsumer = databus.New(c.DataBus.AsoBinLogSub)
  122. s.group = databusutil.NewGroup(
  123. c.DatabusUtil,
  124. s.asoBinLogConsumer.Messages(),
  125. )
  126. s.consumeproc()
  127. }
  128. //if c.Scheduler.Switch {
  129. // s.cron = cron.New()
  130. // if err := s.cron.AddFunc(c.Scheduler.EmailDuplicateCron, s.checkEmailDuplicateJob); err != nil {
  131. // panic(err)
  132. // }
  133. // if err := s.cron.AddFunc(c.Scheduler.TelDuplicateCron, s.checkTelDuplicateJob); err != nil {
  134. // panic(err)
  135. // }
  136. // s.cron.Start()
  137. //}
  138. return
  139. }
  140. // Ping check server ok.
  141. func (s *Service) Ping(c context.Context) (err error) {
  142. return s.d.Ping(c)
  143. }
  144. // Close close service, including databus and outer service.
  145. func (s *Service) Close() (err error) {
  146. if err = s.group.Close(); err != nil {
  147. log.Error("s.group.Close() error(%v)", err)
  148. }
  149. s.d.Close()
  150. return
  151. }
  152. func (s *Service) addCache(f func()) {
  153. select {
  154. case s.ch <- f:
  155. default:
  156. log.Warn("cacheproc chan full")
  157. }
  158. }
  159. // cacheproc is a routine for executing closure.
  160. func (s *Service) cacheproc() {
  161. for {
  162. f := <-s.ch
  163. f()
  164. }
  165. }