relation.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package service
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "encoding/json"
  6. "fmt"
  7. "strings"
  8. "time"
  9. "go-common/app/interface/main/push-archive/model"
  10. "go-common/library/conf/env"
  11. "go-common/library/log"
  12. )
  13. const (
  14. _relationMidTable = "user_relation_mid_"
  15. _relationTagUserTable = "user_relation_tag_user_"
  16. _retry = 3
  17. _relationStatusDeleted = 1 // 取消关注
  18. _relationTagSpecial = int64(-10) // 特殊关注的tag
  19. )
  20. func (s *Service) consumeRelationproc() {
  21. defer s.wg.Done()
  22. var err error
  23. for {
  24. msg, ok := <-s.relationSub.Messages()
  25. if !ok {
  26. log.Warn("s.RelationSub has been closed.")
  27. return
  28. }
  29. msg.Commit()
  30. time.Sleep(time.Millisecond)
  31. s.relMo++
  32. log.Info("consume relation key(%s) offset(%d) message(%s)", msg.Key, msg.Offset, msg.Value)
  33. m := new(model.Message)
  34. if err = json.Unmarshal(msg.Value, &m); err != nil {
  35. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  36. continue
  37. }
  38. switch {
  39. case strings.HasPrefix(m.Table, _relationMidTable):
  40. err = s.relationMid(m.Action, m.New, m.Old)
  41. case strings.HasPrefix(m.Table, _relationTagUserTable):
  42. err = s.relationTagUser(m.Action, m.New, m.Old)
  43. default:
  44. continue
  45. }
  46. if err != nil {
  47. log.Error("consumeRelationproc data(%s) error(%+v)", msg.Value, err)
  48. if env.DeployEnv == env.DeployEnvProd {
  49. s.dao.WechatMessage(fmt.Sprintf("push-archive sync relation fail error(%v)", err))
  50. }
  51. }
  52. }
  53. }
  54. func (s *Service) addFans(upper, fans int64, tp int) (err error) {
  55. for i := 0; i < _retry; i++ {
  56. if err = s.dao.AddFans(context.TODO(), upper, fans, tp); err == nil {
  57. break
  58. }
  59. log.Info("retry s.dao.AddFans(%d,%d,%d)", upper, fans, tp)
  60. }
  61. if err != nil {
  62. log.Error("s.dao.AddFans(%d,%d,%d) error(%v)", upper, fans, tp, err)
  63. }
  64. return
  65. }
  66. func (s *Service) delFans(upper, fans int64) (err error) {
  67. for i := 0; i < _retry; i++ {
  68. if err = s.dao.DelFans(context.TODO(), upper, fans); err == nil {
  69. break
  70. }
  71. log.Info("retry s.dao.DelFans(%d,%d)", upper, fans)
  72. }
  73. if err != nil {
  74. log.Error("s.dao.DelFans(%d,%d) error(%v)", upper, fans, err)
  75. }
  76. return
  77. }
  78. func (s *Service) delSpecialAttention(upper, fans int64) (err error) {
  79. for i := 0; i < _retry; i++ {
  80. if err = s.dao.DelSpecialAttention(context.TODO(), upper, fans); err == nil {
  81. break
  82. }
  83. log.Info("retry s.dao.DelSpecialAttention(%d,%d)", upper, fans)
  84. }
  85. if err != nil {
  86. log.Error("s.dao.DelSpecialAttention(%d,%d) error(%v)", upper, fans, err)
  87. }
  88. return
  89. }
  90. // relationMid .
  91. func (s *Service) relationMid(action string, nwMsg, oldMsg []byte) (err error) {
  92. n := &model.Relation{}
  93. if err = json.Unmarshal(nwMsg, n); err != nil {
  94. log.Error("json.Unmarshal(%s) error(%v)", nwMsg, err)
  95. return
  96. }
  97. switch action {
  98. case _insertAct:
  99. if !n.Following() {
  100. return
  101. }
  102. err = s.addFans(n.Fid, n.Mid, model.RelationAttention)
  103. case _updateAct:
  104. o := &model.Relation{}
  105. if err = json.Unmarshal(oldMsg, o); err != nil {
  106. log.Error("json.Unmarshal(%s) error(%v)", oldMsg, err)
  107. return
  108. }
  109. if n.Status == o.Status && n.Attribute == o.Attribute {
  110. return
  111. }
  112. if n.Status == _relationStatusDeleted || !n.Following() {
  113. err = s.delFans(n.Fid, n.Mid) // 删除粉丝关系
  114. } else {
  115. err = s.addFans(n.Fid, n.Mid, model.RelationAttention) // 增加、更新关注数据
  116. }
  117. }
  118. if err != nil {
  119. log.Error("s.relationMid(%s,%s) error(%v)", nwMsg, oldMsg, err)
  120. }
  121. return
  122. }
  123. // relationTagUser .
  124. func (s *Service) relationTagUser(action string, nwMsg, oldMsg []byte) (err error) {
  125. n := &model.RelationTagUser{}
  126. if err = json.Unmarshal(nwMsg, n); err != nil {
  127. log.Error("json.Unmarshal(%s) error(%v)", nwMsg, err)
  128. return
  129. }
  130. tagB, _ := base64.StdEncoding.DecodeString(n.Tag)
  131. n.Tag = string(tagB)
  132. switch action {
  133. case _insertAct:
  134. if !n.HasTag(_relationTagSpecial) {
  135. return
  136. }
  137. err = s.addFans(n.Fid, n.Mid, model.RelationSpecial)
  138. case _updateAct:
  139. o := &model.RelationTagUser{}
  140. if err = json.Unmarshal(oldMsg, o); err != nil {
  141. log.Error("json.Unmarshal(%s) error(%v)", oldMsg, err)
  142. return
  143. }
  144. tagB, _ = base64.StdEncoding.DecodeString(o.Tag)
  145. o.Tag = string(tagB)
  146. nt := n.HasTag(_relationTagSpecial)
  147. ot := o.HasTag(_relationTagSpecial)
  148. if nt && !ot {
  149. err = s.addFans(n.Fid, n.Mid, model.RelationSpecial)
  150. } else if !nt && ot {
  151. err = s.delSpecialAttention(n.Fid, n.Mid)
  152. }
  153. case _deleteAct:
  154. err = s.delSpecialAttention(n.Fid, n.Mid)
  155. }
  156. if err != nil {
  157. log.Error("s.relationTagUser(%s,%s) error(%v)", action, nwMsg, err)
  158. }
  159. return
  160. }