instance.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package service
  2. import (
  3. "fmt"
  4. "time"
  5. "go-common/app/infra/canal/conf"
  6. "go-common/library/log"
  7. "github.com/pkg/errors"
  8. "github.com/siddontang/go-mysql/canal"
  9. "github.com/siddontang/go-mysql/mysql"
  10. "github.com/siddontang/go-mysql/replication"
  11. )
  12. // Instance canal instance
  13. type Instance struct {
  14. *canal.Canal
  15. config *conf.InsConf
  16. // one instance can have lots of target different by schema and table
  17. targets []*Target
  18. master *dbMasterInfo
  19. // binlog latest timestamp, be used check delay
  20. latestTimestamp int64
  21. err error
  22. closed bool
  23. }
  24. // NewInstance new canal instance
  25. func NewInstance(c *conf.InsConf) (ins *Instance, err error) {
  26. // new instance
  27. ins = &Instance{config: c}
  28. // check and modify config
  29. if c.MasterInfo == nil {
  30. err = errors.New("no masterinfo config")
  31. ins.err = err
  32. return
  33. }
  34. if c.ReadTimeout == 0 {
  35. c.ReadTimeout = 90
  36. }
  37. c.ReadTimeout = c.ReadTimeout * time.Second
  38. c.HeartbeatPeriod = c.HeartbeatPeriod * time.Second
  39. if ins.master, err = newDBMasterInfo(ins.config.Addr, ins.config.MasterInfo); err != nil {
  40. log.Error("init master info error(%v)", err)
  41. ins.err = errors.Wrap(err, "init master info")
  42. return
  43. }
  44. if ins.targets, err = newTargets(c); err != nil {
  45. log.Error("db.init() error(%v)", err)
  46. ins.err = errors.Wrap(err, "db init")
  47. return
  48. }
  49. ins.latestTimestamp = time.Now().Unix()
  50. // new canal
  51. if ins.Canal, err = canal.NewCanal(c.Config); err != nil {
  52. log.Error("canal.NewCanal(%v) error(%v)", c.Config, err)
  53. ins.err = errors.Wrapf(err, "canal NewCanal(%v)", c.Config)
  54. return
  55. }
  56. // implement self as canal's event handler
  57. ins.Canal.SetEventHandler(ins)
  58. return
  59. }
  60. func newTargets(c *conf.InsConf) (targets []*Target, err error) {
  61. targets = make([]*Target, 0, len(c.Databases))
  62. for _, db := range c.Databases {
  63. if err = db.CheckTable(c.Addr, c.User, c.Password); err != nil {
  64. log.Error("db.CheckTable() error(%v)", err)
  65. return
  66. }
  67. targets = append(targets, NewTarget(db))
  68. }
  69. return
  70. }
  71. // Start start binlog receive
  72. func (ins *Instance) Start() {
  73. pos := ins.master.Pos()
  74. if pos.Name == "" || pos.Pos == 0 {
  75. var err error
  76. if pos, err = ins.Canal.GetMasterPos(); err != nil {
  77. log.Error("c.MasterPos error(%v)", err)
  78. ins.err = errors.Wrap(err, "canal get master pos when start")
  79. return
  80. }
  81. }
  82. ins.err = ins.Canal.RunFrom(pos)
  83. }
  84. // Close close instance
  85. func (ins *Instance) Close() {
  86. if ins.err != nil {
  87. return
  88. }
  89. ins.Canal.Close()
  90. for _, t := range ins.targets {
  91. t.close()
  92. }
  93. ins.closed = true
  94. ins.err = errors.New("canal closed")
  95. }
  96. // Check filter row event
  97. func (ins *Instance) Check(ev *canal.RowsEvent) (ts []*Target) {
  98. for _, t := range ins.targets {
  99. if t.compare(ev.Table.Schema, ev.Table.Name, ev.Action) {
  100. ts = append(ts, t)
  101. }
  102. }
  103. return
  104. }
  105. func (ins *Instance) String() string {
  106. return ins.config.Addr
  107. }
  108. // OnRotate OnRotate
  109. func (ins *Instance) OnRotate(re *replication.RotateEvent) error {
  110. log.Info("OnRotate binlog addr(%s) rotate binname(%s) pos(%d)", ins.config.Addr, re.NextLogName, re.Position)
  111. return nil
  112. }
  113. // OnDDL OnDDL
  114. func (ins *Instance) OnDDL(pos mysql.Position, qe *replication.QueryEvent) error {
  115. log.Info("OnDDL binlog addr(%s) ddl binname(%s) pos(%d)", ins.config.Addr, pos.Name, pos.Pos)
  116. return nil
  117. }
  118. // OnXID OnXID
  119. func (ins *Instance) OnXID(mysql.Position) error {
  120. return nil
  121. }
  122. //OnGTID OnGTID
  123. func (ins *Instance) OnGTID(mysql.GTIDSet) error {
  124. return nil
  125. }
  126. // OnPosSynced OnPosSynced
  127. func (ins *Instance) OnPosSynced(pos mysql.Position, force bool) error {
  128. return ins.master.Save(pos, force)
  129. }
  130. // OnRow send the envent to table
  131. func (ins *Instance) OnRow(ev *canal.RowsEvent) error {
  132. for _, t := range ins.Check(ev) {
  133. t.send(ev)
  134. }
  135. if stats != nil {
  136. stats.Incr("syncer_counter", ins.String(), ev.Table.Schema, tblReplacer.ReplaceAllString(ev.Table.Name, ""), ev.Action)
  137. stats.State("delay_syncer", ins.delay(), ins.String(), ev.Table.Schema, "", "")
  138. }
  139. ins.latestTimestamp = time.Now().Unix()
  140. return nil
  141. }
  142. // Error returns instance error.
  143. func (ins *Instance) Error() string {
  144. if ins.err == nil {
  145. return ""
  146. }
  147. return fmt.Sprintf("+%v", ins.err)
  148. }
  149. func (ins *Instance) delay() int64 {
  150. return time.Now().Unix() - ins.latestTimestamp
  151. }