unread_count.go

package service

import (
	"context"
	"sync/atomic"
	"time"

	"go-common/app/service/main/archive/model/archive"
	"go-common/app/service/main/feed/dao"
	"go-common/app/service/main/feed/model"
	feedmdl "go-common/app/service/main/feed/model"
	"go-common/library/log"
	"go-common/library/sync/errgroup"
	xtime "go-common/library/time"
)
// UnreadCount gets the count of unread archives since the user's last access.
func (s *Service) UnreadCount(c context.Context, app bool, withoutBangumi bool, mid int64, ip string) (count int, err error) {
	var (
		t             int64
		last          time.Time
		pullInterval  xtime.Duration
		marcs         map[int64][]*archive.AidPubTime
		minTotalCount int
		bangumiFeeds  []*feedmdl.Feed
		unreadCount   int64
		ft            = model.FeedType(app)
	)
	if t, err = s.dao.LastAccessCache(c, ft, mid); err != nil {
		// Degrade gracefully: report zero unread instead of failing the request.
		dao.PromError("unread count: get last access time cache", "s.dao.LastAccessCache(app:%v, mid:%v), err: %v", app, mid, err)
		return 0, nil
	}
	if t == 0 {
		// never accessed before, nothing to count
		return
	}
	last = time.Unix(t, 0)
	pullInterval = s.pullInterval(ft)
	if time.Since(last) < time.Duration(pullInterval) {
		// within the pull interval, serve the cached unread count
		count, _ = s.dao.UnreadCountCache(c, ft, mid)
		return
	}
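	// Cached count is stale: recount concurrently. One goroutine scans recent
	// archives from followed uploaders, the other scans bangumi feeds; both
	// bump the shared unreadCount atomically.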
	group, errCtx := errgroup.WithContext(c)
	group.Go(func() error {
		if app {
			minTotalCount = s.c.Feed.AppLength
		} else {
			minTotalCount = s.c.Feed.WebLength
		}
		if marcs, err = s.attenUpArcs(errCtx, minTotalCount, mid, ip); err != nil {
			dao.PromError("unread count: attenUpArcs", "s.attenUpArcs(count:%v, mid:%v), err: %v", minTotalCount, mid, err)
			// Swallow the error: a failed upstream call costs accuracy, not the request.
			err = nil
			return nil
		}
		for _, arcs := range marcs {
			for _, arc := range arcs {
				if int64(arc.PubDate) > t {
					atomic.AddInt64(&unreadCount, 1)
				}
			}
		}
		return nil
	})
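	// Count bangumi (season) feed items published after the last access,
	// unless the caller explicitly excluded bangumi.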
	group.Go(func() error {
		if withoutBangumi {
			return nil
		}
		bangumiFeeds = s.genBangumiFeed(errCtx, mid, ip)
		for _, f := range bangumiFeeds {
			if int64(f.PubDate) > t {
				atomic.AddInt64(&unreadCount, 1)
			}
		}
		return nil
	})
	group.Wait()
	count = int(unreadCount)
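	// Cap the reported count at the configured maximum and refresh the
	// unread-count cache asynchronously.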
	if count > s.c.Feed.MaxTotalCnt {
		count = s.c.Feed.MaxTotalCnt
	}
	s.addCache(func() {
		s.dao.AddUnreadCountCache(context.Background(), ft, mid, count)
	})
	return
}
// ArticleUnreadCount gets the count of unread articles since the user's last access.
func (s *Service) ArticleUnreadCount(c context.Context, mid int64, ip string) (count int, err error) {
	var (
		t            int64
		last         time.Time
		pullInterval xtime.Duration
		ft           = model.TypeArt
	)
	if t, err = s.dao.LastAccessCache(c, ft, mid); err != nil {
		log.Error("s.dao.LastAccessCache(app:%v, mid:%v), err: %v", ft, mid, err)
		return 0, nil
	}
	if t == 0 {
		// no access, no unread
		return
	}
	last = time.Unix(t, 0)
	pullInterval = s.pullInterval(ft)
	if time.Since(last) < time.Duration(pullInterval) {
		count, _ = s.dao.UnreadCountCache(c, ft, mid)
		return
	}
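	// Cache is stale: rebuild the article feed and count entries published
	// after the last access.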
	res := s.genArticleFeed(c, mid, s.c.Feed.ArticleFeedLength, ip)
	for _, f := range res {
		if int64(f.PublishTime) > t {
			count++
		}
	}
	if count > s.c.Feed.MaxTotalCnt {
		count = s.c.Feed.MaxTotalCnt
	}
	s.addCache(func() {
		s.dao.AddUnreadCountCache(context.Background(), ft, mid, count)
	})
	return
}