sync.go 6.0 KB


  1. package canal
  2. import (
  3. "fmt"
  4. "regexp"
  5. "time"
  6. "github.com/juju/errors"
  7. "github.com/satori/go.uuid"
  8. "github.com/siddontang/go-mysql/mysql"
  9. "github.com/siddontang/go-mysql/replication"
  10. "github.com/siddontang/go-mysql/schema"
  11. log "github.com/sirupsen/logrus"
  12. )
  13. var (
  14. expCreateTable = regexp.MustCompile("(?i)^CREATE\\sTABLE(\\sIF\\sNOT\\sEXISTS)?\\s`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}\\s.*")
  15. expAlterTable = regexp.MustCompile("(?i)^ALTER\\sTABLE\\s.*?`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}\\s.*")
  16. expRenameTable = regexp.MustCompile("(?i)^RENAME\\sTABLE\\s.*?`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}\\s{1,}TO\\s.*?")
  17. expDropTable = regexp.MustCompile("(?i)^DROP\\sTABLE(\\sIF\\sEXISTS){0,1}\\s`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}($|\\s)")
  18. )
  19. func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
  20. if !c.useGTID {
  21. pos := c.master.Position()
  22. s, err := c.syncer.StartSync(pos)
  23. if err != nil {
  24. return nil, errors.Errorf("start sync replication at binlog %v error %v", pos, err)
  25. }
  26. log.Infof("start sync binlog at binlog file %v", pos)
  27. return s, nil
  28. } else {
  29. gset := c.master.GTID()
  30. s, err := c.syncer.StartSyncGTID(gset)
  31. if err != nil {
  32. return nil, errors.Errorf("start sync replication at GTID %v error %v", gset, err)
  33. }
  34. log.Infof("start sync binlog at GTID %v", gset)
  35. return s, nil
  36. }
  37. }
  38. func (c *Canal) runSyncBinlog() error {
  39. s, err := c.startSyncer()
  40. if err != nil {
  41. return err
  42. }
  43. savePos := false
  44. force := false
  45. for {
  46. ev, err := s.GetEvent(c.ctx)
  47. if err != nil {
  48. return errors.Trace(err)
  49. }
  50. savePos = false
  51. force = false
  52. pos := c.master.Position()
  53. curPos := pos.Pos
  54. //next binlog pos
  55. pos.Pos = ev.Header.LogPos
  56. // We only save position with RotateEvent and XIDEvent.
  57. // For RowsEvent, we can't save the position until meeting XIDEvent
  58. // which tells the whole transaction is over.
  59. // TODO: If we meet any DDL query, we must save too.
  60. switch e := ev.Event.(type) {
  61. case *replication.RotateEvent:
  62. pos.Name = string(e.NextLogName)
  63. pos.Pos = uint32(e.Position)
  64. log.Infof("rotate binlog to %s", pos)
  65. savePos = true
  66. force = true
  67. if err = c.eventHandler.OnRotate(e); err != nil {
  68. return errors.Trace(err)
  69. }
  70. case *replication.RowsEvent:
  71. // we only focus row based event
  72. err = c.handleRowsEvent(ev)
  73. if err != nil {
  74. e := errors.Cause(err)
  75. // if error is not ErrExcludedTable or ErrTableNotExist or ErrMissingTableMeta, stop canal
  76. if e != ErrExcludedTable &&
  77. e != schema.ErrTableNotExist &&
  78. e != schema.ErrMissingTableMeta {
  79. log.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
  80. return errors.Trace(err)
  81. }
  82. }
  83. continue
  84. case *replication.XIDEvent:
  85. savePos = true
  86. // try to save the position later
  87. if err := c.eventHandler.OnXID(pos); err != nil {
  88. return errors.Trace(err)
  89. }
  90. case *replication.MariadbGTIDEvent:
  91. // try to save the GTID later
  92. gtid := &e.GTID
  93. c.master.UpdateGTID(gtid)
  94. if err := c.eventHandler.OnGTID(gtid); err != nil {
  95. return errors.Trace(err)
  96. }
  97. case *replication.GTIDEvent:
  98. u, _ := uuid.FromBytes(e.SID)
  99. gset, err := mysql.ParseMysqlGTIDSet(fmt.Sprintf("%s:%d", u.String(), e.GNO))
  100. if err != nil {
  101. return errors.Trace(err)
  102. }
  103. c.master.UpdateGTID(gset)
  104. if err := c.eventHandler.OnGTID(gset); err != nil {
  105. return errors.Trace(err)
  106. }
  107. case *replication.QueryEvent:
  108. var (
  109. mb [][]byte
  110. schema []byte
  111. table []byte
  112. )
  113. regexps := []regexp.Regexp{*expCreateTable, *expAlterTable, *expRenameTable, *expDropTable}
  114. for _, reg := range regexps {
  115. mb = reg.FindSubmatch(e.Query)
  116. if len(mb) != 0 {
  117. break
  118. }
  119. }
  120. mbLen := len(mb)
  121. if mbLen == 0 {
  122. continue
  123. }
  124. // the first last is table name, the second last is database name(if exists)
  125. if len(mb[mbLen-2]) == 0 {
  126. schema = e.Schema
  127. } else {
  128. schema = mb[mbLen-2]
  129. }
  130. table = mb[mbLen-1]
  131. savePos = true
  132. force = true
  133. c.ClearTableCache(schema, table)
  134. log.Infof("table structure changed, clear table cache: %s.%s\n", schema, table)
  135. if err = c.eventHandler.OnDDL(pos, e); err != nil {
  136. return errors.Trace(err)
  137. }
  138. default:
  139. continue
  140. }
  141. if savePos {
  142. c.master.Update(pos)
  143. c.eventHandler.OnPosSynced(pos, force)
  144. }
  145. }
  146. return nil
  147. }
  148. func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
  149. ev := e.Event.(*replication.RowsEvent)
  150. // Caveat: table may be altered at runtime.
  151. schema := string(ev.Table.Schema)
  152. table := string(ev.Table.Table)
  153. t, err := c.GetTable(schema, table)
  154. if err != nil {
  155. return err
  156. }
  157. var action string
  158. switch e.Header.EventType {
  159. case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
  160. action = InsertAction
  161. case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
  162. action = DeleteAction
  163. case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
  164. action = UpdateAction
  165. default:
  166. return errors.Errorf("%s not supported now", e.Header.EventType)
  167. }
  168. events := newRowsEvent(t, action, ev.Rows)
  169. return c.eventHandler.OnRow(events)
  170. }
  171. func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error {
  172. timer := time.NewTimer(timeout)
  173. for {
  174. select {
  175. case <-timer.C:
  176. return errors.Errorf("wait position %v too long > %s", pos, timeout)
  177. default:
  178. curPos := c.master.Position()
  179. if curPos.Compare(pos) >= 0 {
  180. return nil
  181. } else {
  182. log.Debugf("master pos is %v, wait catching %v", curPos, pos)
  183. time.Sleep(100 * time.Millisecond)
  184. }
  185. }
  186. }
  187. return nil
  188. }
  189. func (c *Canal) GetMasterPos() (mysql.Position, error) {
  190. rr, err := c.Execute("SHOW MASTER STATUS")
  191. if err != nil {
  192. return mysql.Position{"", 0}, errors.Trace(err)
  193. }
  194. name, _ := rr.GetString(0, 0)
  195. pos, _ := rr.GetInt(0, 1)
  196. return mysql.Position{name, uint32(pos)}, nil
  197. }
  198. func (c *Canal) CatchMasterPos(timeout time.Duration) error {
  199. pos, err := c.GetMasterPos()
  200. if err != nil {
  201. return errors.Trace(err)
  202. }
  203. return c.WaitUntilPos(pos, timeout)
  204. }