member.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strings"
  6. "go-common/app/job/main/member-cache/model"
  7. "go-common/library/log"
  8. "go-common/library/queue/databus"
  9. "github.com/pkg/errors"
  10. )
  // Binlog table names, or table-name prefixes, that handleMemberBinLog compares
  // with the Table field of a binlog entry to decide which cache to refresh.
  const (
  	// _MemberBaseInfo is matched as a prefix of the table name.
  	_MemberBaseInfo = "user_base_"
  	// _MemberMoral is matched against the whole table name.
  	_MemberMoral = "user_moral"
  	// _memberExp is matched as a prefix of the table name.
  	_memberExp = "user_exp_"
  	// _memberRealnameApply is matched against the whole table name.
  	_memberRealnameApply = "realname_apply"
  	// _memberRealnameInfo is matched against the whole table name.
  	_memberRealnameInfo = "realname_info"
  )
  // Experience scaling factor and level thresholds used by level.
  const (
  	// expMulti is the divisor applied to a raw exp value before it is compared
  	// with the thresholds below.
  	expMulti = 100
  	// levelN is the smallest scaled exp value that maps to level N; a scaled
  	// value below level1 maps to level 0.
  	level1 = 1
  	level2 = 200
  	level3 = 1500
  	level4 = 4500
  	level5 = 10800
  	level6 = 28800
  )
  29. func isExpAndLevelChange(mu *model.Binlog) (bool, bool) {
  30. if mu.Action == "insert" {
  31. return true, true
  32. }
  33. if len(mu.Old) <= 0 || len(mu.New) <= 0 {
  34. return false, false
  35. }
  36. old := &model.ExpMessage{}
  37. new := &model.ExpMessage{}
  38. if err := json.Unmarshal(mu.New, new); err != nil {
  39. return false, false
  40. }
  41. if err := json.Unmarshal(mu.Old, old); err != nil {
  42. return false, false
  43. }
  44. expChange := false
  45. levelChange := false
  46. if old.Exp != new.Exp {
  47. expChange = true
  48. }
  49. if level(old.Exp) != level(new.Exp) {
  50. levelChange = true
  51. }
  52. return expChange, levelChange
  53. }
  54. func level(exp int64) int8 {
  55. exp = exp / expMulti
  56. switch {
  57. case exp < level1:
  58. return 0
  59. case exp < level2:
  60. return 1
  61. case exp < level3:
  62. return 2
  63. case exp < level4:
  64. return 3
  65. case exp < level5:
  66. return 4
  67. case exp < level6:
  68. return 5
  69. default:
  70. return 6
  71. }
  72. }
  73. func resolveBaseAct(old json.RawMessage, new json.RawMessage) string {
  74. if old == nil || new == nil {
  75. return model.ActUpdateFace
  76. }
  77. ob := &model.MemberBase{}
  78. if err := json.Unmarshal(old, ob); err != nil {
  79. log.Error("Failed to parse data: %s", string(old))
  80. return model.ActUpdateFace
  81. }
  82. nb := &model.MemberBase{}
  83. if err := json.Unmarshal(new, nb); err != nil {
  84. log.Error("Failed to parse data: %s", string(new))
  85. return model.ActUpdateFace
  86. }
  87. if ob.Name != nb.Name {
  88. return model.ActUpdateUname
  89. }
  90. if ob.Face != nb.Face {
  91. return model.ActUpdateFace
  92. }
  93. return model.ActUpdateFace
  94. }
  95. func (s *Service) handleMemberBinLog(ctx context.Context, msg *databus.Message) error {
  96. defer func() {
  97. if err := msg.Commit(); err != nil {
  98. log.Error("Failed to commit message: %+v", BeautifyMessage(msg))
  99. return
  100. }
  101. }()
  102. mu := &model.Binlog{}
  103. if err := json.Unmarshal(msg.Value, mu); err != nil {
  104. return errors.WithStack(err)
  105. }
  106. mmid := &model.NeastMid{}
  107. bs := mu.New
  108. if len(bs) <= 0 {
  109. bs = mu.Old
  110. }
  111. if err := json.Unmarshal(bs, mmid); err != nil {
  112. return errors.WithStack(err)
  113. }
  114. switch {
  115. case strings.HasPrefix(mu.Table, _MemberBaseInfo):
  116. if err := s.dao.DelBaseInfoCache(ctx, mmid.Mid); err != nil {
  117. return err
  118. }
  119. s.NotifyPurgeCache(ctx, mmid.Mid, resolveBaseAct(mu.Old, mu.New))
  120. case mu.Table == _MemberMoral:
  121. if err := s.dao.DelMoralCache(ctx, mmid.Mid); err != nil {
  122. return err
  123. }
  124. s.NotifyPurgeCache(ctx, mmid.Mid, model.ActUpdateMoral)
  125. case mu.Table == _memberRealnameInfo || mu.Table == _memberRealnameApply:
  126. if err := s.dao.DeleteRealnameCache(ctx, mmid.Mid); err != nil {
  127. return err
  128. }
  129. s.NotifyPurgeCache(ctx, mmid.Mid, model.ActUpdateRealname)
  130. case strings.HasPrefix(mu.Table, _memberExp):
  131. mexp := &model.NewExp{}
  132. if err := json.Unmarshal(mu.New, mexp); err != nil {
  133. return errors.WithStack(err)
  134. }
  135. if err := s.dao.SetExpCache(ctx, mexp.Mid, mexp.Exp); err != nil {
  136. return err
  137. }
  138. expChange, levelChange := isExpAndLevelChange(mu)
  139. if expChange {
  140. s.NotifyPurgeCache(ctx, mmid.Mid, model.ActUpdateExp)
  141. }
  142. if levelChange {
  143. s.NotifyPurgeCache(ctx, mmid.Mid, model.ActUpdateLevel)
  144. }
  145. default:
  146. if mmid.Mid <= 0 {
  147. log.Info("Invalid message: %+v", BeautifyMessage(msg))
  148. return nil
  149. }
  150. s.deleteAllCache(ctx, mmid.Mid)
  151. s.NotifyPurgeCache(ctx, mmid.Mid, model.ActUpdateByAdmin)
  152. }
  153. return nil
  154. }
  155. func (s *Service) deleteAllCache(ctx context.Context, mid int64) error {
  156. if err := s.dao.DelBaseInfoCache(ctx, mid); err != nil {
  157. log.Error("Failed to delete cache: %+v", err)
  158. }
  159. if err := s.dao.DelMoralCache(ctx, mid); err != nil {
  160. log.Error("Failed to delete cache: %+v", err)
  161. }
  162. if err := s.dao.DeleteRealnameCache(ctx, mid); err != nil {
  163. log.Error("Failed to delete cache: %+v", err)
  164. }
  165. if err := s.dao.DelExpCache(ctx, mid); err != nil {
  166. log.Error("Failed to delete cache: %+v", err)
  167. }
  168. return nil
  169. }
  170. func (s *Service) memberBinLogproc(ctx context.Context) {
  171. for msg := range s.memberBinLog.Messages() {
  172. if err := s.handleMemberBinLog(ctx, msg); err != nil {
  173. log.Error("Failed to handle member binlog: %s: %+v", BeautifyMessage(msg), err)
  174. continue
  175. }
  176. log.Info("Succeed to handle member binlog: %s", BeautifyMessage(msg))
  177. }
  178. }