123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- package service
- import (
- "fmt"
- "time"
- "go-common/app/infra/canal/conf"
- "go-common/library/log"
- "github.com/pkg/errors"
- "github.com/siddontang/go-mysql/canal"
- "github.com/siddontang/go-mysql/mysql"
- "github.com/siddontang/go-mysql/replication"
- )
- // Instance canal instance
- type Instance struct {
- *canal.Canal
- config *conf.InsConf
- // one instance can have lots of target different by schema and table
- targets []*Target
- master *dbMasterInfo
- // binlog latest timestamp, be used check delay
- latestTimestamp int64
- err error
- closed bool
- }
- // NewInstance new canal instance
- func NewInstance(c *conf.InsConf) (ins *Instance, err error) {
- // new instance
- ins = &Instance{config: c}
- // check and modify config
- if c.MasterInfo == nil {
- err = errors.New("no masterinfo config")
- ins.err = err
- return
- }
- if c.ReadTimeout == 0 {
- c.ReadTimeout = 90
- }
- c.ReadTimeout = c.ReadTimeout * time.Second
- c.HeartbeatPeriod = c.HeartbeatPeriod * time.Second
- if ins.master, err = newDBMasterInfo(ins.config.Addr, ins.config.MasterInfo); err != nil {
- log.Error("init master info error(%v)", err)
- ins.err = errors.Wrap(err, "init master info")
- return
- }
- if ins.targets, err = newTargets(c); err != nil {
- log.Error("db.init() error(%v)", err)
- ins.err = errors.Wrap(err, "db init")
- return
- }
- ins.latestTimestamp = time.Now().Unix()
- // new canal
- if ins.Canal, err = canal.NewCanal(c.Config); err != nil {
- log.Error("canal.NewCanal(%v) error(%v)", c.Config, err)
- ins.err = errors.Wrapf(err, "canal NewCanal(%v)", c.Config)
- return
- }
- // implement self as canal's event handler
- ins.Canal.SetEventHandler(ins)
- return
- }
- func newTargets(c *conf.InsConf) (targets []*Target, err error) {
- targets = make([]*Target, 0, len(c.Databases))
- for _, db := range c.Databases {
- if err = db.CheckTable(c.Addr, c.User, c.Password); err != nil {
- log.Error("db.CheckTable() error(%v)", err)
- return
- }
- targets = append(targets, NewTarget(db))
- }
- return
- }
- // Start start binlog receive
- func (ins *Instance) Start() {
- pos := ins.master.Pos()
- if pos.Name == "" || pos.Pos == 0 {
- var err error
- if pos, err = ins.Canal.GetMasterPos(); err != nil {
- log.Error("c.MasterPos error(%v)", err)
- ins.err = errors.Wrap(err, "canal get master pos when start")
- return
- }
- }
- ins.err = ins.Canal.RunFrom(pos)
- }
- // Close close instance
- func (ins *Instance) Close() {
- if ins.err != nil {
- return
- }
- ins.Canal.Close()
- for _, t := range ins.targets {
- t.close()
- }
- ins.closed = true
- ins.err = errors.New("canal closed")
- }
- // Check filter row event
- func (ins *Instance) Check(ev *canal.RowsEvent) (ts []*Target) {
- for _, t := range ins.targets {
- if t.compare(ev.Table.Schema, ev.Table.Name, ev.Action) {
- ts = append(ts, t)
- }
- }
- return
- }
- func (ins *Instance) String() string {
- return ins.config.Addr
- }
- // OnRotate OnRotate
- func (ins *Instance) OnRotate(re *replication.RotateEvent) error {
- log.Info("OnRotate binlog addr(%s) rotate binname(%s) pos(%d)", ins.config.Addr, re.NextLogName, re.Position)
- return nil
- }
- // OnDDL OnDDL
- func (ins *Instance) OnDDL(pos mysql.Position, qe *replication.QueryEvent) error {
- log.Info("OnDDL binlog addr(%s) ddl binname(%s) pos(%d)", ins.config.Addr, pos.Name, pos.Pos)
- return nil
- }
- // OnXID OnXID
- func (ins *Instance) OnXID(mysql.Position) error {
- return nil
- }
- //OnGTID OnGTID
- func (ins *Instance) OnGTID(mysql.GTIDSet) error {
- return nil
- }
- // OnPosSynced OnPosSynced
- func (ins *Instance) OnPosSynced(pos mysql.Position, force bool) error {
- return ins.master.Save(pos, force)
- }
- // OnRow send the envent to table
- func (ins *Instance) OnRow(ev *canal.RowsEvent) error {
- for _, t := range ins.Check(ev) {
- t.send(ev)
- }
- if stats != nil {
- stats.Incr("syncer_counter", ins.String(), ev.Table.Schema, tblReplacer.ReplaceAllString(ev.Table.Name, ""), ev.Action)
- stats.State("delay_syncer", ins.delay(), ins.String(), ev.Table.Schema, "", "")
- }
- ins.latestTimestamp = time.Now().Unix()
- return nil
- }
- // Error returns instance error.
- func (ins *Instance) Error() string {
- if ins.err == nil {
- return ""
- }
- return fmt.Sprintf("+%v", ins.err)
- }
- func (ins *Instance) delay() int64 {
- return time.Now().Unix() - ins.latestTimestamp
- }
|