relation.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strings"
  6. "time"
  7. "go-common/app/job/main/relation-cache/model"
  8. relation "go-common/app/service/main/relation/model"
  9. "go-common/library/log"
  10. "go-common/library/queue/databus"
  11. xtime "go-common/library/time"
  12. "github.com/pkg/errors"
  13. )
  14. const (
  15. _relationFidTable = "user_relation_fid_"
  16. _relationMidTable = "user_relation_mid_"
  17. _relationStatTable = "user_relation_stat_"
  18. _relationTagUserTable = "user_relation_tag_user_"
  19. )
  20. func (s *Service) relationBinLogproc(ctx context.Context) {
  21. for msg := range s.relationBinLog.Messages() {
  22. if err := s.handleRelationBinLog(ctx, msg); err != nil {
  23. log.Error("Failed to handle relation binlog: %s: %+v", BeautifyMessage(msg), err)
  24. continue
  25. }
  26. log.Info("Succeed to handle relation binlog: %s", BeautifyMessage(msg))
  27. }
  28. }
  29. func (s *Service) handleRelationBinLog(ctx context.Context, msg *databus.Message) error {
  30. defer func() {
  31. if err := msg.Commit(); err != nil {
  32. log.Error("Failed to commit message: %+v", BeautifyMessage(msg))
  33. return
  34. }
  35. }()
  36. mu := &model.Message{}
  37. if err := json.Unmarshal(msg.Value, mu); err != nil {
  38. return errors.WithStack(err)
  39. }
  40. switch {
  41. case strings.HasPrefix(mu.Table, _relationStatTable):
  42. if err := s.stat(ctx, mu.Action, mu.New, mu.Old); err != nil {
  43. return err
  44. }
  45. case strings.HasPrefix(mu.Table, _relationMidTable):
  46. if err := s.relationMid(ctx, mu.Action, mu.New, mu.Old); err != nil {
  47. return err
  48. }
  49. case strings.HasPrefix(mu.Table, _relationFidTable):
  50. if err := s.relationFid(ctx, mu.Action, mu.New, mu.Old); err != nil {
  51. return err
  52. }
  53. case strings.HasPrefix(mu.Table, _relationTagUserTable):
  54. if err := s.tagUser(ctx, mu.New); err != nil {
  55. return err
  56. }
  57. }
  58. return nil
  59. }
  60. // stat
  61. func (s *Service) stat(ctx context.Context, action string, nwMsg []byte, oldMsg []byte) (err error) {
  62. ms := &model.Stat{}
  63. if err = json.Unmarshal(nwMsg, ms); err != nil {
  64. log.Error("json.Unmarshal(%v) error(%v)", nwMsg, err)
  65. return
  66. }
  67. mo := &model.Stat{}
  68. if len(oldMsg) > 0 {
  69. if err = json.Unmarshal(oldMsg, mo); err != nil {
  70. log.Error("json.Unmarshal(%v) error(%v)", oldMsg, err)
  71. err = nil
  72. }
  73. }
  74. return s.dao.DelStatCache(ctx, ms.Mid)
  75. }
  76. // relationMid
  77. func (s *Service) relationMid(ctx context.Context, action string, nwMsg []byte, oldMsg []byte) error {
  78. mr := &model.Relation{}
  79. if err := json.Unmarshal(nwMsg, mr); err != nil {
  80. return errors.WithStack(err)
  81. }
  82. f := &relation.Following{
  83. Mid: mr.Fid,
  84. Attribute: mr.Attribute,
  85. MTime: xtime.Time(time.Now().Unix()),
  86. }
  87. if err := s.upFollowingCache(ctx, mr.Mid, f); err != nil {
  88. return err
  89. }
  90. return s.dao.DelTagsCache(ctx, mr.Mid)
  91. }
  92. // relationFid
  93. func (s *Service) relationFid(ctx context.Context, action string, nwMsg []byte, oldMsg []byte) error {
  94. var or *model.Relation
  95. mr := &model.Relation{}
  96. if err := json.Unmarshal(nwMsg, mr); err != nil {
  97. log.Error("json.Unmarshal(%v) error(%v)", nwMsg, err)
  98. return err
  99. }
  100. if len(oldMsg) > 0 {
  101. or = new(model.Relation)
  102. if err := json.Unmarshal(oldMsg, or); err != nil {
  103. log.Error("json.Unmarshal(%v) error(%v)", oldMsg, err)
  104. }
  105. }
  106. return s.dao.DelFollowerCache(ctx, mr.Fid)
  107. }
  108. func (s *Service) tagUser(ctx context.Context, newMsg []byte) (err error) {
  109. var tags struct {
  110. Fid int64 `json:"fid"`
  111. Mid int64 `json:"mid"`
  112. }
  113. if err = json.Unmarshal(newMsg, &tags); err != nil {
  114. log.Error("json.Unmarshal err(%v)", err)
  115. return
  116. }
  117. return s.dao.DelTagsCache(ctx, tags.Mid)
  118. }
  119. func (s *Service) upFollowingCache(c context.Context, mid int64, f *relation.Following) (err error) {
  120. if f.Attribute == 0 {
  121. s.dao.DelFollowing(c, mid, f)
  122. } else {
  123. if err = s.dao.AddFollowingCache(c, mid, f); err != nil {
  124. return
  125. }
  126. }
  127. if err = s.dao.DelFollowingCache(c, mid); err != nil {
  128. return
  129. }
  130. return s.dao.DelTagCountCache(c, mid)
  131. }