123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- package service
- import (
- "context"
- "crypto/aes"
- "crypto/cipher"
- "sync"
- "go-common/app/job/main/passport/conf"
- "go-common/app/job/main/passport/dao"
- igrpc "go-common/app/service/main/identify-game/rpc/client"
- "go-common/library/queue/databus"
- )
- const (
- _gameAppID = int32(876)
- _telBindTable = "aso_telephone_bind_log"
- _emailBindTable = "aso_email_bind_log"
- )
- // Service struct of service.
- type Service struct {
- c *conf.Config
- d *dao.Dao
- // RPC
- igRPC *igrpc.Client
- // game app ids
- gameAppIDs []int32
- // token proc
- dsToken *databus.Databus
- tokenMergeChans []chan *message
- tokenDoneChan chan []*message
- head, last *message
- mu sync.Mutex
- // user proc
- dsUser *databus.Databus
- userMergeChans []chan *message
- userDoneChan chan []*message
- userHead, userLast *message
- userMu sync.Mutex
- // log proc
- dsLog *databus.Databus
- logMergeChans []chan *message
- logDoneChan chan []*message
- logHead, logLast *message
- logMu sync.Mutex
- // pwd log proc
- dsPwdLog *databus.Databus
- pwdLogMergeChans []chan *message
- pwdLogDoneChan chan []*message
- pwdLogHead, pwdLogLast *message
- pwdLogMu sync.Mutex
- // auth bin log proc
- authBinLog *databus.Databus
- authBinLogMergeChans []chan *message
- authBinLogDoneChan chan []*message
- authBinLogHead, authBinLogLast *message
- authBinLogMu sync.Mutex
- dsContactBindLog *databus.Databus
- contactBindLogMergeChans []chan *message
- contactBindLogDoneChan chan []*message
- contactBindLogHead, contactBindLogLast *message
- contactBindLogMu sync.Mutex
- userLogPub *databus.Databus
- AESBlock cipher.Block
- hashSalt []byte
- }
- type message struct {
- next *message
- data *databus.Message
- object interface{}
- done bool
- }
- // New create service instance and return.
- func New(c *conf.Config) (s *Service) {
- gameAppIDs := make([]int32, 0)
- gameAppIDs = append(gameAppIDs, _gameAppID)
- for _, id := range c.Game.AppIDs {
- if id == _gameAppID {
- continue
- }
- gameAppIDs = append(gameAppIDs, id)
- }
- s = &Service{
- c: c,
- d: dao.New(c),
- gameAppIDs: gameAppIDs,
- // RPC
- igRPC: igrpc.New(c.RPC.IdentifyGame),
- // token
- dsToken: databus.New(c.DataBus.AsoBinLog),
- tokenMergeChans: make([]chan *message, c.Group.AsoBinLog.Num),
- tokenDoneChan: make(chan []*message, c.Group.AsoBinLog.Chan),
- // user
- dsUser: databus.New(c.DataBus.User),
- userMergeChans: make([]chan *message, c.Group.User.Num),
- userDoneChan: make(chan []*message, c.Group.User.Chan),
- // log
- dsLog: databus.New(c.DataBus.Log),
- logMergeChans: make([]chan *message, c.Group.Log.Num),
- logDoneChan: make(chan []*message, c.Group.Log.Chan),
- // log
- dsPwdLog: databus.New(c.DataBus.PwdLog),
- pwdLogMergeChans: make([]chan *message, c.Group.PwdLog.Num),
- pwdLogDoneChan: make(chan []*message, c.Group.PwdLog.Chan),
- // emial and tel log
- dsContactBindLog: databus.New(c.DataBus.ContactBindLog),
- contactBindLogMergeChans: make([]chan *message, c.Group.ContactBindLog.Num),
- contactBindLogDoneChan: make(chan []*message, c.Group.ContactBindLog.Chan),
- userLogPub: databus.New(c.DataBus.UserLog),
- // auth bin log
- authBinLog: databus.New(c.DataBus.AuthBinLog),
- authBinLogMergeChans: make([]chan *message, c.Group.AuthBinLog.Num),
- authBinLogDoneChan: make(chan []*message, c.Group.AuthBinLog.Chan),
- hashSalt: []byte(c.Encode.Salt),
- }
- s.AESBlock, _ = aes.NewCipher([]byte(c.Encode.AesKey))
- // start token proc
- go s.tokencommitproc()
- for i := 0; i < c.Group.AsoBinLog.Num; i++ {
- ch := make(chan *message, c.Group.AsoBinLog.Chan)
- s.tokenMergeChans[i] = ch
- go s.tokenmergeproc(ch)
- }
- go s.tokenconsumeproc()
- // start user proc
- go s.usercommitproc()
- for i := 0; i < c.Group.User.Num; i++ {
- ch := make(chan *message, c.Group.User.Chan)
- s.userMergeChans[i] = ch
- go s.usermergeproc(ch)
- }
- go s.userconsumeproc()
- // start log proc
- go s.logcommitproc()
- for i := 0; i < c.Group.Log.Num; i++ {
- ch := make(chan *message, c.Group.Log.Chan)
- s.logMergeChans[i] = ch
- go s.logmergeproc(ch)
- }
- go s.logconsumeproc()
- // start pwd log proc
- go s.pwdlogcommitproc()
- for i := 0; i < c.Group.PwdLog.Num; i++ {
- ch := make(chan *message, c.Group.PwdLog.Chan)
- s.pwdLogMergeChans[i] = ch
- go s.pwdlogmergeproc(ch)
- }
- go s.pwdlogconsumeproc()
- go s.contactBindLogcommitproc()
- for i := 0; i < c.Group.ContactBindLog.Num; i++ {
- ch := make(chan *message, c.Group.ContactBindLog.Chan)
- s.contactBindLogMergeChans[i] = ch
- go s.contactBindLogMergeproc(ch)
- }
- go s.contactBindLogconsumeproc()
- // start auth bin log token proc
- go s.authBinLogcommitproc()
- for i := 0; i < c.Group.AuthBinLog.Num; i++ {
- ch := make(chan *message, c.Group.AuthBinLog.Chan)
- s.authBinLogMergeChans[i] = ch
- go s.authBinLogmergeproc(ch)
- }
- go s.authBinLogconsumeproc()
- // end auth bin log token proc
- go s.syncPwdLog()
- return
- }
- // Ping ping check service health.
- func (s *Service) Ping(c context.Context) (err error) {
- err = s.d.Ping(c)
- return
- }
- // Close close service.
- func (s *Service) Close() (err error) {
- if s.dsToken != nil {
- s.dsToken.Close()
- }
- if s.dsUser != nil {
- s.dsUser.Close()
- }
- if s.dsLog != nil {
- s.dsLog.Close()
- }
- if s.d != nil {
- s.d.Close()
- }
- if s.dsContactBindLog != nil {
- s.dsContactBindLog.Close()
- }
- if s.userLogPub != nil {
- s.userLogPub.Close()
- }
- if s.dsPwdLog != nil {
- s.dsPwdLog.Close()
- }
- return
- }
|