dump.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package canal
  2. import (
  3. "strconv"
  4. "time"
  5. "github.com/juju/errors"
  6. "github.com/siddontang/go-mysql/dump"
  7. "github.com/siddontang/go-mysql/mysql"
  8. "github.com/siddontang/go-mysql/schema"
  9. log "github.com/sirupsen/logrus"
  10. )
  11. type dumpParseHandler struct {
  12. c *Canal
  13. name string
  14. pos uint64
  15. }
  16. func (h *dumpParseHandler) BinLog(name string, pos uint64) error {
  17. h.name = name
  18. h.pos = pos
  19. return nil
  20. }
  21. func (h *dumpParseHandler) Data(db string, table string, values []string) error {
  22. if err := h.c.ctx.Err(); err != nil {
  23. return err
  24. }
  25. tableInfo, err := h.c.GetTable(db, table)
  26. if err != nil {
  27. e := errors.Cause(err)
  28. if e == ErrExcludedTable ||
  29. e == schema.ErrTableNotExist ||
  30. e == schema.ErrMissingTableMeta {
  31. return nil
  32. }
  33. log.Errorf("get %s.%s information err: %v", db, table, err)
  34. return errors.Trace(err)
  35. }
  36. vs := make([]interface{}, len(values))
  37. for i, v := range values {
  38. if v == "NULL" {
  39. vs[i] = nil
  40. } else if v[0] != '\'' {
  41. if tableInfo.Columns[i].Type == schema.TYPE_NUMBER {
  42. n, err := strconv.ParseInt(v, 10, 64)
  43. if err != nil {
  44. log.Errorf("parse row %v at %d error %v, skip", values, i, err)
  45. return dump.ErrSkip
  46. }
  47. vs[i] = n
  48. } else if tableInfo.Columns[i].Type == schema.TYPE_FLOAT {
  49. f, err := strconv.ParseFloat(v, 64)
  50. if err != nil {
  51. log.Errorf("parse row %v at %d error %v, skip", values, i, err)
  52. return dump.ErrSkip
  53. }
  54. vs[i] = f
  55. } else {
  56. log.Errorf("parse row %v error, invalid type at %d, skip", values, i)
  57. return dump.ErrSkip
  58. }
  59. } else {
  60. vs[i] = v[1 : len(v)-1]
  61. }
  62. }
  63. events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs})
  64. return h.c.eventHandler.OnRow(events)
  65. }
  66. func (c *Canal) AddDumpDatabases(dbs ...string) {
  67. if c.dumper == nil {
  68. return
  69. }
  70. c.dumper.AddDatabases(dbs...)
  71. }
  72. func (c *Canal) AddDumpTables(db string, tables ...string) {
  73. if c.dumper == nil {
  74. return
  75. }
  76. c.dumper.AddTables(db, tables...)
  77. }
  78. func (c *Canal) AddDumpIgnoreTables(db string, tables ...string) {
  79. if c.dumper == nil {
  80. return
  81. }
  82. c.dumper.AddIgnoreTables(db, tables...)
  83. }
  84. func (c *Canal) tryDump() error {
  85. pos := c.master.Position()
  86. gtid := c.master.GTID()
  87. if (len(pos.Name) > 0 && pos.Pos > 0) || gtid != nil {
  88. // we will sync with binlog name and position
  89. log.Infof("skip dump, use last binlog replication pos %s or GTID %s", pos, gtid)
  90. return nil
  91. }
  92. if c.dumper == nil {
  93. log.Info("skip dump, no mysqldump")
  94. return nil
  95. }
  96. h := &dumpParseHandler{c: c}
  97. if c.cfg.Dump.SkipMasterData {
  98. pos, err := c.GetMasterPos()
  99. if err != nil {
  100. return errors.Trace(err)
  101. }
  102. log.Infof("skip master data, get current binlog position %v", pos)
  103. h.name = pos.Name
  104. h.pos = uint64(pos.Pos)
  105. }
  106. start := time.Now()
  107. log.Info("try dump MySQL and parse")
  108. if err := c.dumper.DumpAndParse(h); err != nil {
  109. return errors.Trace(err)
  110. }
  111. log.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at (%s, %d)",
  112. time.Now().Sub(start).Seconds(), h.name, h.pos)
  113. pos = mysql.Position{h.name, uint32(h.pos)}
  114. c.master.Update(pos)
  115. c.eventHandler.OnPosSynced(pos, true)
  116. return nil
  117. }