service.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "go-common/app/job/main/passport-game-cloud/conf"
  7. "go-common/app/job/main/passport-game-cloud/dao"
  8. "go-common/library/queue/databus"
  9. )
  10. const (
  11. // cache retry count and duration
  12. _accountCacheRetryCount = 3
  13. _accountCacheRetryDuration = time.Second
  14. // table and duration
  15. _asoAccountTable = "aso_account"
  16. _memberTablePrefix = "dede_member"
  17. _changePwd = "changePwd"
  18. _updateUserInfo = "updateUserInfo"
  19. _gameAppID = int32(876)
  20. )
  21. // Service service.
  22. type Service struct {
  23. c *conf.Config
  24. d *dao.Dao
  25. missch chan func()
  26. // prom
  27. tokenInterval *Interval
  28. memberInterval *Interval
  29. asoAccountInterval *Interval
  30. transInterval *Interval
  31. gameAppIDs []int32
  32. // bin log proc
  33. binLogDataBus *databus.Databus
  34. binLogMergeChans []chan *message
  35. binLogDoneChan chan []*message
  36. head, last *message
  37. mu sync.Mutex
  38. // aso acc proc
  39. encryptTransDataBus *databus.Databus
  40. encryptTransMergeChans []chan *message
  41. encryptTransDoneChan chan []*message
  42. asoHead, asoLast *message
  43. asoMu sync.Mutex
  44. }
// message wraps one databus message and carries a next pointer so that
// messages can be chained together.
type message struct {
	next   *message         // following message in the chain, if any
	data   *databus.Message // the underlying databus message
	object interface{}      // NOTE(review): presumably the decoded payload of data — confirm
	done   bool             // NOTE(review): presumably set once processing has finished — confirm
}
  51. // New new a service instance.
  52. func New(c *conf.Config) (s *Service) {
  53. gameAppIDs := make([]int32, 0)
  54. gameAppIDs = append(gameAppIDs, _gameAppID)
  55. for _, id := range c.Game.AppIDs {
  56. if id == _gameAppID {
  57. continue
  58. }
  59. gameAppIDs = append(gameAppIDs, id)
  60. }
  61. s = &Service{
  62. c: c,
  63. d: dao.New(c),
  64. missch: make(chan func(), 10240),
  65. // prom
  66. tokenInterval: NewInterval(&IntervalConfig{
  67. Name: "interval_token",
  68. Rate: 1000,
  69. }),
  70. memberInterval: NewInterval(&IntervalConfig{
  71. Name: "interval_member",
  72. Rate: 1000,
  73. }),
  74. asoAccountInterval: NewInterval(&IntervalConfig{
  75. Name: "interval_aso_account",
  76. Rate: 1000,
  77. }),
  78. transInterval: NewInterval(&IntervalConfig{
  79. Name: "interval_trans",
  80. Rate: 1000,
  81. }),
  82. gameAppIDs: gameAppIDs,
  83. binLogDataBus: databus.New(c.DataBus.BinLogSub),
  84. binLogMergeChans: make([]chan *message, c.Group.BinLog.Num),
  85. binLogDoneChan: make(chan []*message, c.Group.BinLog.Chan),
  86. // aso acc proc
  87. encryptTransDataBus: databus.New(c.DataBus.EncryptTransSub),
  88. encryptTransMergeChans: make([]chan *message, c.Group.EncryptTrans.Num),
  89. encryptTransDoneChan: make(chan []*message, c.Group.EncryptTrans.Chan),
  90. }
  91. // start bin log proc
  92. go s.binlogcommitproc()
  93. for i := 0; i < c.Group.BinLog.Num; i++ {
  94. ch := make(chan *message, c.Group.BinLog.Chan)
  95. s.binLogMergeChans[i] = ch
  96. go s.binlogmergeproc(ch)
  97. }
  98. go s.binlogconsumeproc()
  99. // start encrypt trans proc
  100. go s.encrypttranscommitproc()
  101. for i := 0; i < c.Group.EncryptTrans.Num; i++ {
  102. ch := make(chan *message, c.Group.EncryptTrans.Chan)
  103. s.encryptTransMergeChans[i] = ch
  104. go s.encrypttransmergeproc(ch)
  105. }
  106. go s.encrypttransconsumeproc()
  107. // go s.cacheproc()
  108. return
  109. }
  110. //func (s *Service) addCache(f func()) {
  111. // select {
  112. // case s.missch <- f:
  113. // default:
  114. // log.Warn("cache chan full")
  115. // }
  116. //}
  117. // cacheproc is a routine for executing closure.
  118. //func (s *Service) cacheproc() {
  119. // for {
  120. // f := <-s.missch
  121. // f()
  122. // }
  123. //}
  124. // Ping check server ok.
  125. func (s *Service) Ping(c context.Context) (err error) {
  126. err = s.d.Ping(c)
  127. return
  128. }
  129. // Close close service, including closing dao.
  130. func (s *Service) Close() (err error) {
  131. s.binLogDataBus.Close()
  132. s.encryptTransDataBus.Close()
  133. s.d.Close()
  134. return
  135. }