aso_bin_log.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strconv"
  6. "time"
  7. "go-common/app/job/main/passport-game-local/model"
  8. "go-common/library/log"
  9. "go-common/library/queue/databus"
  10. )
  11. func (s *Service) asobinlogconsumeproc() {
  12. mergeNum := int64(s.c.Group.AsoBinLog.Num)
  13. for {
  14. msg, ok := <-s.dsAsoBinLogSub.Messages()
  15. if !ok {
  16. log.Error("asobinlogconsumeproc closed")
  17. return
  18. }
  19. // marked head to first commit
  20. m := &message{data: msg}
  21. s.mu.Lock()
  22. if s.head == nil {
  23. s.head = m
  24. s.last = m
  25. } else {
  26. s.last.next = m
  27. s.last = m
  28. }
  29. s.mu.Unlock()
  30. bmsg := new(model.BMsg)
  31. if err := json.Unmarshal(msg.Value, bmsg); err != nil {
  32. log.Error("json.Unmarshal(%s) error(%v)", string(msg.Value), err)
  33. continue
  34. }
  35. mid := int64(0)
  36. if bmsg.Table == _asoAccountTable {
  37. t := new(model.AsoAccount)
  38. if err := json.Unmarshal(bmsg.New, t); err != nil {
  39. log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
  40. }
  41. mid = t.Mid
  42. m.object = bmsg
  43. log.Info("asobinlogconsumeproc table:%s key:%s partition:%d offset:%d", bmsg.Table, msg.Key, msg.Partition, msg.Offset)
  44. } else {
  45. continue
  46. }
  47. s.merges[mid%mergeNum] <- m
  48. }
  49. }
  50. func (s *Service) asobinlogcommitproc() {
  51. for {
  52. done := <-s.done
  53. commits := make(map[int32]*databus.Message)
  54. for _, d := range done {
  55. d.done = true
  56. }
  57. s.mu.Lock()
  58. for ; s.head != nil && s.head.done; s.head = s.head.next {
  59. commits[s.head.data.Partition] = s.head.data
  60. }
  61. s.mu.Unlock()
  62. for _, m := range commits {
  63. m.Commit()
  64. }
  65. }
  66. }
  67. func (s *Service) asobinlogmergeproc(c chan *message) {
  68. var (
  69. max = s.c.Group.AsoBinLog.Size
  70. merges = make([]*model.BMsg, 0, max)
  71. marked = make([]*message, 0, max)
  72. ticker = time.NewTicker(time.Duration(s.c.Group.AsoBinLog.Ticker))
  73. )
  74. for {
  75. select {
  76. case msg, ok := <-c:
  77. if !ok {
  78. log.Error("asobinlogmergeproc closed")
  79. return
  80. }
  81. p, assertOk := msg.object.(*model.BMsg)
  82. if assertOk && p.Action != "" && (p.Table == _asoAccountTable) {
  83. merges = append(merges, p)
  84. }
  85. marked = append(marked, msg)
  86. if len(marked) < max && len(merges) < max {
  87. continue
  88. }
  89. case <-ticker.C:
  90. }
  91. if len(merges) > 0 {
  92. s.processAsoAccLogInfo(merges)
  93. merges = make([]*model.BMsg, 0, max)
  94. }
  95. if len(marked) > 0 {
  96. s.done <- marked
  97. marked = make([]*message, 0, max)
  98. }
  99. }
  100. }
  101. func (s *Service) processAsoAccLogInfo(bmsgs []*model.BMsg) {
  102. for _, msg := range bmsgs {
  103. s.processAsoAccLog(msg)
  104. }
  105. }
  106. func (s *Service) processAsoAccLog(msg *model.BMsg) {
  107. aso := new(model.OriginAsoAccount)
  108. if err := json.Unmarshal(msg.New, aso); err != nil {
  109. log.Error("failed to parse binlog new, json.Unmarshal(%s) error(%v)", string(msg.New), err)
  110. return
  111. }
  112. pmsg := new(model.PMsg)
  113. if "update" == msg.Action {
  114. old := new(model.AsoAccount)
  115. if err := json.Unmarshal(msg.Old, old); err != nil {
  116. log.Error("failed to parse binlog new, json.Unmarshal(%s) error(%v)", string(msg.New), err)
  117. return
  118. }
  119. if old.Pwd != aso.Pwd {
  120. pmsg.Flag = 1
  121. }
  122. }
  123. pmsg.Action = msg.Action
  124. pmsg.Table = msg.Table
  125. pmsg.Data = model.Default(aso)
  126. key := strconv.FormatInt(aso.Mid, 10)
  127. for {
  128. if err := s.dsAsoEncryptTransPub.Send(context.TODO(), key, pmsg); err == nil {
  129. return
  130. }
  131. time.Sleep(time.Second)
  132. }
  133. }