service.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. package service
  2. import (
  3. "context"
  4. "crypto/aes"
  5. "crypto/cipher"
  6. "sync"
  7. "go-common/app/job/main/passport/conf"
  8. "go-common/app/job/main/passport/dao"
  9. igrpc "go-common/app/service/main/identify-game/rpc/client"
  10. "go-common/library/queue/databus"
  11. )
  12. const (
  13. _gameAppID = int32(876)
  14. _telBindTable = "aso_telephone_bind_log"
  15. _emailBindTable = "aso_email_bind_log"
  16. )
  17. // Service struct of service.
  18. type Service struct {
  19. c *conf.Config
  20. d *dao.Dao
  21. // RPC
  22. igRPC *igrpc.Client
  23. // game app ids
  24. gameAppIDs []int32
  25. // token proc
  26. dsToken *databus.Databus
  27. tokenMergeChans []chan *message
  28. tokenDoneChan chan []*message
  29. head, last *message
  30. mu sync.Mutex
  31. // user proc
  32. dsUser *databus.Databus
  33. userMergeChans []chan *message
  34. userDoneChan chan []*message
  35. userHead, userLast *message
  36. userMu sync.Mutex
  37. // log proc
  38. dsLog *databus.Databus
  39. logMergeChans []chan *message
  40. logDoneChan chan []*message
  41. logHead, logLast *message
  42. logMu sync.Mutex
  43. // pwd log proc
  44. dsPwdLog *databus.Databus
  45. pwdLogMergeChans []chan *message
  46. pwdLogDoneChan chan []*message
  47. pwdLogHead, pwdLogLast *message
  48. pwdLogMu sync.Mutex
  49. // auth bin log proc
  50. authBinLog *databus.Databus
  51. authBinLogMergeChans []chan *message
  52. authBinLogDoneChan chan []*message
  53. authBinLogHead, authBinLogLast *message
  54. authBinLogMu sync.Mutex
  55. dsContactBindLog *databus.Databus
  56. contactBindLogMergeChans []chan *message
  57. contactBindLogDoneChan chan []*message
  58. contactBindLogHead, contactBindLogLast *message
  59. contactBindLogMu sync.Mutex
  60. userLogPub *databus.Databus
  61. AESBlock cipher.Block
  62. hashSalt []byte
  63. }
  64. type message struct {
  65. next *message
  66. data *databus.Message
  67. object interface{}
  68. done bool
  69. }
  70. // New create service instance and return.
  71. func New(c *conf.Config) (s *Service) {
  72. gameAppIDs := make([]int32, 0)
  73. gameAppIDs = append(gameAppIDs, _gameAppID)
  74. for _, id := range c.Game.AppIDs {
  75. if id == _gameAppID {
  76. continue
  77. }
  78. gameAppIDs = append(gameAppIDs, id)
  79. }
  80. s = &Service{
  81. c: c,
  82. d: dao.New(c),
  83. gameAppIDs: gameAppIDs,
  84. // RPC
  85. igRPC: igrpc.New(c.RPC.IdentifyGame),
  86. // token
  87. dsToken: databus.New(c.DataBus.AsoBinLog),
  88. tokenMergeChans: make([]chan *message, c.Group.AsoBinLog.Num),
  89. tokenDoneChan: make(chan []*message, c.Group.AsoBinLog.Chan),
  90. // user
  91. dsUser: databus.New(c.DataBus.User),
  92. userMergeChans: make([]chan *message, c.Group.User.Num),
  93. userDoneChan: make(chan []*message, c.Group.User.Chan),
  94. // log
  95. dsLog: databus.New(c.DataBus.Log),
  96. logMergeChans: make([]chan *message, c.Group.Log.Num),
  97. logDoneChan: make(chan []*message, c.Group.Log.Chan),
  98. // log
  99. dsPwdLog: databus.New(c.DataBus.PwdLog),
  100. pwdLogMergeChans: make([]chan *message, c.Group.PwdLog.Num),
  101. pwdLogDoneChan: make(chan []*message, c.Group.PwdLog.Chan),
  102. // emial and tel log
  103. dsContactBindLog: databus.New(c.DataBus.ContactBindLog),
  104. contactBindLogMergeChans: make([]chan *message, c.Group.ContactBindLog.Num),
  105. contactBindLogDoneChan: make(chan []*message, c.Group.ContactBindLog.Chan),
  106. userLogPub: databus.New(c.DataBus.UserLog),
  107. // auth bin log
  108. authBinLog: databus.New(c.DataBus.AuthBinLog),
  109. authBinLogMergeChans: make([]chan *message, c.Group.AuthBinLog.Num),
  110. authBinLogDoneChan: make(chan []*message, c.Group.AuthBinLog.Chan),
  111. hashSalt: []byte(c.Encode.Salt),
  112. }
  113. s.AESBlock, _ = aes.NewCipher([]byte(c.Encode.AesKey))
  114. // start token proc
  115. go s.tokencommitproc()
  116. for i := 0; i < c.Group.AsoBinLog.Num; i++ {
  117. ch := make(chan *message, c.Group.AsoBinLog.Chan)
  118. s.tokenMergeChans[i] = ch
  119. go s.tokenmergeproc(ch)
  120. }
  121. go s.tokenconsumeproc()
  122. // start user proc
  123. go s.usercommitproc()
  124. for i := 0; i < c.Group.User.Num; i++ {
  125. ch := make(chan *message, c.Group.User.Chan)
  126. s.userMergeChans[i] = ch
  127. go s.usermergeproc(ch)
  128. }
  129. go s.userconsumeproc()
  130. // start log proc
  131. go s.logcommitproc()
  132. for i := 0; i < c.Group.Log.Num; i++ {
  133. ch := make(chan *message, c.Group.Log.Chan)
  134. s.logMergeChans[i] = ch
  135. go s.logmergeproc(ch)
  136. }
  137. go s.logconsumeproc()
  138. // start pwd log proc
  139. go s.pwdlogcommitproc()
  140. for i := 0; i < c.Group.PwdLog.Num; i++ {
  141. ch := make(chan *message, c.Group.PwdLog.Chan)
  142. s.pwdLogMergeChans[i] = ch
  143. go s.pwdlogmergeproc(ch)
  144. }
  145. go s.pwdlogconsumeproc()
  146. go s.contactBindLogcommitproc()
  147. for i := 0; i < c.Group.ContactBindLog.Num; i++ {
  148. ch := make(chan *message, c.Group.ContactBindLog.Chan)
  149. s.contactBindLogMergeChans[i] = ch
  150. go s.contactBindLogMergeproc(ch)
  151. }
  152. go s.contactBindLogconsumeproc()
  153. // start auth bin log token proc
  154. go s.authBinLogcommitproc()
  155. for i := 0; i < c.Group.AuthBinLog.Num; i++ {
  156. ch := make(chan *message, c.Group.AuthBinLog.Chan)
  157. s.authBinLogMergeChans[i] = ch
  158. go s.authBinLogmergeproc(ch)
  159. }
  160. go s.authBinLogconsumeproc()
  161. // end auth bin log token proc
  162. go s.syncPwdLog()
  163. return
  164. }
  165. // Ping ping check service health.
  166. func (s *Service) Ping(c context.Context) (err error) {
  167. err = s.d.Ping(c)
  168. return
  169. }
  170. // Close close service.
  171. func (s *Service) Close() (err error) {
  172. if s.dsToken != nil {
  173. s.dsToken.Close()
  174. }
  175. if s.dsUser != nil {
  176. s.dsUser.Close()
  177. }
  178. if s.dsLog != nil {
  179. s.dsLog.Close()
  180. }
  181. if s.d != nil {
  182. s.d.Close()
  183. }
  184. if s.dsContactBindLog != nil {
  185. s.dsContactBindLog.Close()
  186. }
  187. if s.userLogPub != nil {
  188. s.userLogPub.Close()
  189. }
  190. if s.dsPwdLog != nil {
  191. s.dsPwdLog.Close()
  192. }
  193. return
  194. }