clean_token.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "go-common/app/job/main/passport/model"
  9. igmdl "go-common/app/service/main/identify-game/model"
  10. "go-common/library/log"
  11. "go-common/library/queue/databus"
  12. )
  13. const (
  14. _changePwd = "changePwd"
  15. _retryCount = 3
  16. _retryDuration = time.Second
  17. )
  18. func (s *Service) tokenconsumeproc() {
  19. mergeNum := s.c.Group.AsoBinLog.Num
  20. var (
  21. err error
  22. n int
  23. msgs = s.dsToken.Messages()
  24. )
  25. for {
  26. msg, ok := <-msgs
  27. if !ok {
  28. log.Error("s.tokenconsumeproc closed")
  29. return
  30. }
  31. // marked head to first commit
  32. m := &message{data: msg}
  33. if n, err = strconv.Atoi(msg.Key); err != nil {
  34. log.Error("strconv.Atoi(%s) error(%v)", msg.Key, err)
  35. continue
  36. }
  37. s.mu.Lock()
  38. if s.head == nil {
  39. s.head = m
  40. s.last = m
  41. } else {
  42. s.last.next = m
  43. s.last = m
  44. }
  45. s.mu.Unlock()
  46. // use specify goroutine to merge messages
  47. s.tokenMergeChans[n%mergeNum] <- m
  48. log.Info("tokenconsumeproc key:%s partition:%d offset:%d", msg.Key, msg.Partition, msg.Offset)
  49. }
  50. }
  51. func (s *Service) tokencommitproc() {
  52. commits := make(map[int32]*databus.Message, s.c.Group.AsoBinLog.Size)
  53. for {
  54. done := <-s.tokenDoneChan
  55. // merge partitions to commit offset
  56. for _, d := range done {
  57. d.done = true
  58. }
  59. s.mu.Lock()
  60. for ; s.head != nil && s.head.done; s.head = s.head.next {
  61. commits[s.head.data.Partition] = s.head.data
  62. }
  63. s.mu.Unlock()
  64. for k, m := range commits {
  65. log.Info("tokencommitproc committed, key:%s partition:%d offset:%d", m.Key, m.Partition, m.Offset)
  66. m.Commit()
  67. delete(commits, k)
  68. }
  69. }
  70. }
  71. func (s *Service) tokenmergeproc(c chan *message) {
  72. var (
  73. err error
  74. max = s.c.Group.AsoBinLog.Size
  75. merges = make([]*model.AccessInfo, 0, max)
  76. marked = make([]*message, 0, max)
  77. ticker = time.NewTicker(time.Duration(s.c.Group.AsoBinLog.Ticker))
  78. )
  79. for {
  80. select {
  81. case msg, ok := <-c:
  82. if !ok {
  83. log.Error("s.tokenmergeproc closed")
  84. return
  85. }
  86. bmsg := &model.BMsg{}
  87. if err = json.Unmarshal(msg.data.Value, bmsg); err != nil {
  88. log.Error("json.Unmarshal(%s) error(%v)", string(msg.data.Value), err)
  89. continue
  90. }
  91. if bmsg.Action == "delete" && strings.HasPrefix(bmsg.Table, "aso_app_perm") {
  92. t := &model.AccessInfo{}
  93. if err = json.Unmarshal(bmsg.New, t); err != nil {
  94. log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
  95. continue
  96. }
  97. merges = append(merges, t)
  98. }
  99. marked = append(marked, msg)
  100. if len(marked) < max && len(merges) < max {
  101. continue
  102. }
  103. case <-ticker.C:
  104. }
  105. if len(merges) > 0 {
  106. s.cleanTokens(merges)
  107. merges = make([]*model.AccessInfo, 0, max)
  108. }
  109. if len(marked) > 0 {
  110. s.tokenDoneChan <- marked
  111. marked = make([]*message, 0, max)
  112. }
  113. }
  114. }
  115. // cleanTokens clean tokens.
  116. func (s *Service) cleanTokens(tokens []*model.AccessInfo) {
  117. for _, token := range tokens {
  118. s.cleanToken(token)
  119. }
  120. }
  121. // cleanToken to notify other clean access token.
  122. func (s *Service) cleanToken(token *model.AccessInfo) (err error) {
  123. if token == nil || token.Expires < time.Now().Unix() {
  124. return
  125. }
  126. isGame := false
  127. for _, id := range s.gameAppIDs {
  128. if id == token.AppID {
  129. isGame = true
  130. break
  131. }
  132. }
  133. if !isGame {
  134. return
  135. }
  136. for {
  137. if err = s.d.DelCache(context.TODO(), token.Token); err == nil {
  138. break
  139. }
  140. time.Sleep(_retryDuration)
  141. }
  142. for i := 0; i < _retryCount; i++ {
  143. arg := &igmdl.CleanCacheArgs{
  144. Token: token.Token,
  145. Mid: token.Mid,
  146. }
  147. if err = s.igRPC.DelCache(context.TODO(), arg); err == nil {
  148. break
  149. }
  150. log.Error("service.identifyGameRPC.DelCache(%+v) error(%v)", arg, err)
  151. time.Sleep(_retryDuration)
  152. }
  153. for i := 0; i < _retryCount; i++ {
  154. if err = s.d.NotifyGame(token, _changePwd); err == nil {
  155. return
  156. }
  157. time.Sleep(_retryDuration)
  158. }
  159. log.Error("notify err, token(%+v)", token)
  160. return
  161. }