bin_log.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strings"
  6. "time"
  7. "go-common/app/job/main/passport-game-cloud/model"
  8. "go-common/library/log"
  9. "go-common/library/queue/databus"
  10. )
  11. func (s *Service) binlogconsumeproc() {
  12. mergeRoutineNum := int64(s.c.Group.BinLog.Num)
  13. for {
  14. msg, ok := <-s.binLogDataBus.Messages()
  15. if !ok {
  16. log.Error("binlogconsumeproc closed")
  17. return
  18. }
  19. // marked head to first commit
  20. m := &message{data: msg}
  21. s.mu.Lock()
  22. if s.head == nil {
  23. s.head = m
  24. s.last = m
  25. } else {
  26. s.last.next = m
  27. s.last = m
  28. }
  29. s.mu.Unlock()
  30. bmsg := new(model.BMsg)
  31. if err := json.Unmarshal(msg.Value, bmsg); err != nil {
  32. log.Error("json.Unmarshal(%s) error(%v)", string(msg.Value), err)
  33. continue
  34. }
  35. mid := int64(0)
  36. if bmsg.Table == _asoAccountTable {
  37. t := new(model.OriginAsoAccount)
  38. if err := json.Unmarshal(bmsg.New, t); err != nil {
  39. log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
  40. continue
  41. }
  42. mid = t.Mid
  43. bmsg.MTS = s.asoAccountInterval.MTS(context.TODO(), t.Mtime)
  44. } else if strings.HasPrefix(bmsg.Table, _tokenTablePrefix) {
  45. t := new(model.OriginPerm)
  46. if err := json.Unmarshal(bmsg.New, t); err != nil {
  47. log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
  48. continue
  49. }
  50. mid = t.Mid
  51. bmsg.MTS = s.tokenInterval.MTS(context.TODO(), t.Mtime)
  52. } else if strings.HasPrefix(bmsg.Table, _memberTablePrefix) {
  53. t := new(model.OriginMember)
  54. if err := json.Unmarshal(bmsg.New, t); err != nil {
  55. log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
  56. continue
  57. }
  58. mid = t.Mid
  59. bmsg.MTS = s.memberInterval.MTS(context.TODO(), t.Mtime)
  60. }
  61. m.object = bmsg
  62. // use specify goroutine to merge messages
  63. s.binLogMergeChans[mid%mergeRoutineNum] <- m
  64. log.Info("binlogconsumeproc table:%s key:%s partition:%d offset:%d", bmsg.Table, msg.Key, msg.Partition, msg.Offset)
  65. }
  66. }
  67. func (s *Service) binlogcommitproc() {
  68. commits := make(map[int32]*databus.Message, s.c.Group.BinLog.Size)
  69. for {
  70. done := <-s.binLogDoneChan
  71. // merge partitions to commit offset
  72. for _, d := range done {
  73. d.done = true
  74. }
  75. s.mu.Lock()
  76. for ; s.head != nil && s.head.done; s.head = s.head.next {
  77. commits[s.head.data.Partition] = s.head.data
  78. }
  79. s.mu.Unlock()
  80. for k, m := range commits {
  81. log.Info("binlogcommitproc committed, key:%s partition:%d offset:%d", m.Key, m.Partition, m.Offset)
  82. m.Commit()
  83. delete(commits, k)
  84. }
  85. }
  86. }
  87. func (s *Service) binlogmergeproc(c chan *message) {
  88. var (
  89. max = s.c.Group.BinLog.Size
  90. merges = make([]*model.BMsg, 0, max)
  91. marked = make([]*message, 0, max)
  92. ticker = time.NewTicker(time.Duration(s.c.Group.BinLog.Ticker))
  93. )
  94. for {
  95. select {
  96. case msg, ok := <-c:
  97. if !ok {
  98. log.Error("binlogmergeproc closed")
  99. return
  100. }
  101. p, assertOk := msg.object.(*model.BMsg)
  102. if assertOk {
  103. merges = append(merges, p)
  104. }
  105. marked = append(marked, msg)
  106. if len(marked) < max && len(merges) < max {
  107. continue
  108. }
  109. case <-ticker.C:
  110. }
  111. if len(merges) > 0 {
  112. s.process(merges)
  113. merges = make([]*model.BMsg, 0, max)
  114. }
  115. if len(marked) > 0 {
  116. s.binLogDoneChan <- marked
  117. marked = make([]*message, 0, max)
  118. }
  119. }
  120. }
  121. func (s *Service) process(bmsgs []*model.BMsg) {
  122. for _, bmsg := range bmsgs {
  123. if bmsg.Table == _asoAccountTable {
  124. s.processUserInfo(bmsg)
  125. } else if strings.HasPrefix(bmsg.Table, _tokenTablePrefix) {
  126. s.processToken(bmsg)
  127. } else if strings.HasPrefix(bmsg.Table, _memberTablePrefix) {
  128. s.processMemberInfo(bmsg)
  129. }
  130. }
  131. }