set_token.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strings"
  6. "time"
  7. "go-common/app/job/main/passport/model"
  8. "go-common/library/log"
  9. "go-common/library/queue/databus"
  10. )
  11. func (s *Service) userconsumeproc() {
  12. mergeRoutineNum := int64(s.c.Group.User.Num)
  13. msgs := s.dsUser.Messages()
  14. for {
  15. msg, ok := <-msgs
  16. if !ok {
  17. log.Error("s.userconsumeproc closed")
  18. return
  19. }
  20. // marked head to first commit
  21. m := &message{data: msg}
  22. p := new(model.PMsg)
  23. if err := json.Unmarshal(msg.Value, p); err != nil {
  24. log.Error("json.Unmarshal(%s) error(%v)", string(msg.Value), err)
  25. continue
  26. }
  27. s.userMu.Lock()
  28. if s.userHead == nil {
  29. s.userHead = m
  30. s.userLast = m
  31. } else {
  32. s.userLast.next = m
  33. s.userLast = m
  34. }
  35. s.userMu.Unlock()
  36. m.object = p
  37. // use specify goroutine to merge messages
  38. s.userMergeChans[p.Data.Mid%mergeRoutineNum] <- m
  39. log.Info("userconsumeproc key:%s partition:%d offset:%d", msg.Key, msg.Partition, msg.Offset)
  40. }
  41. }
  42. func (s *Service) usercommitproc() {
  43. commits := make(map[int32]*databus.Message, s.c.Group.User.Size)
  44. for {
  45. done := <-s.userDoneChan
  46. // merge partitions to commit offset
  47. for _, d := range done {
  48. d.done = true
  49. }
  50. s.userMu.Lock()
  51. for ; s.userHead != nil && s.userHead.done; s.userHead = s.userHead.next {
  52. commits[s.userHead.data.Partition] = s.userHead.data
  53. }
  54. s.userMu.Unlock()
  55. for k, m := range commits {
  56. log.Info("usercommitproc committed, key:%s partition:%d offset:%d", m.Key, m.Partition, m.Offset)
  57. m.Commit()
  58. delete(commits, k)
  59. }
  60. }
  61. }
  62. func (s *Service) usermergeproc(c chan *message) {
  63. var (
  64. max = s.c.Group.User.Size
  65. merges = make([]*model.PMsg, 0, max)
  66. marked = make([]*message, 0, max)
  67. ticker = time.NewTicker(time.Duration(s.c.Group.User.Ticker))
  68. )
  69. for {
  70. select {
  71. case msg, ok := <-c:
  72. if !ok {
  73. log.Error("s.usermergeproc closed")
  74. return
  75. }
  76. p, assertOk := msg.object.(*model.PMsg)
  77. if assertOk && strings.HasPrefix(p.Table, "aso_app_perm") && p.Action != "" {
  78. merges = append(merges, p)
  79. }
  80. marked = append(marked, msg)
  81. if len(marked) < max && len(merges) < max {
  82. continue
  83. }
  84. case <-ticker.C:
  85. }
  86. if len(merges) > 0 {
  87. s.setTokens(merges)
  88. merges = make([]*model.PMsg, 0, max)
  89. }
  90. if len(marked) > 0 {
  91. s.userDoneChan <- marked
  92. marked = make([]*message, 0, max)
  93. }
  94. }
  95. }
  96. // setTokens for set tokens.
  97. func (s *Service) setTokens(msgs []*model.PMsg) {
  98. for _, msg := range msgs {
  99. s.setToken(msg.Action, msg.Data)
  100. }
  101. }
  102. // setToken set single token.
  103. func (s *Service) setToken(action string, t *model.Token) {
  104. if action == "" || t == nil || t.Token == "" {
  105. return
  106. }
  107. switch action {
  108. case "insert":
  109. for {
  110. if err := s.d.SetToken(context.TODO(), t); err == nil {
  111. return
  112. }
  113. time.Sleep(time.Second)
  114. }
  115. }
  116. }