// File: service.go

  1. package service
  2. import (
  3. "context"
  4. "strings"
  5. "sync"
  6. "go-common/app/job/main/aegis/conf"
  7. "go-common/app/job/main/aegis/dao"
  8. "go-common/app/job/main/aegis/dao/email"
  9. "go-common/app/job/main/aegis/dao/monitor"
  10. "go-common/app/job/main/aegis/model"
  11. accApi "go-common/app/service/main/account/api"
  12. upApi "go-common/app/service/main/up/api/v1"
  13. "go-common/library/queue/databus"
  14. "go-common/library/queue/databus/databusutil"
  15. )
  16. // Service struct
  17. type Service struct {
  18. c *conf.Config
  19. acc accApi.AccountClient
  20. up upApi.UpClient
  21. dao *dao.Dao
  22. moniDao *monitor.Dao
  23. email *email.Dao
  24. // databus
  25. binLogDataBus *databus.Databus
  26. archiveDataBus *databus.Databus
  27. aegisRscDataBus *databus.Databus
  28. aegisTaskDataBus *databus.Databus
  29. //channel
  30. chanReport chan *model.RIR
  31. // cache
  32. Cache
  33. //权重计算器
  34. wmHash map[string]*WeightManager
  35. rschandle map[string]RscHandler
  36. taskhandle map[string]TaskHandler
  37. wg sync.WaitGroup
  38. //databus group
  39. resourceGroup *databusutil.Group
  40. taskGroup *databusutil.Group
  41. }
  42. // New init
  43. func New(c *conf.Config) (s *Service) {
  44. s = &Service{
  45. c: c,
  46. dao: dao.New(c),
  47. moniDao: monitor.New(c),
  48. email: email.New(c),
  49. binLogDataBus: databus.New(c.DataBus.BinLogSub),
  50. chanReport: make(chan *model.RIR, 1024),
  51. archiveDataBus: databus.New(c.DataBus.ArchiveSub),
  52. aegisRscDataBus: databus.New(c.DataBus.ResourceSub),
  53. aegisTaskDataBus: databus.New(c.DataBus.TaskSub),
  54. }
  55. if !s.c.Debug {
  56. var err error
  57. if s.acc, err = accApi.NewClient(c.GRPC.Acc); err != nil {
  58. panic(err)
  59. }
  60. if s.up, err = upApi.NewClient(c.GRPC.Up); err != nil {
  61. panic(err)
  62. }
  63. }
  64. initHandler(s)
  65. s.initCache()
  66. s.startWeightManager()
  67. s.resourceGroup = databusutil.NewGroup(c.Databusutil.Resource, s.aegisRscDataBus.Messages())
  68. s.resourceGroup.New = s.newrsc
  69. s.resourceGroup.Split = s.splitrsc
  70. s.resourceGroup.Do = s.dorsc
  71. s.resourceGroup.Start()
  72. s.taskGroup = databusutil.NewGroup(c.Databusutil.Task, s.aegisTaskDataBus.Messages())
  73. s.taskGroup.New = s.newtask
  74. s.taskGroup.Split = s.splittask
  75. s.taskGroup.Do = s.dotask
  76. s.taskGroup.Start()
  77. go s.cacheProc()
  78. go s.taskProc()
  79. go s.monitorNotify()
  80. s.wg.Add(1)
  81. go s.taskconsumeproc()
  82. s.wg.Add(1)
  83. go s.archiveConsumeProc()
  84. s.wg.Add(1)
  85. go s.monitorEmailProc()
  86. return s
  87. }
  88. // Cache .
  89. type Cache struct {
  90. upCache map[int64]map[int64]struct{}
  91. rangeWeightCfg map[int64]map[string]*model.RangeWeightConfig
  92. equalWeightCfg map[string][]*model.EqualWeightConfig
  93. assignConfig map[string][]*model.AssignConfig
  94. consumerCache map[string]map[int64]struct{}
  95. ccMux sync.RWMutex
  96. oldactiveBizFlow map[string]struct{}
  97. newactiveBizFlow map[string]struct{}
  98. }
  99. // DebugCache .
  100. func (s *Service) DebugCache(keys string) map[string]interface{} {
  101. dc := map[string]interface{}{
  102. "upCache": s.upCache,
  103. "rangeWeightCfg": s.rangeWeightCfg,
  104. "equalWeightCfg": s.equalWeightCfg,
  105. "assignConfig": s.assignConfig,
  106. "consumerCache": s.consumerCache,
  107. "oldactiveBizFlow": s.oldactiveBizFlow,
  108. "newactiveBizFlow": s.newactiveBizFlow,
  109. }
  110. res := make(map[string]interface{})
  111. if len(keys) > 0 {
  112. for _, key := range strings.Split(keys, ",") {
  113. res[key] = dc[key]
  114. }
  115. }
  116. return res
  117. }
  118. // Ping Service
  119. func (s *Service) Ping(c context.Context) (err error) {
  120. return s.dao.Ping(c)
  121. }
  122. // Close Service
  123. func (s *Service) Close() {
  124. s.binLogDataBus.Close()
  125. s.archiveDataBus.Close()
  126. s.aegisRscDataBus.Close()
  127. s.aegisTaskDataBus.Close()
  128. s.resourceGroup.Close()
  129. s.taskGroup.Close()
  130. s.wg.Wait()
  131. s.dao.Close()
  132. s.moniDao.Close()
  133. }