service.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "regexp"
  7. "go-common/app/job/main/dm/conf"
  8. "go-common/app/job/main/dm/dao"
  9. "go-common/app/job/main/dm/model"
  10. "go-common/library/log"
  11. "go-common/library/queue/databus"
  12. "go-common/library/sync/pipeline/fanout"
  13. )
  14. var (
  15. errSubNotExist = errors.New("subject not exist")
  16. )
  17. // Service rpc service.
  18. type Service struct {
  19. c *conf.Config
  20. dao *dao.Dao
  21. // databus sub
  22. dmMetaCsmr *databus.Databus
  23. // cache
  24. cache *fanout.Fanout
  25. }
  26. // New new rpc service.
  27. func New(c *conf.Config) *Service {
  28. s := &Service{
  29. c: c,
  30. dao: dao.New(c),
  31. dmMetaCsmr: databus.New(c.Databus.DMMetaCsmr),
  32. cache: fanout.New("cache", fanout.Worker(1), fanout.Buffer(1024)),
  33. }
  34. // 消费DMMeta-T消息
  35. go s.dmMetaCsmproc()
  36. return s
  37. }
  38. // Ping check if service is ok.
  39. func (s *Service) Ping(c context.Context) error {
  40. return s.dao.Ping(c)
  41. }
  42. func (s *Service) dmMetaCsmproc() {
  43. var (
  44. err error
  45. c = context.TODO()
  46. regexIndex = regexp.MustCompile("dm_index_[0-9]+")
  47. )
  48. for {
  49. msg, ok := <-s.dmMetaCsmr.Messages()
  50. if !ok {
  51. log.Error("dmmeta binlog consumer exit")
  52. return
  53. }
  54. m := &model.BinlogMsg{}
  55. if err = json.Unmarshal(msg.Value, &m); err != nil {
  56. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  57. continue
  58. }
  59. if regexIndex.MatchString(m.Table) {
  60. if err = s.trackDMMeta(c, m); err != nil {
  61. log.Error("s.trackDMMeta(%s) error(%v)", m, err)
  62. continue
  63. }
  64. }
  65. if err = msg.Commit(); err != nil {
  66. log.Error("commit offset(%v) error(%v)", msg, err)
  67. }
  68. }
  69. }