12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- package service
- import (
- "context"
- "encoding/json"
- "errors"
- "regexp"
- "go-common/app/job/main/dm/conf"
- "go-common/app/job/main/dm/dao"
- "go-common/app/job/main/dm/model"
- "go-common/library/log"
- "go-common/library/queue/databus"
- "go-common/library/sync/pipeline/fanout"
- )
- var (
- errSubNotExist = errors.New("subject not exist")
- )
- // Service rpc service.
- type Service struct {
- c *conf.Config
- dao *dao.Dao
- // databus sub
- dmMetaCsmr *databus.Databus
- // cache
- cache *fanout.Fanout
- }
- // New new rpc service.
- func New(c *conf.Config) *Service {
- s := &Service{
- c: c,
- dao: dao.New(c),
- dmMetaCsmr: databus.New(c.Databus.DMMetaCsmr),
- cache: fanout.New("cache", fanout.Worker(1), fanout.Buffer(1024)),
- }
- // 消费DMMeta-T消息
- go s.dmMetaCsmproc()
- return s
- }
- // Ping check if service is ok.
- func (s *Service) Ping(c context.Context) error {
- return s.dao.Ping(c)
- }
- func (s *Service) dmMetaCsmproc() {
- var (
- err error
- c = context.TODO()
- regexIndex = regexp.MustCompile("dm_index_[0-9]+")
- )
- for {
- msg, ok := <-s.dmMetaCsmr.Messages()
- if !ok {
- log.Error("dmmeta binlog consumer exit")
- return
- }
- m := &model.BinlogMsg{}
- if err = json.Unmarshal(msg.Value, &m); err != nil {
- log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
- continue
- }
- if regexIndex.MatchString(m.Table) {
- if err = s.trackDMMeta(c, m); err != nil {
- log.Error("s.trackDMMeta(%s) error(%v)", m, err)
- continue
- }
- }
- if err = msg.Commit(); err != nil {
- log.Error("commit offset(%v) error(%v)", msg, err)
- }
- }
- }
|