aso_incr_migration.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "go-common/app/job/main/passport-encrypt/model"
  7. "go-common/library/log"
  8. "go-common/library/queue/databus"
  9. )
  10. func (s *Service) asobinlogconsumeproc() {
  11. mergeNum := int64(s.c.Group.AsoBinLog.Num)
  12. for {
  13. msg, ok := <-s.dsAsoBinLogSub.Messages()
  14. if !ok {
  15. log.Error("asobinlogconsumeproc closed")
  16. return
  17. }
  18. // marked head to first commit
  19. m := &message{data: msg}
  20. s.mu.Lock()
  21. if s.head == nil {
  22. s.head = m
  23. s.last = m
  24. } else {
  25. s.last.next = m
  26. s.last = m
  27. }
  28. s.mu.Unlock()
  29. bmsg := new(model.BMsg)
  30. if err := json.Unmarshal(msg.Value, bmsg); err != nil {
  31. log.Error("json.Unmarshal(%s) error(%v)", string(msg.Value), err)
  32. continue
  33. }
  34. mid := int64(0)
  35. if bmsg.Table == _asoAccountTable {
  36. t := new(model.OriginAccount)
  37. if err := json.Unmarshal(bmsg.New, t); err != nil {
  38. log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
  39. }
  40. mid = t.Mid
  41. m.object = bmsg
  42. log.Info("asobinlogconsumeproc table:%s key:%s partition:%d offset:%d", bmsg.Table, msg.Key, msg.Partition, msg.Offset)
  43. } else {
  44. continue
  45. }
  46. s.merges[mid%mergeNum] <- m
  47. }
  48. }
  49. func (s *Service) asobinlogcommitproc() {
  50. for {
  51. done := <-s.done
  52. commits := make(map[int32]*databus.Message)
  53. for _, d := range done {
  54. d.done = true
  55. }
  56. s.mu.Lock()
  57. for ; s.head != nil && s.head.done; s.head = s.head.next {
  58. commits[s.head.data.Partition] = s.head.data
  59. }
  60. s.mu.Unlock()
  61. for _, m := range commits {
  62. m.Commit()
  63. }
  64. }
  65. }
  66. func (s *Service) asobinlogmergeproc(c chan *message) {
  67. var (
  68. max = s.c.Group.AsoBinLog.Size
  69. merges = make([]*model.BMsg, 0, max)
  70. marked = make([]*message, 0, max)
  71. ticker = time.NewTicker(time.Duration(s.c.Group.AsoBinLog.Ticker))
  72. )
  73. for {
  74. select {
  75. case msg, ok := <-c:
  76. if !ok {
  77. log.Error("asobinlogmergeproc closed")
  78. return
  79. }
  80. p, assertOk := msg.object.(*model.BMsg)
  81. if assertOk && p.Action != "" && (p.Table == _asoAccountTable) {
  82. merges = append(merges, p)
  83. }
  84. marked = append(marked, msg)
  85. if len(marked) < max && len(merges) < max {
  86. continue
  87. }
  88. case <-ticker.C:
  89. }
  90. if len(merges) > 0 {
  91. s.processAsoAccLogInfo(merges)
  92. merges = make([]*model.BMsg, 0, max)
  93. }
  94. if len(marked) > 0 {
  95. s.done <- marked
  96. marked = make([]*message, 0, max)
  97. }
  98. }
  99. }
  100. func (s *Service) processAsoAccLogInfo(bmsgs []*model.BMsg) {
  101. for _, msg := range bmsgs {
  102. s.processAsoAccLog(msg)
  103. }
  104. }
  105. func (s *Service) processAsoAccLog(msg *model.BMsg) {
  106. aso := new(model.OriginAccount)
  107. if err := json.Unmarshal(msg.New, aso); err != nil {
  108. log.Error("failed to parse binlog new, json.Unmarshal(%s) error(%v)", string(msg.New), err)
  109. return
  110. }
  111. if _updateAction == msg.Action {
  112. enAso := EncryptAccount(aso)
  113. s.updateEncryptAccount(context.TODO(), enAso)
  114. }
  115. if _insertAction == msg.Action {
  116. enAso := EncryptAccount(aso)
  117. s.saveEncryptAccount(context.TODO(), enAso)
  118. }
  119. if _deleteAction == msg.Action {
  120. s.delEncryptAccount(context.TODO(), aso.Mid)
  121. }
  122. }