databus.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/job/main/reply-feed/model"
  6. "go-common/library/log"
  7. )
  8. // func (s *Service) eventproc() {
  9. // defer s.waiter.Done()
  10. // msgs := s.eventConsumer.Messages()
  11. // ctx := context.Background()
  12. // for {
  13. // msg, ok := <-msgs
  14. // if !ok {
  15. // log.Warn("databus consumer channel has been closed.")
  16. // return
  17. // }
  18. // if msg.Topic != s.c.Databus.Event.Topic {
  19. // log.Warn("wrong topic actual (%s) expect (%s)", msg.Topic, s.c.Databus.Stats.Topic)
  20. // continue
  21. // }
  22. // value := &model.EventMsg{}
  23. // if err := json.Unmarshal(msg.Value, value); err != nil {
  24. // log.Error("json.Unmarshal(%v) error(%v)", msg.Value, err)
  25. // continue
  26. // }
  27. // switch value.Action {
  28. // case model.DatabusActionReIdx:
  29. // s.setReplySetBatch(ctx, value.Oid, value.Tp)
  30. // s.upsertZSet(ctx, value.Oid, value.Tp)
  31. // default:
  32. // continue
  33. // }
  34. // msg.Commit()
  35. // log.Info("consumer topic:%s, partitionId:%d, offset:%d, Key:%s, Value:%s", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
  36. // }
  37. // }
  38. func (s *Service) statsproc() {
  39. defer s.waiter.Done()
  40. msgs := s.statsConsumer.Messages()
  41. for {
  42. msg, ok := <-msgs
  43. if !ok {
  44. log.Warn("databus consumer channel has been closed.")
  45. return
  46. }
  47. if msg.Topic != s.c.Databus.Stats.Topic {
  48. log.Warn("wrong topic actual (%s) expect (%s)", msg.Topic, s.c.Databus.Stats.Topic)
  49. continue
  50. }
  51. value := &model.StatsMsg{}
  52. if err := json.Unmarshal(msg.Value, value); err != nil {
  53. log.Error("json.Unmarshal(%v) error(%v)", msg.Value, err)
  54. continue
  55. }
  56. // 脏数据
  57. if value.Reply == nil || value.Subject == nil || (value.Action == model.DatabusActionReport && value.Report == nil) {
  58. log.Error("illegal message (%v)", value)
  59. continue
  60. }
  61. ctx := context.Background()
  62. // 针对评论列表的流程
  63. s.replyListFlow(ctx, value)
  64. // 针对统计数据的流程
  65. s.statisticsFlow(ctx, value)
  66. msg.Commit()
  67. log.Info("consumer topic:%s, partitionId:%d, offset:%d, Key:%s, Value:%s", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
  68. }
  69. }
  70. func (s *Service) statisticsFlow(ctx context.Context, value *model.StatsMsg) {
  71. var (
  72. reply = value.Reply
  73. oid = value.Subject.Oid
  74. tp = value.Subject.Type
  75. rpID = reply.RpID
  76. isHotReply bool
  77. name string
  78. err error
  79. )
  80. s.statisticsLock.RLock()
  81. name = s.statisticsStats[value.Sharding()].Name
  82. s.statisticsLock.RUnlock()
  83. if value.HotCondition() {
  84. if !reply.IsRoot() {
  85. rpID = reply.Root
  86. }
  87. if name == model.DefaultAlgorithm {
  88. if isHotReply, err = s.dao.IsOriginHot(ctx, oid, rpID, tp); err != nil {
  89. return
  90. }
  91. } else {
  92. if isHotReply, err = s.isHot(ctx, name, oid, rpID, tp); err != nil {
  93. return
  94. }
  95. }
  96. }
  97. s.addUV(ctx, value, isHotReply)
  98. switch value.Action {
  99. case model.DatabusActionLike:
  100. if isHotReply {
  101. s.statisticsStats[value.Sharding()].HotLike++
  102. }
  103. s.statisticsStats[value.Sharding()].TotalLike++
  104. case model.DatabusActionHate:
  105. if isHotReply {
  106. s.statisticsStats[value.Sharding()].HotHate++
  107. }
  108. s.statisticsStats[value.Sharding()].TotalHate++
  109. case model.DatabusActionCancelLike:
  110. if isHotReply && s.statisticsStats[value.Sharding()].HotLike > 0 {
  111. s.statisticsStats[value.Sharding()].HotLike--
  112. }
  113. if s.statisticsStats[value.Sharding()].TotalLike > 0 {
  114. s.statisticsStats[value.Sharding()].TotalLike--
  115. }
  116. case model.DatabusActionCancelHate:
  117. if isHotReply && s.statisticsStats[value.Sharding()].HotHate > 0 {
  118. s.statisticsStats[value.Sharding()].HotHate--
  119. }
  120. if s.statisticsStats[value.Sharding()].TotalHate > 0 {
  121. s.statisticsStats[value.Sharding()].TotalHate--
  122. }
  123. case model.DatabusActionReport:
  124. if isHotReply {
  125. s.statisticsStats[value.Sharding()].HotReport++
  126. }
  127. s.statisticsStats[value.Sharding()].TotalReport++
  128. case model.DatabusActionReply:
  129. if reply.IsRoot() {
  130. s.statisticsStats[value.Sharding()].TotalRootReply++
  131. } else {
  132. if isHotReply {
  133. s.statisticsStats[value.Sharding()].HotChildReply++
  134. }
  135. s.statisticsStats[value.Sharding()].TotalChildReply++
  136. }
  137. }
  138. }
  139. func (s *Service) replyListFlow(ctx context.Context, value *model.StatsMsg) {
  140. var (
  141. subject = value.Subject
  142. reply = value.Reply
  143. oid = subject.Oid
  144. tp = subject.Type
  145. stat *model.ReplyStat
  146. reportCount int
  147. err error
  148. )
  149. if value.Report == nil {
  150. reportCount = 0
  151. } else {
  152. reportCount = value.Report.Count
  153. }
  154. // if root reply get stat, else get root reply stat
  155. if reply.IsRoot() {
  156. stat = &model.ReplyStat{
  157. RpID: reply.RpID,
  158. Reply: reply.RCount,
  159. Like: reply.Like,
  160. Hate: reply.Hate,
  161. Report: reportCount,
  162. SubjectTime: subject.CTime,
  163. ReplyTime: reply.CTime,
  164. }
  165. } else {
  166. if stat, err = s.GetStatByID(ctx, oid, tp, reply.Root); err != nil || stat == nil {
  167. return
  168. }
  169. }
  170. if reply.IsRoot() {
  171. switch value.Action {
  172. case model.DatabusActionTop, model.DatabusActionDel, model.DatabusActionRptDel:
  173. s.remReply(ctx, oid, tp, stat.RpID)
  174. case model.DatabusActionUnTop, model.DatabusActionRecover, model.DatabusActionReply:
  175. s.addReplySet(ctx, oid, tp, stat.RpID)
  176. case model.DatabusActionLike, model.DatabusActionCancelLike, model.DatabusActionCancelHate, model.DatabusActionHate, model.DatabusActionReport:
  177. s.updateStat(ctx, stat.RpID, stat)
  178. default:
  179. return
  180. }
  181. } else {
  182. switch value.Action {
  183. case model.DatabusActionReply, model.DatabusActionRecover:
  184. stat.Reply++
  185. case model.DatabusActionDel, model.DatabusActionRptDel:
  186. if stat.Reply > 0 {
  187. stat.Reply--
  188. }
  189. default:
  190. return
  191. }
  192. s.updateStat(ctx, stat.RpID, stat)
  193. }
  194. s.upsertZSet(ctx, oid, tp)
  195. }