encrypt_trans.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "go-common/app/job/main/passport-game-cloud/model"
  7. "go-common/library/log"
  8. "go-common/library/queue/databus"
  9. )
  10. func (s *Service) encrypttransconsumeproc() {
  11. var (
  12. mergeRoutineNum = int64(s.c.Group.EncryptTrans.Num)
  13. msgs = s.encryptTransDataBus.Messages()
  14. )
  15. for {
  16. msg, ok := <-msgs
  17. if !ok {
  18. log.Error("encrypttransconsumeproc closed")
  19. return
  20. }
  21. // marked head to first commit
  22. m := &message{data: msg}
  23. s.asoMu.Lock()
  24. if s.asoHead == nil {
  25. s.asoHead = m
  26. s.asoLast = m
  27. } else {
  28. s.asoLast.next = m
  29. s.asoLast = m
  30. }
  31. s.asoMu.Unlock()
  32. p := new(model.PMsg)
  33. if err := json.Unmarshal(msg.Value, p); err != nil {
  34. log.Error("encrypttransconsumeproc unmarshal failed, json.Unmarshal(%s) error(%v)", msg.Value, err)
  35. continue
  36. }
  37. if p.Table == _asoAccountTable {
  38. p.MTS = s.transInterval.MTS(context.TODO(), p.Data.Mtime)
  39. }
  40. m.object = p
  41. s.encryptTransMergeChans[p.Data.Mid%mergeRoutineNum] <- m
  42. log.Info("encrypttransconsumeproc key:%s partition:%d offset:%d", msg.Key, msg.Partition, msg.Offset)
  43. }
  44. }
  45. func (s *Service) encrypttranscommitproc() {
  46. commits := make(map[int32]*databus.Message, s.c.Group.EncryptTrans.Size)
  47. for {
  48. done := <-s.encryptTransDoneChan
  49. for _, d := range done {
  50. d.done = true
  51. }
  52. s.asoMu.Lock()
  53. for ; s.asoHead != nil && s.asoHead.done; s.asoHead = s.asoHead.next {
  54. commits[s.asoHead.data.Partition] = s.asoHead.data
  55. }
  56. s.asoMu.Unlock()
  57. for k, m := range commits {
  58. log.Info("encrypttranscommitproc committed, key:%s partition:%d offset:%d", m.Key, m.Partition, m.Offset)
  59. m.Commit()
  60. delete(commits, k)
  61. }
  62. }
  63. }
  64. func (s *Service) encrypttransmergeproc(c chan *message) {
  65. var (
  66. max = s.c.Group.EncryptTrans.Size
  67. merges = make([]*model.PMsg, 0, max)
  68. marked = make([]*message, 0, max)
  69. ticker = time.NewTicker(time.Duration(s.c.Group.EncryptTrans.Ticker))
  70. )
  71. for {
  72. select {
  73. case msg, ok := <-c:
  74. if !ok {
  75. log.Error("encrypttransmergeproc closed")
  76. return
  77. }
  78. p, assertOK := msg.object.(*model.PMsg)
  79. if assertOK {
  80. merges = append(merges, p)
  81. }
  82. marked = append(marked, msg)
  83. if len(marked) < max && len(merges) < max {
  84. continue
  85. }
  86. case <-ticker.C:
  87. }
  88. if len(merges) > 0 {
  89. s.processAsoAcc(merges)
  90. merges = make([]*model.PMsg, 0, max)
  91. }
  92. if len(marked) > 0 {
  93. s.encryptTransDoneChan <- marked
  94. marked = make([]*message, 0, max)
  95. }
  96. }
  97. }
  98. func (s *Service) processAsoAcc(pmsgs []*model.PMsg) {
  99. for _, p := range pmsgs {
  100. if p.Action != "" && p.Table == _asoAccountTable {
  101. s.processAsoAccSub(p)
  102. }
  103. }
  104. }