archive.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package service
  2. import (
  3. "context"
  4. "sort"
  5. "time"
  6. likemdl "go-common/app/interface/main/activity/model/like"
  7. "go-common/app/job/main/activity/model/like"
  8. "go-common/app/service/main/archive/api"
  9. "go-common/app/service/main/archive/model/archive"
  10. "go-common/library/log"
  11. )
const (
	// _rankViewPieceSize is the step by which viewRankproc advances through a
	// subject's likes while building the ranking.
	_rankViewPieceSize = 100
	// _rankCount is the maximum number of archives viewRankproc keeps in a
	// subject's view ranking.
	_rankCount = 50
)
  16. func (s *Service) subsRankproc() {
  17. for {
  18. if s.closed {
  19. return
  20. }
  21. var (
  22. subs []*like.Subject
  23. err error
  24. )
  25. now := time.Now()
  26. if subs, err = s.dao.SubjectList(context.Background(), []int64{likemdl.PHONEVIDEO, likemdl.SMALLVIDEO}, now); err != nil {
  27. log.Error("viewRankproc s.dao.SubjectList error(%+v)", err)
  28. time.Sleep(100 * time.Millisecond)
  29. continue
  30. }
  31. if len(subs) == 0 {
  32. log.Warn("viewRankproc no subjects time(%d)", now.Unix())
  33. time.Sleep(time.Duration(s.c.Interval.ViewRankInterval))
  34. continue
  35. }
  36. for _, v := range subs {
  37. s.viewRankproc(v.ID)
  38. time.Sleep(100 * time.Millisecond)
  39. }
  40. time.Sleep(time.Duration(s.c.Interval.ViewRankInterval))
  41. }
  42. }
  43. func (s *Service) viewRankproc(sid int64) {
  44. var (
  45. likeCnt int
  46. likes []*like.Like
  47. rankArcs []*api.Arc
  48. err error
  49. )
  50. if likeCnt, err = s.dao.LikeCnt(context.Background(), sid); err != nil {
  51. log.Error("viewRankproc s.dao.LikeCnt(sid:%d) error(%v)", sid, err)
  52. return
  53. }
  54. if likeCnt == 0 {
  55. log.Warn("viewRankproc s.dao.LikeCnt(sid:%d) likeCnt == 0", sid)
  56. return
  57. }
  58. for i := 0; i < likeCnt; i += _rankViewPieceSize {
  59. if likes, err = s.likeList(context.Background(), sid, i, _objectPieceSize, _retryTimes); err != nil {
  60. log.Error("viewRankproc s.likeList(%d,%d,%d) error(%+v)", sid, i, _objectPieceSize, err)
  61. time.Sleep(100 * time.Millisecond)
  62. continue
  63. } else {
  64. var aids []int64
  65. for _, v := range likes {
  66. if v.Wid > 0 {
  67. aids = append(aids, v.Wid)
  68. }
  69. }
  70. var arcs map[int64]*api.Arc
  71. if arcs, err = s.arcs(context.Background(), aids, _retryTimes); err != nil {
  72. log.Error("viewRankproc s.arcs(%v) error(%v)", aids, err)
  73. time.Sleep(100 * time.Millisecond)
  74. continue
  75. } else {
  76. for _, aid := range aids {
  77. if arc, ok := arcs[aid]; ok && arc.IsNormal() {
  78. rankArcs = append(rankArcs, arc)
  79. }
  80. }
  81. sort.Slice(rankArcs, func(i, j int) bool {
  82. return rankArcs[i].Stat.View > rankArcs[j].Stat.View
  83. })
  84. if len(rankArcs) > _rankCount {
  85. rankArcs = rankArcs[:_rankCount]
  86. }
  87. }
  88. }
  89. }
  90. if len(rankArcs) > 0 {
  91. var rankAids []int64
  92. for _, v := range rankArcs {
  93. rankAids = append(rankAids, v.Aid)
  94. }
  95. if err = s.setViewRank(context.Background(), sid, rankAids, _retryTimes); err != nil {
  96. log.Error("viewRankproc s.setObjectStat(%d,%v) error(%+v)", sid, rankAids, err)
  97. }
  98. }
  99. }
  100. func (s *Service) arcs(c context.Context, aids []int64, retryCnt int) (arcs map[int64]*api.Arc, err error) {
  101. for i := 0; i < retryCnt; i++ {
  102. if arcs, err = s.arcRPC.Archives3(c, &archive.ArgAids2{Aids: aids}); err == nil {
  103. break
  104. }
  105. time.Sleep(100 * time.Millisecond)
  106. }
  107. return
  108. }
  109. func (s *Service) setViewRank(c context.Context, sid int64, aids []int64, retryTime int) (err error) {
  110. for i := 0; i < retryTime; i++ {
  111. if err = s.dao.SetViewRank(c, sid, aids); err == nil {
  112. break
  113. }
  114. time.Sleep(100 * time.Millisecond)
  115. }
  116. return
  117. }