stat.go 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/job/main/videoup/model/archive"
  6. "go-common/app/job/main/videoup/model/message"
  7. "go-common/library/database/sql"
  8. "go-common/library/log"
  9. )
  10. // statConsumer is stat message consumer.
  11. func (s *Service) statConsumer() {
  12. defer s.wg.Done()
  13. const magicClick = 3000
  14. var (
  15. msgs = s.statSub.Messages()
  16. err error
  17. c = context.TODO()
  18. )
  19. for {
  20. msg, ok := <-msgs
  21. if !ok {
  22. log.Error("s.statSub.Messages closed")
  23. return
  24. }
  25. msg.Commit()
  26. s.statMo++
  27. m := &message.StatMsg{}
  28. if err = json.Unmarshal(msg.Value, m); err != nil {
  29. log.Error("stat %s json.Unmarshal(%s) error(%v)", msg, msg.Value, err)
  30. continue
  31. }
  32. if m.Type != "archive" || m.ID < 8050956 || m.Count < s.thrMin || m.Count > s.thrMax+magicClick {
  33. continue
  34. }
  35. s.checkClick(c, m)
  36. log.Info("statConsumer key(%s) value(%s) partition(%d) offset(%d) commit", msg.Key, msg.Value, msg.Partition, msg.Offset)
  37. }
  38. }
  39. func (s *Service) checkClick(c context.Context, m *message.StatMsg) (err error) {
  40. var a *archive.Archive
  41. if a, err = s.arc.Archive(c, m.ID); err != nil || a == nil {
  42. log.Error("s.arc.Archive(%d) error(%v) or a==nil", m.ID, err)
  43. return
  44. }
  45. if a.Round != archive.RoundReviewFirstWaitTrigger {
  46. log.Warn("archive(%d) round(%d) not 31 wait trigger", a.Aid, a.Round)
  47. return
  48. }
  49. var (
  50. thr = s.thrTpsCache[s.sfTpsCache[a.TypeID]]
  51. click, _ = s.redis.ArcClick(c, a.Aid)
  52. )
  53. if m.Count-click < thr || thr == 0 {
  54. log.Warn("archive(%d) typeThr(%d) nowClick(%d) beforeClick(%d) round(%d)", a.Aid, thr, m.Count, click, a.Round)
  55. return
  56. }
  57. var tx *sql.Tx
  58. if tx, err = s.arc.BeginTran(c); err != nil {
  59. log.Error("s.arc.BeginTran(%d) error(%v)", m.ID, err)
  60. return
  61. }
  62. var round = archive.RoundTriggerClick
  63. if _, err = s.arc.TxUpRound(tx, a.Aid, round); err != nil {
  64. tx.Rollback()
  65. log.Error("s.arc.TxUpRound(%d, %d) error(%d)", a.Aid, round, err)
  66. return
  67. }
  68. a.Round = round
  69. if err = s.tranArchiveOper(tx, a); err != nil {
  70. tx.Rollback()
  71. return
  72. }
  73. if err = tx.Commit(); err != nil {
  74. log.Error("tx.Commit error(%v)", err)
  75. return
  76. }
  77. log.Info("archive(%d) typeThr(%d) nowClick(%d) upRound success", a.Aid, thr, m.Count)
  78. return
  79. }
  80. func (s *Service) addClickToRedis(c context.Context, aid int64) (err error) {
  81. click, err := s.arc.Stat(c, aid)
  82. if err != nil {
  83. log.Error("s.arc.Stat(%d) error(%v)", aid, err)
  84. return
  85. }
  86. if err = s.redis.AddArcClick(c, aid, click); err != nil {
  87. log.Error("s.redis.AddArcClick(%d) error(%v)", click, err)
  88. }
  89. return
  90. }