stat.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package service
  2. import (
  3. "context"
  4. "strconv"
  5. "time"
  6. artmdl "go-common/app/interface/openplatform/article/model"
  7. "go-common/app/job/openplatform/article/dao"
  8. "go-common/app/job/openplatform/article/model"
  9. "go-common/library/log"
  10. )
  11. func (s *Service) statproc(i int64) {
  12. defer s.waiter.Done()
  13. var (
  14. err error
  15. ls *lastTimeStat
  16. c = context.TODO()
  17. ch = s.statCh[i]
  18. last = s.statLastTime[i]
  19. )
  20. for {
  21. stat, ok := <-ch
  22. if !ok {
  23. log.Warn("statproc(%d) quit", i)
  24. s.multiUpdateDB(i, last)
  25. return
  26. }
  27. // filter view count
  28. if stat.View != nil && *stat.View > 0 {
  29. var ban bool
  30. var reason, valid string
  31. if ban = s.intercept(stat); ban {
  32. log.Info("intercept view count (aid:%d, ip:%s, mid:%d)", stat.Aid, stat.IP, stat.Mid)
  33. dao.PromInfo("stat:访问计数拦截")
  34. reason = "访问计数拦截"
  35. } else if ban = s.dao.DupViewIntercept(c, stat.Aid, stat.Mid); ban {
  36. log.Info("dupintercept view count (aid:%d, ip:%s, mid:%d)", stat.Aid, stat.IP, stat.Mid)
  37. dao.PromInfo("stat:重复访问计数拦截")
  38. reason = "重复访问计数拦截"
  39. } else if stat.CheatInfo != nil {
  40. viewLv, _ := strconv.Atoi(stat.CheatInfo.Lv)
  41. if limitLv, ok1 := s.cheatArts[stat.Aid]; ok1 && (viewLv <= limitLv) {
  42. ban = true
  43. log.Info("lvintercept view count (aid:%d, ip:%s, mid:%d)", stat.Aid, stat.IP, stat.Mid)
  44. dao.PromInfo("stat:等级访问计数拦截")
  45. reason = "等级访问计数拦截"
  46. }
  47. }
  48. if ban {
  49. valid = "0"
  50. } else {
  51. valid = "1"
  52. }
  53. if stat.CheatInfo != nil {
  54. stat.CheatInfo.Valid = valid
  55. stat.CheatInfo.Reason = reason
  56. }
  57. s.cheatInfo(stat.CheatInfo)
  58. if ban {
  59. continue
  60. }
  61. }
  62. // get stat
  63. if ls, ok = last[stat.Aid]; !ok {
  64. var st *artmdl.StatMsg
  65. if st, err = s.dao.Stat(c, stat.Aid); err != nil {
  66. log.Error("s.dao.Stat(%d) error(%+v)", stat.Aid, err)
  67. continue
  68. }
  69. ls = &lastTimeStat{}
  70. if st == nil {
  71. ls.stat = &artmdl.StatMsg{Aid: stat.Aid, View: new(int64), Like: new(int64), Dislike: new(int64), Favorite: new(int64), Reply: new(int64), Share: new(int64), Coin: new(int64)}
  72. ls.time = 0 // NOTE: make sure update db in first.
  73. } else {
  74. ls.stat = st
  75. ls.time = time.Now().Unix()
  76. }
  77. last[stat.Aid] = ls
  78. }
  79. changed := model.Merge(ls.stat, stat)
  80. // update cache
  81. s.updateCache(c, ls.stat, 0)
  82. s.updateSortCache(c, ls.stat.Aid, changed)
  83. // update db after 120s
  84. if time.Now().Unix()-ls.time > s.updateDbInterval {
  85. s.updateDB(c, ls.stat, 0)
  86. s.updateRecheckDB(c, ls.stat)
  87. s.updateSearchStats(c, ls.stat)
  88. delete(last, stat.Aid) // NOTE: delete ensures that memory should be normal in 120s after channel has been closed.
  89. }
  90. }
  91. }
  92. // updateCache purge stat info in cache
  93. func (s *Service) updateCache(c context.Context, sm *artmdl.StatMsg, count int) (err error) {
  94. stat := &artmdl.ArgStats{
  95. Aid: sm.Aid,
  96. Stats: &artmdl.Stats{
  97. View: *sm.View,
  98. Like: *sm.Like,
  99. Dislike: *sm.Dislike,
  100. Favorite: *sm.Favorite,
  101. Reply: *sm.Reply,
  102. Share: *sm.Share,
  103. Coin: *sm.Coin,
  104. },
  105. }
  106. if err = s.articleRPC.SetStat(context.TODO(), stat); err != nil {
  107. log.Error("s.articleRPC.SetStat aid(%d) view(%d) likes(%d) dislike(%d) favorite(%d) reply(%d) share(%d) coin(%d) error(%+v)",
  108. sm.Aid, *sm.View, *sm.Like, *sm.Dislike, *sm.Favorite, *sm.Reply, *sm.Share, *sm.Coin, err)
  109. dao.PromError("stat:更新计数缓存")
  110. s.dao.PushStat(c, &dao.StatRetry{
  111. Action: dao.RetryUpdateStatCache,
  112. Count: count,
  113. Data: sm,
  114. })
  115. return
  116. }
  117. log.Info("update cache success aid(%d) view(%d) likes(%d) dislike(%d) favorite(%d) reply(%d) share(%d) coin(%d)",
  118. sm.Aid, *sm.View, *sm.Like, *sm.Dislike, *sm.Favorite, *sm.Reply, *sm.Share, *sm.Coin)
  119. dao.PromInfo("stat:更新计数缓存")
  120. return
  121. }
  122. // updateSortCache update sort cache
  123. func (s *Service) updateSortCache(c context.Context, aid int64, changed [][2]int64) (err error) {
  124. if len(changed) == 0 {
  125. return
  126. }
  127. arg := &artmdl.ArgSort{
  128. Aid: aid,
  129. Changed: changed,
  130. }
  131. if err = s.articleRPC.UpdateSortCache(context.TODO(), arg); err != nil {
  132. log.Error("s.articleRPC.UpdateSortCache(aid:%v arg: %+v)", aid, arg)
  133. dao.PromError("stat:更新排序缓存")
  134. return
  135. }
  136. log.Info("success s.articleRPC.UpdateSortCache(aid:%v arg: %+v)", aid, arg)
  137. dao.PromInfo("stat:更新排序缓存")
  138. return
  139. }
  140. // updateDB update stat in db.
  141. func (s *Service) updateDB(c context.Context, stat *artmdl.StatMsg, count int) (err error) {
  142. if _, err = s.dao.Update(context.TODO(), stat); err != nil {
  143. dao.PromError("stat:更新计数DB")
  144. s.dao.PushStat(c, &dao.StatRetry{
  145. Action: dao.RetryUpdateStatDB,
  146. Count: count,
  147. Data: stat,
  148. })
  149. return
  150. }
  151. log.Info("update db success aid(%d) view(%d) likes(%d) dislike(%d) favorite(%d) reply(%d) share(%d) coin(%d)",
  152. stat.Aid, *stat.View, *stat.Like, *stat.Dislike, *stat.Favorite, *stat.Reply, *stat.Share, *stat.Coin)
  153. return
  154. }
  155. // multiUpdateDB update some stat in db.
  156. func (s *Service) multiUpdateDB(i int64, last map[int64]*lastTimeStat) (err error) {
  157. log.Info("multiUpdateDB close(%d) start", i)
  158. c := context.TODO()
  159. for aid, ls := range last {
  160. s.updateDB(c, ls.stat, 0)
  161. log.Info("multiUpdateDB close(%d) update stats aid: %d, value: %+v", i, aid, ls.stat)
  162. }
  163. log.Info("multiUpdateDB close(%d) end", i)
  164. return
  165. }
  166. // intercept intercepts illegal views.
  167. func (s *Service) intercept(stat *artmdl.StatMsg) bool {
  168. return s.dao.Intercept(context.TODO(), stat.Aid, stat.Mid, stat.IP)
  169. }
  170. func (s *Service) cheatInfo(cheat *artmdl.CheatInfo) {
  171. if cheat == nil {
  172. return
  173. }
  174. log.Info("cheatInfo %+v", cheat)
  175. if err := s.cheatInfoc.Info(cheat.Valid, cheat.Client, cheat.Cvid, cheat.Mid, cheat.Lv, cheat.Ts, cheat.IP, cheat.UA, cheat.Refer, cheat.Sid, cheat.Buvid, cheat.DeviceID, cheat.Build, cheat.Reason); err != nil {
  176. log.Error("cheatInfo error(%+v)", err)
  177. }
  178. }
  179. func (s *Service) updateRecheckDB(c context.Context, stat *artmdl.StatMsg) (err error) {
  180. var (
  181. publishTime int64
  182. checkState int
  183. )
  184. if s.setting.Recheck.View == 0 || s.setting.Recheck.Day == 0 {
  185. return
  186. }
  187. if isRecheck, _ := s.dao.GetRecheckCache(c, stat.Aid); isRecheck {
  188. return
  189. }
  190. if publishTime, checkState, err = s.dao.GetRecheckInfo(c, stat.Aid); err != nil || checkState != 0 {
  191. return
  192. }
  193. if s.dao.IsAct(c, stat.Aid) {
  194. return
  195. }
  196. if *(stat.View) > s.setting.Recheck.View {
  197. if publishTime+60*60*24*s.setting.Recheck.Day+s.updateDbInterval > time.Now().Unix() {
  198. if err = s.dao.UpdateRecheck(c, stat.Aid); err == nil {
  199. log.Info("update recheck success aid(%d)", stat.Aid)
  200. s.dao.SetRecheckCache(c, stat.Aid)
  201. }
  202. }
  203. }
  204. return
  205. }