stat.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/job/main/favorite/model"
  6. favmdl "go-common/app/service/main/favorite/model"
  7. "go-common/library/log"
  8. )
  9. // consumeStat consumes folder's stat.
  10. func (s *Service) consumeStat() {
  11. defer s.waiter.Done()
  12. for {
  13. select {
  14. case msg, ok := <-s.playStatSub.Messages():
  15. if !ok {
  16. break
  17. }
  18. msg.Commit()
  19. m := &model.PlayReport{}
  20. if err := json.Unmarshal(msg.Value, m); err != nil {
  21. log.Error("fav json.Unmarshal(%s) error(%+v)", msg.Value, err)
  22. continue
  23. }
  24. if s.intercept(context.TODO(), m.ID, m.Mid, m.IP, m.Buvid) { // 防刷
  25. continue
  26. }
  27. if err := s.updatePlayStat(context.TODO(), m.ID); err != nil {
  28. log.Error("s.updatePlayStat(%d) error(%v)", m.ID, err)
  29. }
  30. log.Info("consumePlayStat key:%s partition:%d offset:%d msg: %+v)", msg.Key, msg.Partition, msg.Offset, m)
  31. case msg, ok := <-s.favStatSub.Messages():
  32. if !ok {
  33. break
  34. }
  35. msg.Commit()
  36. m := &model.StatCount{}
  37. if err := json.Unmarshal(msg.Value, m); err != nil {
  38. log.Error("fav json.Unmarshal(%s) error(%+v)", msg.Value, err)
  39. continue
  40. }
  41. if m.Type != "fav_playlist" {
  42. continue
  43. }
  44. if err := s.updateFavStat(context.TODO(), m.ID, m.Count); err != nil {
  45. log.Error("s.updateFav(%d,%d) error(%v)", m.ID, m.Count, err)
  46. }
  47. log.Info("consumeFavStat key:%s partition:%d offset:%d msg: %+v)", msg.Key, msg.Partition, msg.Offset, m)
  48. case msg, ok := <-s.shareStatSub.Messages():
  49. if !ok {
  50. break
  51. }
  52. msg.Commit()
  53. m := &model.StatCount{}
  54. if err := json.Unmarshal(msg.Value, m); err != nil {
  55. log.Error("share json.Unmarshal(%s) error(%+v)", msg.Value, err)
  56. continue
  57. }
  58. if m.Type != "playlist" {
  59. continue
  60. }
  61. if err := s.updateShareStat(context.TODO(), m.ID, m.Count); err != nil {
  62. log.Error("s.statDao.UpdateShare(%d,%d) error(%v)", m.ID, m.Count, err)
  63. }
  64. log.Info("consumeShareStat key:%s partition:%d offset:%d msg: %+v)", msg.Key, msg.Partition, msg.Offset, m)
  65. }
  66. }
  67. }
  68. func (s *Service) intercept(c context.Context, id, mid int64, ip, buvid string) (ban bool) {
  69. if ban = s.statDao.IPBan(c, id, ip); ban {
  70. return
  71. }
  72. return s.statDao.BuvidBan(c, id, mid, ip, buvid)
  73. }
  74. func (s *Service) updatePlayStat(c context.Context, id int64) (err error) {
  75. f, err := s.stat(c, id)
  76. if err != nil {
  77. return
  78. }
  79. f.PlayCount = f.PlayCount + 1
  80. rows, err := s.statDao.UpdatePlay(context.TODO(), id, int64(f.PlayCount))
  81. if err != nil {
  82. log.Error("s.statDao.UpdatePlay(%d,%d) error(%v)", id, f.PlayCount, err)
  83. return
  84. }
  85. if rows > 0 {
  86. if err := s.statDao.SetFolderStatMc(c, id, f); err != nil {
  87. log.Error("s.SetFolderStatMc(%d,%+v) error(%v)", id, f, err)
  88. }
  89. }
  90. return
  91. }
  92. func (s *Service) updateFavStat(c context.Context, id, count int64) (err error) {
  93. f, err := s.stat(c, id)
  94. if err != nil {
  95. return
  96. }
  97. f.FavedCount = int32(count)
  98. rows, err := s.statDao.UpdateFav(context.TODO(), id, count)
  99. if err != nil {
  100. log.Error("s.statDao.UpdateFav(%d,%d) error(%v)", id, count, err)
  101. return
  102. }
  103. if rows > 0 {
  104. if err := s.statDao.SetFolderStatMc(c, id, f); err != nil {
  105. log.Error("s.SetFolderStatMc(%d,%+v) error(%v)", id, f, err)
  106. }
  107. }
  108. return
  109. }
  110. func (s *Service) updateShareStat(c context.Context, id, count int64) (err error) {
  111. f, err := s.stat(c, id)
  112. if err != nil {
  113. return
  114. }
  115. f.ShareCount = int32(count)
  116. rows, err := s.statDao.UpdateShare(context.TODO(), id, count)
  117. if err != nil {
  118. log.Error("s.statDao.UpdateShare(%d,%d) error(%v)", id, count, err)
  119. return
  120. }
  121. if rows > 0 {
  122. if err := s.statDao.SetFolderStatMc(c, id, f); err != nil {
  123. log.Error("s.SetFolderStatMc(%d,%+v) error(%v)", id, f, err)
  124. }
  125. }
  126. return
  127. }
  128. func (s *Service) stat(c context.Context, id int64) (f *favmdl.Folder, err error) {
  129. if f, err = s.statDao.FolderStatMc(c, id); err != nil {
  130. log.Error("s.statDao.FolderStatMc(%d) error(%v)", id, err)
  131. return
  132. }
  133. if f != nil {
  134. return
  135. }
  136. if f, err = s.statDao.Stat(c, id); err != nil {
  137. log.Error("s.statDao.FolderStatMc(%d) error(%v)", id, err)
  138. return
  139. }
  140. if f == nil {
  141. f = new(favmdl.Folder)
  142. }
  143. if err := s.statDao.SetFolderStatMc(c, id, f); err != nil {
  144. log.Error("s.statDao.SetFolderStatMc(%d,%+v) error(%v)", id, f, err)
  145. }
  146. return
  147. }