reply_zset.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "go-common/app/job/main/reply-feed/model"
  7. "go-common/library/cache/redis"
  8. "go-common/library/log"
  9. )
  10. // func (s *Service) delZSet(ctx context.Context, oid int64, tp int) (err error) {
  11. // var names []string
  12. // s.algorithmsLock.RLock()
  13. // for _, algorithm := range s.algorithms {
  14. // names = append(names, algorithm.Name())
  15. // }
  16. // s.algorithmsLock.RUnlock()
  17. // if err = s.dao.DelReplyZSetRds(ctx, names, oid, tp); err != nil {
  18. // log.Error("Del ZSet from redis oid(%d) type(%d) error(%v)", oid, tp, err)
  19. // }
  20. // return
  21. // }
  22. func (s *Service) remZSet(ctx context.Context, oid int64, tp int, rpID int64) (err error) {
  23. var (
  24. names []string
  25. )
  26. s.algorithmsLock.RLock()
  27. for _, algorithm := range s.algorithms {
  28. names = append(names, algorithm.Name())
  29. }
  30. s.algorithmsLock.RUnlock()
  31. for _, name := range names {
  32. if err = s.dao.RemReplyZSetRds(ctx, name, oid, tp, rpID); err != nil {
  33. log.Error("Remove reply (name: %s, oid: %d, type: %d, rpID: %d) from ZSet failed.", name, oid, tp, rpID)
  34. return
  35. }
  36. }
  37. return
  38. }
  39. func (s *Service) upsertZSet(ctx context.Context, oid int64, tp int) {
  40. var (
  41. rpIDs []int64
  42. rs []*model.ReplyStat
  43. err error
  44. ts int64
  45. )
  46. // 获取计时器
  47. if ts, err = s.dao.CheckerTsRds(ctx, oid, tp); err != nil && err != redis.ErrNil {
  48. // 出错不刷新,如果缓存里还没有的话刷新
  49. return
  50. } else if time.Now().Unix()-ts < s.c.RefreshTime {
  51. // 小于CD时间不刷新
  52. return
  53. }
  54. // 从reply set中取rpIDs
  55. ok, err := s.dao.ExpireReplySetRds(ctx, oid, tp)
  56. if err != nil {
  57. return
  58. }
  59. if ok {
  60. // 缓存有则从redis中取
  61. if rpIDs, err = s.dao.ReplySetRds(ctx, oid, tp); err != nil {
  62. return
  63. }
  64. } else {
  65. // 缓存中没有从DB中取
  66. if rpIDs, err = s.dao.RpIDs(ctx, oid, tp); err != nil {
  67. return
  68. }
  69. // 异步回源
  70. s.taskQ.Do(ctx, func(ctx context.Context) {
  71. s.setReplySetBatch(ctx, oid, tp)
  72. })
  73. }
  74. // 从MC中获取reply stat
  75. if rs, err = s.GetStatsByID(ctx, oid, tp, rpIDs); err != nil {
  76. return
  77. }
  78. // 重新计算分值
  79. rsMap, err := s.recalculateScore(ctx, rs)
  80. if err != nil {
  81. return
  82. }
  83. for name, rs := range rsMap {
  84. name, rs := name, rs
  85. s.replyListQ.Do(ctx, func(ctx context.Context) {
  86. s.dao.SetReplyZSetRds(ctx, name, oid, tp, rs)
  87. })
  88. }
  89. // 更新完后更新计时器
  90. if err = s.dao.SetCheckerTsRds(ctx, oid, tp); err != nil {
  91. log.Error("set refresh checker error (%v)", err)
  92. }
  93. }
  94. // recalculateScore recalculate all e group reply list score.
  95. func (s *Service) recalculateScore(ctx context.Context, stats []*model.ReplyStat) (rsMap map[string][]*model.ReplyScore, err error) {
  96. rsMap = make(map[string][]*model.ReplyScore)
  97. s.algorithmsLock.RLock()
  98. defer s.algorithmsLock.RUnlock()
  99. for _, algorithm := range s.algorithms {
  100. wg := sync.WaitGroup{}
  101. rs := make([]*model.ReplyScore, len(stats))
  102. for i := range stats {
  103. j := i
  104. wg.Add(1)
  105. s.calculator.JobQueue <- func() {
  106. rs[j] = algorithm.Score(stats[j])
  107. wg.Done()
  108. }
  109. }
  110. wg.Wait()
  111. rsMap[algorithm.Name()] = rs
  112. }
  113. return
  114. }
  115. func (s *Service) isHot(ctx context.Context, name string, oid, rpID int64, tp int) (isHot bool, err error) {
  116. rpIDs, err := s.dao.RangeReplyZSetRds(ctx, name, oid, tp, 0, 5)
  117. if err != nil || len(rpIDs) <= 0 {
  118. return
  119. }
  120. rs, err := s.GetStatsByID(ctx, oid, tp, rpIDs)
  121. if err != nil {
  122. return
  123. }
  124. for _, r := range rs {
  125. if r.RpID == rpID && r.Like >= 3 {
  126. isHot = true
  127. return
  128. }
  129. }
  130. return
  131. }