123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- package service
- import (
- "context"
- "time"
- "go-common/app/infra/databus/conf"
- "go-common/app/infra/databus/dao"
- "go-common/app/infra/databus/model"
- "go-common/library/log"
- "go-common/library/stat/prom"
- )
// _authUpdateInterval is the pause between two consecutive refreshes of the
// auth table performed by the background loop.
const _authUpdateInterval = time.Minute
- // Service service instance
- type Service struct {
- dao *dao.Dao
- // auth
- auths map[string]*model.Auth
- // the auth of cluster changed
- clusterChan chan model.Auth
- // stats prom
- StatProm *prom.Prom
- CountProm *prom.Prom
- TimeProm *prom.Prom
- }
- // New new and return service
- func New(c *conf.Config) (s *Service) {
- s = &Service{
- dao: dao.New(c),
- // cluster
- clusterChan: make(chan model.Auth, 5),
- // stats prom
- StatProm: prom.New().WithState("go_databus_state", []string{"role", "group", "topic", "partition"}),
- // count prom: count consumer and producer partition speed
- CountProm: prom.New().WithState("go_databus_counter", []string{"operation", "group", "topic"}),
- TimeProm: prom.New().WithTimer("go_databus_timer", []string{"group"}),
- }
- s.fillAuth()
- go s.proc()
- return
- }
- // Ping check mysql connection
- func (s *Service) Ping(c context.Context) error {
- return s.dao.Ping(c)
- }
- // Close close mysql connection
- func (s *Service) Close() {
- if s.dao != nil {
- s.dao.Close()
- }
- }
- func (s *Service) proc() {
- for {
- s.fillAuth()
- time.Sleep(_authUpdateInterval)
- }
- }
- func (s *Service) fillAuth() (err error) {
- auths, err := s.dao.Auth(context.Background())
- if err != nil {
- log.Error("service.fillAuth error(%v)", err)
- return
- }
- var changed []*model.Auth
- // check cluster change event
- for group, nw := range auths {
- old, ok := s.auths[group]
- if !ok {
- continue
- }
- if old.Cluster != nw.Cluster {
- changed = append(changed, old)
- log.Info("cluster changed group(%s) topic(%s) oldCluster(%s) newCluster(%s)", old.Group, old.Topic, old.Cluster, nw.Cluster)
- }
- }
- s.auths = auths
- for _, ch := range changed {
- s.clusterChan <- *ch
- }
- return
- }
- // AuthApp check auth from cache
- func (s *Service) AuthApp(group string) (a *model.Auth, ok bool) {
- a, ok = s.auths[group]
- return
- }
- // ClusterEvent return cluster change event
- func (s *Service) ClusterEvent() (group <-chan model.Auth) {
- return s.clusterChan
- }
|