service.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package service
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "go-common/app/admin/main/up/conf"
  7. "go-common/app/admin/main/up/dao/global"
  8. "go-common/app/admin/main/up/dao/manager"
  9. "go-common/app/admin/main/up/service/upcrmservice"
  10. "go-common/app/admin/main/up/util"
  11. "go-common/library/log"
  12. bm "go-common/library/net/http/blademaster"
  13. "go-common/library/stat/prom"
  14. )
  15. // Service is service.
  16. type Service struct {
  17. c *conf.Config
  18. mng *manager.Dao
  19. // databus sub
  20. //upSub *databus.Databus
  21. // wait group
  22. wg sync.WaitGroup
  23. // chan for mids
  24. //midsChan chan map[int64]int
  25. //for cache func
  26. missch chan func()
  27. //prom
  28. pCacheHit *prom.Prom
  29. pCacheMiss *prom.Prom
  30. tokenMX sync.Mutex
  31. changeMX sync.Mutex
  32. tokenChan chan int
  33. httpClient *bm.Client
  34. Crmservice *upcrmservice.Service
  35. }
  36. // New is go-common/business/service/videoup service implementation.
  37. func New(c *conf.Config) (s *Service) {
  38. s = &Service{
  39. c: c,
  40. mng: manager.New(c),
  41. //upSub: databus.New(c.UpSub.Config),
  42. //midsChan: make(chan map[int64]int, c.ChanSize),
  43. //upChan: make(chan *model.Msg, c.UpSub.UpChanSize),
  44. //consumeToken: time.Now().UnixNano(),
  45. //consumeRate: int64(1e9 / c.UpSub.ConsumeLimit),
  46. tokenMX: sync.Mutex{},
  47. missch: make(chan func(), 1024),
  48. pCacheHit: prom.CacheHit,
  49. pCacheMiss: prom.CacheMiss,
  50. changeMX: sync.Mutex{},
  51. tokenChan: make(chan int, 10),
  52. httpClient: bm.NewClient(c.HTTPClient.Normal),
  53. Crmservice: upcrmservice.New(c),
  54. }
  55. s.mng.HTTPClient = s.httpClient
  56. s.Crmservice.SetHTTPClient(s.httpClient)
  57. global.Init(c)
  58. // load for first time
  59. s.refreshCache()
  60. go s.cacheproc()
  61. go s.timerproc()
  62. return s
  63. }
  64. func (s *Service) refreshCache() {
  65. log.Info("refresh cache")
  66. }
  67. func (s *Service) cacheproc() {
  68. for {
  69. time.Sleep(5 * time.Minute)
  70. s.refreshCache()
  71. }
  72. }
  73. // Ping service
  74. func (s *Service) Ping(c context.Context) (err error) {
  75. return
  76. }
  77. // Close sub.
  78. func (s *Service) Close() {
  79. //close(s.midsChan)
  80. s.wg.Wait()
  81. s.mng.Close()
  82. }
  83. // AddCache add to chan for cache
  84. func (s *Service) timerproc() {
  85. var t = time.NewTicker(time.Second)
  86. for now := range t.C {
  87. util.GlobalTimer.Advance(now)
  88. }
  89. }