service.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "go-common/app/job/main/reply-feed/conf"
  7. "go-common/app/job/main/reply-feed/dao"
  8. "go-common/app/job/main/reply-feed/model"
  9. "go-common/library/log"
  10. "go-common/library/net/netutil"
  11. "go-common/library/queue/databus"
  12. "go-common/library/sync/pipeline/fanout"
  13. "github.com/ivpusic/grpool"
  14. "github.com/robfig/cron"
  15. )
  16. // Service struct
  17. type Service struct {
  18. c *conf.Config
  19. dao *dao.Dao
  20. // 定时任务
  21. cron *cron.Cron
  22. // backoff
  23. bc netutil.BackoffConfig
  24. statsConsumer *databus.Databus
  25. // eventConsumer *databus.Databus
  26. taskQ *fanout.Fanout
  27. uvQ *fanout.Fanout
  28. statQ *fanout.Fanout
  29. replyListQ *fanout.Fanout
  30. waiter sync.WaitGroup
  31. // 专门计算热评分数的goroutine pool
  32. calculator *grpool.Pool
  33. statisticsStats [model.SlotsNum]model.StatisticsStat
  34. algorithmsLock sync.RWMutex
  35. statisticsLock sync.RWMutex
  36. algorithms []model.Algorithm
  37. }
  38. // New init
  39. func New(c *conf.Config) (s *Service) {
  40. s = &Service{
  41. c: c,
  42. dao: dao.New(c),
  43. cron: cron.New(),
  44. bc: netutil.BackoffConfig{
  45. MaxDelay: 1 * time.Second,
  46. BaseDelay: 100 * time.Millisecond,
  47. Factor: 1.6,
  48. Jitter: 0.2,
  49. },
  50. statsConsumer: databus.New(c.Databus.Stats),
  51. // eventConsumer: databus.New(c.Databus.Event),
  52. // 处理异步写任务的goroutine
  53. taskQ: fanout.New("task"),
  54. uvQ: fanout.New("uv-task", fanout.Worker(4), fanout.Buffer(2048)),
  55. statQ: fanout.New("memcache", fanout.Worker(4), fanout.Buffer(2048)),
  56. replyListQ: fanout.New("redis", fanout.Worker(4), fanout.Buffer(2048)),
  57. calculator: grpool.NewPool(4, 2048),
  58. }
  59. var err error
  60. if err = s.loadAlgorithm(); err != nil {
  61. panic(err)
  62. }
  63. if err = s.loadSlots(); err != nil {
  64. panic(err)
  65. }
  66. go s.loadproc()
  67. // 消费databus
  68. s.waiter.Add(1)
  69. go s.statsproc()
  70. // s.waiter.Add(1)
  71. // go s.eventproc()
  72. // 每整小时执行一次将统计数据写入DB
  73. s.cron.AddFunc("@hourly", func() {
  74. s.persistStatistics()
  75. })
  76. s.cron.Start()
  77. return s
  78. }
  79. func (s *Service) loadproc() {
  80. for {
  81. time.Sleep(time.Minute)
  82. s.loadAlgorithm()
  83. s.loadSlots()
  84. }
  85. }
  86. // Ping Service
  87. func (s *Service) Ping(c context.Context) (err error) {
  88. return s.dao.Ping(c)
  89. }
  90. // Close Service
  91. func (s *Service) Close() {
  92. s.statsConsumer.Close()
  93. // s.eventConsumer.Close()
  94. log.Warn("consumer closed")
  95. s.waiter.Wait()
  96. s.persistStatistics()
  97. s.dao.Close()
  98. }