online.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package service
  2. import (
  3. "context"
  4. "sync/atomic"
  5. "time"
  6. "go-common/app/interface/main/web/model"
  7. arcmdl "go-common/app/service/main/archive/api"
  8. "go-common/app/service/main/archive/model/archive"
  9. "go-common/library/log"
  10. )
  11. const (
  12. _onlineListNum = 50
  13. _onlinePubdateLimit = 86400
  14. _onlineDanmuLimit = 3
  15. )
  16. // OnlineArchiveCount Get Archive Count.
  17. func (s *Service) OnlineArchiveCount(c context.Context) (rs *model.Online) {
  18. rs = &model.Online{
  19. RegionCount: s.regionCount,
  20. AllCount: s.allArchivesCount,
  21. PlayOnline: s.playOnline,
  22. WebOnline: s.webOnline,
  23. }
  24. return
  25. }
  26. // OnlineList online archive list.
  27. func (s *Service) OnlineList(c context.Context) (res []*model.OnlineArc, err error) {
  28. res = s.onlineArcs
  29. if len(res) > 0 {
  30. return
  31. }
  32. return s.dao.OnlineListBakCache(c)
  33. }
  34. func (s *Service) newCountproc() {
  35. var (
  36. allCount int64
  37. reIDs []int16
  38. err error
  39. res map[int16]int
  40. )
  41. for {
  42. for len(s.rids) == 0 {
  43. time.Sleep(time.Second)
  44. }
  45. reIDs = []int16{}
  46. allCount = 0
  47. for regionID := range s.rids {
  48. reIDs = append(reIDs, int16(regionID))
  49. }
  50. arg := &archive.ArgRankTopsCount2{ReIDs: reIDs}
  51. if res, err = s.arc.RanksTopCount2(context.Background(), arg); err != nil {
  52. log.Error("s.arc.RanksTopCount2(%v) error (%v)", arg, err)
  53. time.Sleep(time.Second)
  54. continue
  55. } else if len(res) == 0 {
  56. log.Error("s.arc.RanksTopCount2(%v) res len(%d) == 0", arg, res)
  57. time.Sleep(time.Second)
  58. continue
  59. }
  60. s.regionCount = res
  61. for _, count := range res {
  62. allCount += int64(count)
  63. }
  64. if allCount > 0 {
  65. atomic.StoreInt64(&s.allArchivesCount, allCount)
  66. }
  67. time.Sleep(time.Duration(s.c.WEB.PullRegionInterval))
  68. }
  69. }
  70. func (s *Service) onlineCountproc() {
  71. var (
  72. count *model.OnlineCount
  73. liveCount *model.LiveOnlineCount
  74. playOnline, webOnline int64
  75. err error
  76. )
  77. for {
  78. if count, err = s.dao.OnlineCount(context.Background()); err != nil {
  79. time.Sleep(time.Second)
  80. continue
  81. } else if count != nil {
  82. playOnline = count.ConnCount
  83. webOnline = count.IPCount
  84. }
  85. if liveCount, err = s.dao.LiveOnlineCount(context.Background()); err != nil || liveCount == nil {
  86. time.Sleep(time.Second)
  87. continue
  88. } else if liveCount != nil {
  89. playOnline += liveCount.TotalOnline
  90. webOnline += liveCount.IPConnect
  91. }
  92. if playOnline > 0 && webOnline > 0 {
  93. atomic.StoreInt64(&s.playOnline, playOnline)
  94. atomic.StoreInt64(&s.webOnline, webOnline)
  95. }
  96. time.Sleep(time.Duration(s.c.WEB.PullRegionInterval))
  97. }
  98. }
  99. func (s *Service) onlineListproc() {
  100. var (
  101. err error
  102. aids []*model.OnlineAid
  103. arcs *arcmdl.ArcsReply
  104. )
  105. for {
  106. if aids, err = s.dao.OnlineList(context.Background(), _onlineListNum); err != nil {
  107. time.Sleep(time.Second)
  108. continue
  109. } else if len(aids) == 0 {
  110. log.Error("s.dao.OnlineList data len == 0")
  111. time.Sleep(time.Second)
  112. continue
  113. }
  114. var aidArg []int64
  115. for _, v := range aids {
  116. aidArg = append(aidArg, v.Aid)
  117. }
  118. archivesArgLog("onlineListproc", aidArg)
  119. if arcs, err = s.arcClient.Arcs(context.Background(), &arcmdl.ArcsRequest{Aids: aidArg}); err != nil {
  120. log.Error("s.arcClient.Arcs(%v) error (%v)", aidArg, err)
  121. time.Sleep(time.Second)
  122. continue
  123. } else {
  124. var onlineArcs []*model.OnlineArc
  125. for _, v := range aids {
  126. if arc, ok := arcs.Arcs[v.Aid]; ok && arc != nil && arc.IsNormal() && arc.AttrVal(archive.AttrBitNoRank) == archive.AttrNo {
  127. if arc.AttrVal(archive.AttrBitIsBangumi) == archive.AttrNo && arc.AttrVal(archive.AttrBitIsMovie) == archive.AttrNo {
  128. if time.Now().Unix()-int64(arc.PubDate) > _onlinePubdateLimit {
  129. if arc.Stat.Danmaku == 0 || v.Count/int64(arc.Stat.Danmaku) > _onlineDanmuLimit {
  130. continue
  131. }
  132. }
  133. }
  134. onlineArcs = append(onlineArcs, &model.OnlineArc{Arc: arc, OnlineCount: v.Count})
  135. if len(onlineArcs) >= s.c.WEB.OnlineCount {
  136. break
  137. }
  138. }
  139. }
  140. if len(onlineArcs) >= s.c.WEB.OnlineCount {
  141. s.onlineArcs = onlineArcs
  142. s.cache.Do(context.Background(), func(c context.Context) {
  143. s.dao.SetOnlineListBakCache(context.Background(), onlineArcs)
  144. })
  145. } else {
  146. log.Error("s.dao.OnlineList data len(%d) error", len(onlineArcs))
  147. time.Sleep(time.Second)
  148. continue
  149. }
  150. }
  151. time.Sleep(time.Duration(s.c.WEB.PullOnlineInterval))
  152. }
  153. }