service.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package service
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/infra/databus/conf"
  6. "go-common/app/infra/databus/dao"
  7. "go-common/app/infra/databus/model"
  8. "go-common/library/log"
  9. "go-common/library/stat/prom"
  10. )
  11. const (
  12. _authUpdateInterval = 1 * time.Minute
  13. )
  14. // Service service instance
  15. type Service struct {
  16. dao *dao.Dao
  17. // auth
  18. auths map[string]*model.Auth
  19. // the auth of cluster changed
  20. clusterChan chan model.Auth
  21. // stats prom
  22. StatProm *prom.Prom
  23. CountProm *prom.Prom
  24. TimeProm *prom.Prom
  25. }
  26. // New new and return service
  27. func New(c *conf.Config) (s *Service) {
  28. s = &Service{
  29. dao: dao.New(c),
  30. // cluster
  31. clusterChan: make(chan model.Auth, 5),
  32. // stats prom
  33. StatProm: prom.New().WithState("go_databus_state", []string{"role", "group", "topic", "partition"}),
  34. // count prom: count consumer and producer partition speed
  35. CountProm: prom.New().WithState("go_databus_counter", []string{"operation", "group", "topic"}),
  36. TimeProm: prom.New().WithTimer("go_databus_timer", []string{"group"}),
  37. }
  38. s.fillAuth()
  39. go s.proc()
  40. return
  41. }
  42. // Ping check mysql connection
  43. func (s *Service) Ping(c context.Context) error {
  44. return s.dao.Ping(c)
  45. }
  46. // Close close mysql connection
  47. func (s *Service) Close() {
  48. if s.dao != nil {
  49. s.dao.Close()
  50. }
  51. }
  52. func (s *Service) proc() {
  53. for {
  54. s.fillAuth()
  55. time.Sleep(_authUpdateInterval)
  56. }
  57. }
  58. func (s *Service) fillAuth() (err error) {
  59. auths, err := s.dao.Auth(context.Background())
  60. if err != nil {
  61. log.Error("service.fillAuth error(%v)", err)
  62. return
  63. }
  64. var changed []*model.Auth
  65. // check cluster change event
  66. for group, nw := range auths {
  67. old, ok := s.auths[group]
  68. if !ok {
  69. continue
  70. }
  71. if old.Cluster != nw.Cluster {
  72. changed = append(changed, old)
  73. log.Info("cluster changed group(%s) topic(%s) oldCluster(%s) newCluster(%s)", old.Group, old.Topic, old.Cluster, nw.Cluster)
  74. }
  75. }
  76. s.auths = auths
  77. for _, ch := range changed {
  78. s.clusterChan <- *ch
  79. }
  80. return
  81. }
  82. // AuthApp check auth from cache
  83. func (s *Service) AuthApp(group string) (a *model.Auth, ok bool) {
  84. a, ok = s.auths[group]
  85. return
  86. }
  87. // ClusterEvent return cluster change event
  88. func (s *Service) ClusterEvent() (group <-chan model.Auth) {
  89. return s.clusterChan
  90. }