service.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package service
  2. import (
  3. "context"
  4. "errors"
  5. "strings"
  6. "sync"
  7. "time"
  8. "go-common/app/interface/openplatform/monitor-end/conf"
  9. "go-common/app/interface/openplatform/monitor-end/dao"
  10. "go-common/app/interface/openplatform/monitor-end/model"
  11. "go-common/app/interface/openplatform/monitor-end/model/monitor"
  12. "go-common/app/interface/openplatform/monitor-end/model/prom"
  13. "go-common/library/log"
  14. )
  15. var (
  16. _notConsumedErr = errors.New("未启动消费")
  17. _pausedNowErr = errors.New("暂停消费中")
  18. _closedNowErr = errors.New("已停止消费")
  19. _consumedNowErr = errors.New("已启动消费")
  20. )
  21. // Service struct
  22. type Service struct {
  23. c *conf.Config
  24. dao *dao.Dao
  25. mh *monitor.MonitorHandler
  26. consumer *Consumer
  27. consumed bool
  28. // settings
  29. groups map[int64]*model.Group
  30. targets map[int64]*model.Target
  31. targetKeys map[string]*model.Target
  32. products map[int64]*model.Product
  33. productKeys map[string]*model.Product
  34. newTargets map[string]*model.Target
  35. naProducts map[string]bool
  36. mapMutex sync.Mutex
  37. // infoc
  38. infoCh chan interface{}
  39. }
  40. // New init
  41. func New(c *conf.Config) (s *Service) {
  42. s = &Service{
  43. c: c,
  44. dao: dao.New(c),
  45. mh: monitor.NewMonitor(c.Monitor),
  46. groups: make(map[int64]*model.Group),
  47. targets: make(map[int64]*model.Target),
  48. targetKeys: make(map[string]*model.Target),
  49. newTargets: make(map[string]*model.Target),
  50. products: make(map[int64]*model.Product),
  51. productKeys: make(map[string]*model.Product),
  52. naProducts: make(map[string]bool),
  53. infoCh: make(chan interface{}, 1024),
  54. }
  55. for c.NeedConsume && !s.consumed {
  56. var err error
  57. if s.consumer, err = NewConsumer(c); err != nil {
  58. log.Error("s.New.NewConsumer error(%+v)", err)
  59. time.Sleep(10 * time.Second)
  60. continue
  61. }
  62. go s.consume()
  63. go s.handleMsg()
  64. s.consumed = true
  65. break
  66. }
  67. prom.Init(c)
  68. go s.loadalertsettingproc()
  69. go s.infocproc()
  70. go s.loadNAProducts()
  71. // go s.alertproc()
  72. return
  73. }
  74. // Ping .
  75. func (s *Service) Ping(c context.Context) (err error) {
  76. return s.dao.Ping(c)
  77. }
  78. // StartConsume .
  79. func (s *Service) StartConsume() (err error) {
  80. if s.consumed {
  81. return _consumedNowErr
  82. }
  83. if s.consumer, err = NewConsumer(s.c); err != nil {
  84. log.Error("s.New.NewConsumer error(%+v)", err)
  85. return
  86. }
  87. go s.consume()
  88. go s.handleMsg()
  89. s.consumed = true
  90. return
  91. }
  92. // Close .
  93. func (s *Service) Close() {
  94. s.dao.Close()
  95. s.StopConsume()
  96. }
  97. // StopConsume .
  98. func (s *Service) StopConsume() error {
  99. if s.consumed {
  100. s.consumed = false
  101. }
  102. if s.consumer != nil {
  103. s.consumer.closed = true
  104. }
  105. return nil
  106. }
  107. // PauseConsume .
  108. func (s *Service) PauseConsume(t int64) error {
  109. if !s.consumed {
  110. return _notConsumedErr
  111. }
  112. if s.consumer.closed {
  113. return _closedNowErr
  114. }
  115. if s.consumer.paused {
  116. return _pausedNowErr
  117. }
  118. s.consumer.paused = true
  119. s.consumer.duration = time.Second * time.Duration(t)
  120. return nil
  121. }
  122. func (s *Service) loadalertsettingproc() {
  123. for {
  124. s.loadalertsettings()
  125. time.Sleep(time.Minute)
  126. }
  127. }
  128. func (s *Service) loadNAProducts() {
  129. for {
  130. if s.c.Products != "" {
  131. var m = make(map[string]bool)
  132. ps := strings.Split(s.c.Products, ",")
  133. for _, p := range ps {
  134. m[p] = true
  135. }
  136. s.mapMutex.Lock()
  137. s.naProducts = m
  138. s.mapMutex.Unlock()
  139. }
  140. time.Sleep(time.Minute)
  141. }
  142. }