123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- package service
- import (
- "context"
- "go-common/app/job/main/passport-user/conf"
- "go-common/app/job/main/passport-user/dao"
- "go-common/app/job/main/passport-user/model"
- "go-common/library/log"
- "go-common/library/queue/databus"
- "go-common/library/queue/databus/databusutil"
- )
- const (
- _asoAccountTable = "aso_account"
- _asoAccountInfoTable = "aso_account_info"
- _asoAccountSnsTable = "aso_account_sns"
- _asoAccountRegOriginTable = "aso_account_reg_origin"
- _insertAction = "insert"
- _updateAction = "update"
- _deleteAction = "delete"
- _asoAccountInfoSharding = 30
- _asoAccountRegOriginSharding = 20
- _mySQLErrCodeDuplicateEntry = 1062
- _retry = 3
- )
- // Service service.
- type Service struct {
- c *conf.Config
- d *dao.Dao
- // aso binlog consumer
- asoBinLogConsumer *databus.Databus
- group *databusutil.Group
- // fullSync chan
- asoAccountChan []chan *model.OriginAccount
- asoAccountInfoChan []chan *model.OriginAccountInfo
- asoAccountRegChan []chan *model.OriginAccountReg
- asoAccountSnsChan []chan *model.OriginAccountSns
- asoTelBindLogChan []chan *model.UserTel
- countryMap map[int64]string
- aesKey []byte
- salt []byte
- //cron *cron.Cron
- ch chan func()
- }
- // New new a service instance.
- func New(c *conf.Config) (s *Service) {
- s = &Service{
- c: c,
- d: dao.New(c),
- asoAccountChan: make([]chan *model.OriginAccount, c.FullSync.AsoAccount.ChanNum),
- asoAccountInfoChan: make([]chan *model.OriginAccountInfo, c.FullSync.AsoAccountInfo.ChanNum),
- asoAccountRegChan: make([]chan *model.OriginAccountReg, c.FullSync.AsoAccountReg.ChanNum),
- asoAccountSnsChan: make([]chan *model.OriginAccountSns, c.FullSync.AsoAccountSns.ChanNum),
- asoTelBindLogChan: make([]chan *model.UserTel, c.FullSync.AsoTelBindLog.ChanNum),
- ch: make(chan func(), 1024000),
- }
- go s.cacheproc()
- if c.FullSync.AsoCountryCodeSwitch {
- err := s.syncAsoCountryCode()
- if err != nil {
- log.Error("fail to sync AsoCountryCode")
- panic(err)
- }
- }
- aesKey, err := s.d.AesKey(context.Background())
- if err != nil || aesKey == "" {
- log.Error("fail to get rsaKey")
- panic(err)
- }
- s.aesKey = []byte(aesKey)
- salt, err := s.d.Salt(context.Background())
- if err != nil || salt == "" {
- log.Error("fail to get salt")
- panic(err)
- }
- s.salt = []byte(salt)
- s.countryMap, err = s.d.CountryCodeMap(context.Background())
- if err != nil || len(s.countryMap) == 0 {
- log.Error("fail to get country map")
- panic(err)
- }
- if c.FullSync.AsoAccount.Switch {
- for i := 0; i < c.FullSync.AsoAccount.ChanNum; i++ {
- ch := make(chan *model.OriginAccount, c.FullSync.AsoAccount.ChanSize)
- s.asoAccountChan[i] = ch
- go s.asoAccountConsume(ch)
- }
- go s.getAsoAccount(c.FullSync.AsoAccount.Start, c.FullSync.AsoAccount.End, c.FullSync.AsoAccount.Count)
- }
- if c.FullSync.AsoAccountInfo.Switch {
- for i := 0; i < c.FullSync.AsoAccountInfo.ChanNum; i++ {
- ch := make(chan *model.OriginAccountInfo, c.FullSync.AsoAccountInfo.ChanSize)
- s.asoAccountInfoChan[i] = ch
- go s.asoAccountInfoConsume(ch)
- }
- go s.getAsoAccountInfo(c.FullSync.AsoAccountInfo.Start, c.FullSync.AsoAccountInfo.End, c.FullSync.AsoAccountInfo.Count)
- }
- if c.FullSync.AsoAccountReg.Switch {
- for i := 0; i < c.FullSync.AsoAccountReg.ChanNum; i++ {
- ch := make(chan *model.OriginAccountReg, c.FullSync.AsoAccountReg.ChanSize)
- s.asoAccountRegChan[i] = ch
- go s.asoAccountRegConsume(ch)
- }
- go s.getAsoAccountReg(c.FullSync.AsoAccountReg.Start, c.FullSync.AsoAccountReg.End, c.FullSync.AsoAccountReg.Count)
- }
- if c.FullSync.AsoAccountSns.Switch {
- for i := 0; i < c.FullSync.AsoAccountSns.ChanNum; i++ {
- ch := make(chan *model.OriginAccountSns, c.FullSync.AsoAccountSns.ChanSize)
- s.asoAccountSnsChan[i] = ch
- go s.asoAccountSnsConsume(ch)
- }
- go s.getAsoAccountSns(c.FullSync.AsoAccountSns.Start, c.FullSync.AsoAccountSns.End, c.FullSync.AsoAccountSns.Count)
- }
- if c.FullSync.AsoTelBindLog.Switch {
- for i := 0; i < c.FullSync.AsoTelBindLog.ChanNum; i++ {
- ch := make(chan *model.UserTel, c.FullSync.AsoTelBindLog.ChanSize)
- s.asoTelBindLogChan[i] = ch
- go s.asoTelBindLogConsume(ch)
- }
- go s.getAsoTelBindLog(c.FullSync.AsoTelBindLog.Start, c.FullSync.AsoTelBindLog.End, c.FullSync.AsoTelBindLog.Count)
- }
- if c.IncSync.Switch {
- s.asoBinLogConsumer = databus.New(c.DataBus.AsoBinLogSub)
- s.group = databusutil.NewGroup(
- c.DatabusUtil,
- s.asoBinLogConsumer.Messages(),
- )
- s.consumeproc()
- }
- //if c.Scheduler.Switch {
- // s.cron = cron.New()
- // if err := s.cron.AddFunc(c.Scheduler.EmailDuplicateCron, s.checkEmailDuplicateJob); err != nil {
- // panic(err)
- // }
- // if err := s.cron.AddFunc(c.Scheduler.TelDuplicateCron, s.checkTelDuplicateJob); err != nil {
- // panic(err)
- // }
- // s.cron.Start()
- //}
- return
- }
- // Ping check server ok.
- func (s *Service) Ping(c context.Context) (err error) {
- return s.d.Ping(c)
- }
- // Close close service, including databus and outer service.
- func (s *Service) Close() (err error) {
- if err = s.group.Close(); err != nil {
- log.Error("s.group.Close() error(%v)", err)
- }
- s.d.Close()
- return
- }
- func (s *Service) addCache(f func()) {
- select {
- case s.ch <- f:
- default:
- log.Warn("cacheproc chan full")
- }
- }
- // cacheproc is a routine for executing closure.
- func (s *Service) cacheproc() {
- for {
- f := <-s.ch
- f()
- }
- }
|