stat.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package service
  2. import (
  3. "context"
  4. "strconv"
  5. "time"
  6. "go-common/app/job/main/stat/model"
  7. "go-common/app/service/main/archive/api"
  8. "go-common/app/service/main/archive/model/archive"
  9. "go-common/library/cache/memcache"
  10. "go-common/library/log"
  11. )
  12. const (
  13. _prefixStatPB = "stp_"
  14. _prefixClickPB = "clkp_"
  15. )
  16. func statPBKey(aid int64) string {
  17. return _prefixStatPB + strconv.FormatInt(aid, 10)
  18. }
  19. func clickPBKey(aid int64) string {
  20. return _prefixClickPB + strconv.FormatInt(aid, 10)
  21. }
  22. func (s *Service) statDealproc(i int64) {
  23. defer s.waiter.Done()
  24. var (
  25. ch = s.subStatCh[i]
  26. sm = s.statSM[i]
  27. c = context.TODO()
  28. ls *lastTmStat
  29. err error
  30. )
  31. for {
  32. now := time.Now().Unix()
  33. ms, ok := <-ch
  34. if !ok {
  35. s.multiUpdateDB(i, sm)
  36. log.Warn("statDealproc(%d) quit", i)
  37. return
  38. }
  39. if s.maxAid > 0 && s.maxAid+300 < ms.Aid {
  40. log.Warn("aid(%d) too big maxAid(%d)", ms.Aid, s.maxAid)
  41. continue
  42. }
  43. // get stat
  44. if ls, ok = sm[ms.Aid]; !ok {
  45. var stat *api.Stat
  46. if stat, err = s.dao.Stat(c, ms.Aid); err != nil {
  47. log.Error("s.dao.Stat(%d) error(%v)", ms.Aid, err)
  48. continue
  49. }
  50. ls = &lastTmStat{}
  51. if stat == nil {
  52. ls.stat = &api.Stat{Aid: ms.Aid}
  53. ls.last = 0 // NOTE: make sure update db in first.
  54. } else {
  55. ls.stat = stat
  56. ls.last = time.Now().Unix()
  57. }
  58. sm[ms.Aid] = ls
  59. }
  60. model.Merge(ms, ls.stat)
  61. if now-ms.Ts < 60 {
  62. // update cache
  63. s.updateCache(ls.stat)
  64. }
  65. // update db when after 60s
  66. if time.Now().Unix()-ls.last > 120 {
  67. s.updateDB(ls.stat)
  68. delete(sm, ms.Aid) // NOTE: delete make sure the normal scope of memory and can be save all in 120s when close chan.
  69. }
  70. }
  71. }
  72. // updateDB update stat in db.
  73. func (s *Service) updateDB(stat *api.Stat) (err error) {
  74. if _, err := s.dao.Update(context.TODO(), stat); err != nil {
  75. log.Error("s.dao.Update(%v) error(%v)", stat, err)
  76. }
  77. log.Info("update db aid(%d) stat(%+v) success", stat.Aid, stat)
  78. return
  79. }
  80. // multiUpdateDB update some stat in db.
  81. func (s *Service) multiUpdateDB(yu int64, sm map[int64]*lastTmStat) (err error) {
  82. log.Info("start close(%d) multi update stat start", yu)
  83. var (
  84. c = context.TODO()
  85. alloc = [1000]*api.Stat{}
  86. stats = alloc[:0]
  87. i int
  88. )
  89. for aid, ls := range sm {
  90. stats = append(stats, ls.stat)
  91. if i > 0 && i%1000 == 0 {
  92. s.dao.MultiUpdate(c, yu, stats...)
  93. } else if i+1 == len(sm) {
  94. s.dao.MultiUpdate(c, yu, stats...)
  95. } else {
  96. log.Info("start close(%d) aid(%d) append", i, aid)
  97. continue
  98. }
  99. log.Info("start close(%d) multi update stat %d", i, aid)
  100. stats = alloc[:0]
  101. }
  102. log.Info("start close(%d) multi update stat endm", yu)
  103. return
  104. }
  105. // updateCache purge stat info in cache
  106. func (s *Service) updateCache(st *api.Stat) (err error) {
  107. var (
  108. stat3 = &api.Stat{
  109. Aid: st.Aid,
  110. View: int32(st.View),
  111. Danmaku: int32(st.Danmaku),
  112. Reply: int32(st.Reply),
  113. Fav: int32(st.Fav),
  114. Coin: int32(st.Coin),
  115. Share: int32(st.Share),
  116. NowRank: int32(st.NowRank),
  117. HisRank: int32(st.HisRank),
  118. Like: int32(st.Like),
  119. DisLike: 0,
  120. }
  121. click *archive.Click3
  122. upclick = true
  123. )
  124. if click, err = s.dao.Click(context.TODO(), st.Aid); err != nil {
  125. upclick = false
  126. }
  127. if click == nil {
  128. click = &archive.Click3{}
  129. }
  130. for _, mc := range s.memcaches {
  131. var c = context.TODO()
  132. conn := mc.Get(c)
  133. if err = conn.Set(&memcache.Item{Key: statPBKey(stat3.Aid), Object: stat3, Flags: memcache.FlagProtobuf, Expiration: 0}); err != nil {
  134. log.Error("conn1.Set(%s, %+v) error(%v)", statPBKey(stat3.Aid), stat3, err)
  135. }
  136. if upclick {
  137. if err = conn.Set(&memcache.Item{Key: clickPBKey(stat3.Aid), Object: click, Flags: memcache.FlagProtobuf, Expiration: 0}); err != nil {
  138. log.Error("conn1.Set(%s, %+v) error(%v)", clickPBKey(stat3.Aid), click, err)
  139. }
  140. }
  141. if err == nil {
  142. log.Info("update cache aid(%d) stat(%+v) success", st.Aid, stat3)
  143. log.Info("update cache aid(%d) click(%+v) success", st.Aid, click)
  144. }
  145. conn.Close()
  146. }
  147. return
  148. }
  149. // Purge purge arc's stat cache
  150. func (s *Service) Purge(c context.Context, aids []int64) (err error) {
  151. for _, aid := range aids {
  152. var stat *api.Stat
  153. if stat, err = s.dao.Stat(c, aid); err != nil {
  154. return
  155. }
  156. s.updateCache(stat)
  157. }
  158. return
  159. }