service.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package service
  2. import (
  3. "context"
  4. "strconv"
  5. "sync"
  6. "time"
  7. "go-common/app/service/main/reply-feed/conf"
  8. "go-common/app/service/main/reply-feed/dao"
  9. "go-common/app/service/main/reply-feed/model"
  10. "go-common/library/log"
  11. "go-common/library/net/netutil"
  12. "github.com/robfig/cron"
  13. )
  14. // Service struct
  15. type Service struct {
  16. c *conf.Config
  17. dao *dao.Dao
  18. // backoff
  19. bc netutil.BackoffConfig
  20. cron *cron.Cron
  21. statisticsStats []model.StatisticsStat
  22. statisticsLock sync.RWMutex
  23. // eventProducer *databus.Databus
  24. midMapping map[int64]int
  25. oidWhiteList map[int64]int
  26. }
  27. // New init
  28. func New(c *conf.Config) (s *Service) {
  29. s = &Service{
  30. c: c,
  31. dao: dao.New(c),
  32. bc: netutil.BackoffConfig{
  33. MaxDelay: 1 * time.Second,
  34. BaseDelay: 100 * time.Millisecond,
  35. Factor: 1.6,
  36. Jitter: 0.2,
  37. },
  38. cron: cron.New(),
  39. statisticsStats: make([]model.StatisticsStat, model.SlotsNum),
  40. // eventProducer: databus.New(c.Databus.Event),
  41. midMapping: make(map[int64]int),
  42. oidWhiteList: make(map[int64]int),
  43. }
  44. // toml不支持int为key
  45. for k, v := range s.c.MidMapping {
  46. mid, err := strconv.ParseInt(k, 10, 64)
  47. if err != nil {
  48. continue
  49. }
  50. s.midMapping[mid] = v
  51. }
  52. for k, v := range s.c.OidWhiteList {
  53. oid, err := strconv.ParseInt(k, 10, 64)
  54. if err != nil {
  55. continue
  56. }
  57. s.oidWhiteList[oid] = v
  58. }
  59. // 初始化各个实验组所占流量槽位
  60. var err error
  61. if err = s.loadSlots(); err != nil {
  62. panic(err)
  63. }
  64. go s.loadproc()
  65. // 每整小时执行一次将统计数据写入DB
  66. s.cron.AddFunc("@hourly", func() {
  67. s.persistStatistics()
  68. })
  69. s.cron.Start()
  70. return s
  71. }
  72. // namePercent ...
  73. // func (s *Service) namePercent() map[string]int64 {
  74. // p := make(map[string]int64)
  75. // s.statisticsLock.RLock()
  76. // for _, slot := range s.statisticsStats {
  77. // if _, ok := p[slot.Name]; ok {
  78. // p[slot.Name]++
  79. // } else {
  80. // p[slot.Name] = 1
  81. // }
  82. // }
  83. // s.statisticsLock.RUnlock()
  84. // return p
  85. // }
  86. func (s *Service) loadproc() {
  87. for {
  88. time.Sleep(time.Minute)
  89. s.loadSlots()
  90. }
  91. }
  92. func (s *Service) loadSlots() (err error) {
  93. slotsMapping, err := s.dao.SlotsMapping(context.Background())
  94. if err != nil {
  95. return
  96. }
  97. s.statisticsLock.Lock()
  98. for _, mapping := range slotsMapping {
  99. for _, slot := range mapping.Slots {
  100. s.statisticsStats[slot].Name = mapping.Name
  101. s.statisticsStats[slot].Slot = slot
  102. s.statisticsStats[slot].State = mapping.State
  103. }
  104. }
  105. s.statisticsLock.Unlock()
  106. log.Warn("statistics stat (%v)", s.statisticsStats)
  107. return
  108. }
  109. func (s *Service) persistStatistics() {
  110. ctx := context.Background()
  111. statisticsMap := make(map[string]*model.StatisticsStat)
  112. s.statisticsLock.RLock()
  113. for _, stat := range s.statisticsStats {
  114. s, ok := statisticsMap[stat.Name]
  115. if ok {
  116. statisticsMap[stat.Name] = s.Merge(&stat)
  117. } else {
  118. statisticsMap[stat.Name] = &stat
  119. }
  120. }
  121. s.statisticsLock.RUnlock()
  122. now := time.Now()
  123. year, month, day := now.Date()
  124. date := year*10000 + int(month)*100 + day
  125. hour := now.Hour()
  126. for name, stat := range statisticsMap {
  127. err := s.dao.UpsertStatistics(ctx, name, date, hour, stat)
  128. var (
  129. retryTimes = 0
  130. maxRetryTimes = 5
  131. )
  132. for err != nil && retryTimes < maxRetryTimes {
  133. time.Sleep(s.bc.Backoff(retryTimes))
  134. err = s.dao.UpsertStatistics(ctx, name, date, hour, stat)
  135. retryTimes++
  136. }
  137. if retryTimes >= maxRetryTimes {
  138. log.Error("upsert statistics error retry reached max retry times.")
  139. }
  140. }
  141. for i := range s.statisticsStats {
  142. reset(&s.statisticsStats[i])
  143. }
  144. }
  145. func reset(stat *model.StatisticsStat) {
  146. stat.HotView = 0
  147. stat.View = 0
  148. stat.HotClick = 0
  149. stat.TotalView = 0
  150. }
  151. // Ping Service
  152. func (s *Service) Ping(ctx context.Context) (err error) {
  153. return s.dao.Ping(ctx)
  154. }
  155. // Close Service
  156. func (s *Service) Close() {
  157. s.persistStatistics()
  158. s.dao.Close()
  159. }