stat.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "go-common/app/job/main/archive-shjd/model"
  7. "go-common/app/service/main/archive/api"
  8. "go-common/app/service/main/archive/model/archive"
  9. "go-common/library/log"
  10. "go-common/library/queue/databus"
  11. )
  12. // consumerproc consumer all topic
  13. func (s *Service) consumerproc(k string, d *databus.Databus) {
  14. defer s.waiter.Done()
  15. var msgs = d.Messages()
  16. for {
  17. var (
  18. err error
  19. ok bool
  20. msg *databus.Message
  21. now = time.Now().Unix()
  22. )
  23. msg, ok = <-msgs
  24. if !ok || s.close {
  25. log.Info("databus(%s) consumer exit", k)
  26. return
  27. }
  28. msg.Commit()
  29. var ms = &model.StatCount{}
  30. if err = json.Unmarshal(msg.Value, ms); err != nil {
  31. log.Error("json.Unmarshal(%s) error(%v)", string(msg.Value), err)
  32. continue
  33. }
  34. if ms.Aid <= 0 || (ms.Type != "archive" && ms.Type != "archive_his") {
  35. log.Warn("message(%s) error", msg.Value)
  36. continue
  37. }
  38. if now-ms.TimeStamp > 1800 {
  39. log.Warn("topic(%s) message(%s) too early", msg.Topic, msg.Value)
  40. continue
  41. }
  42. stat := &model.StatMsg{Aid: ms.Aid, Type: k, Ts: ms.TimeStamp}
  43. switch k {
  44. case model.TypeForView:
  45. stat.Click = ms.Count
  46. case model.TypeForDm:
  47. stat.DM = ms.Count
  48. case model.TypeForReply:
  49. stat.Reply = ms.Count
  50. case model.TypeForFav:
  51. stat.Fav = ms.Count
  52. case model.TypeForCoin:
  53. stat.Coin = ms.Count
  54. case model.TypeForShare:
  55. stat.Share = ms.Count
  56. case model.TypeForRank:
  57. stat.HisRank = ms.Count
  58. case model.TypeForLike:
  59. stat.Like = ms.Count
  60. default:
  61. log.Error("unknow type(%s) message(%s)", k, msg.Value)
  62. continue
  63. }
  64. s.subStatCh[stat.Aid%_sharding] <- stat
  65. log.Info("got message(%+v)", stat)
  66. }
  67. }
  68. func (s *Service) statDealproc(i int) {
  69. defer s.waiter.Done()
  70. var (
  71. ch = s.subStatCh[i]
  72. sm = s.statSM[i]
  73. c = context.TODO()
  74. ls *lastTmStat
  75. err error
  76. )
  77. for {
  78. ms, ok := <-ch
  79. if !ok {
  80. log.Info("statDealproc(%d) quit", i)
  81. return
  82. }
  83. // get stat
  84. if ls, ok = sm[ms.Aid]; !ok {
  85. var stat *api.Stat
  86. for _, arc := range s.arcRPCs {
  87. if stat, err = arc.Stat3(c, &archive.ArgAid2{Aid: ms.Aid}); err == nil {
  88. break
  89. }
  90. }
  91. if stat == nil {
  92. log.Error("stat(%d) is nill", ms.Aid)
  93. continue
  94. }
  95. ls = &lastTmStat{}
  96. ls.stat = stat
  97. sm[ms.Aid] = ls
  98. }
  99. model.Merge(ms, ls.stat)
  100. // update cache
  101. st := &api.Stat{
  102. Aid: ls.stat.Aid,
  103. View: ls.stat.View,
  104. Danmaku: ls.stat.Danmaku,
  105. Reply: ls.stat.Reply,
  106. Fav: ls.stat.Fav,
  107. Coin: ls.stat.Coin,
  108. Share: ls.stat.Share,
  109. NowRank: ls.stat.NowRank,
  110. HisRank: ls.stat.HisRank,
  111. Like: ls.stat.Like,
  112. }
  113. for cluster, arc := range s.arcRPCs {
  114. if err = arc.SetStat2(c, st); err != nil {
  115. log.Error("s.arcRPC.SetStat2(%s) (%+v) error(%v)", cluster, st, err)
  116. }
  117. }
  118. }
  119. }