pwd_log.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strings"
  6. "time"
  7. "go-common/app/job/main/passport/model"
  8. "go-common/library/log"
  9. "go-common/library/queue/databus"
  10. )
  11. type pwdLogBMsg struct {
  12. Action string
  13. Table string
  14. New *model.PwdLog
  15. }
  16. func (s *Service) pwdlogconsumeproc() {
  17. mergeRoutineNum := int64(s.c.Group.PwdLog.Num)
  18. for {
  19. msg, ok := <-s.dsPwdLog.Messages()
  20. if !ok {
  21. log.Error("s.pwdlogconsumeproc closed")
  22. return
  23. }
  24. // marked head to first commit
  25. m := &message{data: msg}
  26. p := &pwdLogBMsg{}
  27. if err := json.Unmarshal(msg.Value, p); err != nil {
  28. log.Error("json.Unmarshal(%s) error(%v)", string(msg.Value), err)
  29. continue
  30. }
  31. // 只处理 aso_pwd_log insert binlog
  32. if p.Table != "aso_pwd_log" {
  33. continue
  34. }
  35. if p.Action != "insert" {
  36. continue
  37. }
  38. m.object = p
  39. s.pwdLogMu.Lock()
  40. if s.pwdLogHead == nil {
  41. s.pwdLogHead = m
  42. s.pwdLogLast = m
  43. } else {
  44. s.pwdLogLast.next = m
  45. s.pwdLogLast = m
  46. }
  47. s.pwdLogMu.Unlock()
  48. // use specify goroutine to merge messages
  49. s.pwdLogMergeChans[p.New.Mid%mergeRoutineNum] <- m
  50. log.Info("pwdlogconsumeproc key:%s partition:%d offset:%d", msg.Key, msg.Partition, msg.Offset)
  51. }
  52. }
  53. func (s *Service) pwdlogcommitproc() {
  54. commits := make(map[int32]*databus.Message, s.c.Group.PwdLog.Size)
  55. for {
  56. done := <-s.pwdLogDoneChan
  57. // merge partitions to commit offset
  58. for _, d := range done {
  59. d.done = true
  60. }
  61. s.pwdLogMu.Lock()
  62. for ; s.pwdLogHead != nil && s.pwdLogHead.done; s.pwdLogHead = s.pwdLogHead.next {
  63. commits[s.pwdLogHead.data.Partition] = s.pwdLogHead.data
  64. }
  65. s.pwdLogMu.Unlock()
  66. for k, m := range commits {
  67. log.Info("pwdlogcommitproc committed, key:%s partition:%d offset:%d", m.Key, m.Partition, m.Offset)
  68. m.Commit()
  69. delete(commits, k)
  70. }
  71. }
  72. }
  73. func (s *Service) pwdlogmergeproc(c chan *message) {
  74. var (
  75. max = s.c.Group.PwdLog.Size
  76. merges = make([]*model.PwdLog, 0, max)
  77. marked = make([]*message, 0, max)
  78. ticker = time.NewTicker(time.Duration(s.c.Group.PwdLog.Ticker))
  79. err error
  80. )
  81. for {
  82. select {
  83. case msg, ok := <-c:
  84. if !ok {
  85. log.Error("s.pwdlogmergeproc closed")
  86. return
  87. }
  88. bmsg := &model.BMsg{}
  89. if err = json.Unmarshal(msg.data.Value, bmsg); err != nil {
  90. log.Error("json.Unmarshal(%s) error(%v)", string(msg.data.Value), err)
  91. continue
  92. }
  93. if bmsg.Action == "insert" && strings.HasPrefix(bmsg.Table, "aso_pwd_log") {
  94. p := &model.PwdLog{}
  95. if err = json.Unmarshal(bmsg.New, p); err != nil {
  96. log.Error("json.Unmarshal(%s) error(%v)", string(bmsg.New), err)
  97. continue
  98. }
  99. merges = append(merges, p)
  100. }
  101. marked = append(marked, msg)
  102. if len(marked) < max && len(merges) < max {
  103. continue
  104. }
  105. case <-ticker.C:
  106. }
  107. if len(merges) > 0 {
  108. s.pwdlogprocessMerges(merges)
  109. merges = make([]*model.PwdLog, 0, max)
  110. }
  111. if len(marked) > 0 {
  112. s.logDoneChan <- marked
  113. marked = make([]*message, 0, max)
  114. }
  115. }
  116. }
  117. func (s *Service) pwdlogprocessMerges(merges []*model.PwdLog) {
  118. for _, v := range merges {
  119. for {
  120. res, err := s.d.GetPwdLog(context.Background(), v.ID)
  121. if err != nil {
  122. log.Error("fail to get pwd log, id(%d) err(%v)", v.ID, err)
  123. time.Sleep(_addHBaseRetryDuration)
  124. continue
  125. }
  126. if err := s.addPwdLog(context.Background(), res); err != nil {
  127. time.Sleep(_addHBaseRetryDuration)
  128. continue
  129. }
  130. break
  131. }
  132. }
  133. }
  134. func (s *Service) addPwdLog(c context.Context, v *model.PwdLog) (err error) {
  135. for i := 0; i < _addHBaseRetryCount; i++ {
  136. if err = s.d.AddPwdLogHBase(c, v); err == nil {
  137. return
  138. }
  139. log.Error("failed to add pwd log to hbase, service.dao.AddPwdLogHBase(%+v) error(%v)", v, err)
  140. time.Sleep(_addHBaseRetryDuration)
  141. }
  142. return
  143. }