123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- package canal
- import (
- "strconv"
- "time"
- "github.com/juju/errors"
- "github.com/siddontang/go-mysql/dump"
- "github.com/siddontang/go-mysql/mysql"
- "github.com/siddontang/go-mysql/schema"
- log "github.com/sirupsen/logrus"
- )
- type dumpParseHandler struct {
- c *Canal
- name string
- pos uint64
- }
- func (h *dumpParseHandler) BinLog(name string, pos uint64) error {
- h.name = name
- h.pos = pos
- return nil
- }
- func (h *dumpParseHandler) Data(db string, table string, values []string) error {
- if err := h.c.ctx.Err(); err != nil {
- return err
- }
- tableInfo, err := h.c.GetTable(db, table)
- if err != nil {
- e := errors.Cause(err)
- if e == ErrExcludedTable ||
- e == schema.ErrTableNotExist ||
- e == schema.ErrMissingTableMeta {
- return nil
- }
- log.Errorf("get %s.%s information err: %v", db, table, err)
- return errors.Trace(err)
- }
- vs := make([]interface{}, len(values))
- for i, v := range values {
- if v == "NULL" {
- vs[i] = nil
- } else if v[0] != '\'' {
- if tableInfo.Columns[i].Type == schema.TYPE_NUMBER {
- n, err := strconv.ParseInt(v, 10, 64)
- if err != nil {
- log.Errorf("parse row %v at %d error %v, skip", values, i, err)
- return dump.ErrSkip
- }
- vs[i] = n
- } else if tableInfo.Columns[i].Type == schema.TYPE_FLOAT {
- f, err := strconv.ParseFloat(v, 64)
- if err != nil {
- log.Errorf("parse row %v at %d error %v, skip", values, i, err)
- return dump.ErrSkip
- }
- vs[i] = f
- } else {
- log.Errorf("parse row %v error, invalid type at %d, skip", values, i)
- return dump.ErrSkip
- }
- } else {
- vs[i] = v[1 : len(v)-1]
- }
- }
- events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs})
- return h.c.eventHandler.OnRow(events)
- }
- func (c *Canal) AddDumpDatabases(dbs ...string) {
- if c.dumper == nil {
- return
- }
- c.dumper.AddDatabases(dbs...)
- }
- func (c *Canal) AddDumpTables(db string, tables ...string) {
- if c.dumper == nil {
- return
- }
- c.dumper.AddTables(db, tables...)
- }
- func (c *Canal) AddDumpIgnoreTables(db string, tables ...string) {
- if c.dumper == nil {
- return
- }
- c.dumper.AddIgnoreTables(db, tables...)
- }
- func (c *Canal) tryDump() error {
- pos := c.master.Position()
- gtid := c.master.GTID()
- if (len(pos.Name) > 0 && pos.Pos > 0) || gtid != nil {
- // we will sync with binlog name and position
- log.Infof("skip dump, use last binlog replication pos %s or GTID %s", pos, gtid)
- return nil
- }
- if c.dumper == nil {
- log.Info("skip dump, no mysqldump")
- return nil
- }
- h := &dumpParseHandler{c: c}
- if c.cfg.Dump.SkipMasterData {
- pos, err := c.GetMasterPos()
- if err != nil {
- return errors.Trace(err)
- }
- log.Infof("skip master data, get current binlog position %v", pos)
- h.name = pos.Name
- h.pos = uint64(pos.Pos)
- }
- start := time.Now()
- log.Info("try dump MySQL and parse")
- if err := c.dumper.DumpAndParse(h); err != nil {
- return errors.Trace(err)
- }
- log.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at (%s, %d)",
- time.Now().Sub(start).Seconds(), h.name, h.pos)
- pos = mysql.Position{h.name, uint32(h.pos)}
- c.master.Update(pos)
- c.eventHandler.OnPosSynced(pos, true)
- return nil
- }
|