canal.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. package canal
  import (
  	"context"
  	"fmt"
  	"io/ioutil"
  	"net"
  	"os"
  	"regexp"
  	"strconv"
  	"strings"
  	"sync"
  	"time"

  	"github.com/juju/errors"
  	"github.com/siddontang/go-mysql/client"
  	"github.com/siddontang/go-mysql/dump"
  	"github.com/siddontang/go-mysql/mysql"
  	"github.com/siddontang/go-mysql/replication"
  	"github.com/siddontang/go-mysql/schema"
  	log "github.com/sirupsen/logrus"
  )
  20. // Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc...
  21. // MySQL must open row format for binlog
  22. type Canal struct {
  23. m sync.Mutex
  24. cfg *Config
  25. useGTID bool
  26. master *masterInfo
  27. dumper *dump.Dumper
  28. dumpDoneCh chan struct{}
  29. syncer *replication.BinlogSyncer
  30. eventHandler EventHandler
  31. connLock sync.Mutex
  32. conn *client.Conn
  33. tableLock sync.RWMutex
  34. tables map[string]*schema.Table
  35. errorTablesGetTime map[string]time.Time
  36. tableMatchCache map[string]bool
  37. includeTableRegex []*regexp.Regexp
  38. excludeTableRegex []*regexp.Regexp
  39. ctx context.Context
  40. cancel context.CancelFunc
  41. }
  42. // canal will retry fetching unknown table's meta after UnknownTableRetryPeriod
  43. var UnknownTableRetryPeriod = time.Second * time.Duration(10)
  44. var ErrExcludedTable = errors.New("excluded table meta")
  45. func NewCanal(cfg *Config) (*Canal, error) {
  46. c := new(Canal)
  47. c.cfg = cfg
  48. c.ctx, c.cancel = context.WithCancel(context.Background())
  49. c.dumpDoneCh = make(chan struct{})
  50. c.eventHandler = &DummyEventHandler{}
  51. c.tables = make(map[string]*schema.Table)
  52. if c.cfg.DiscardNoMetaRowEvent {
  53. c.errorTablesGetTime = make(map[string]time.Time)
  54. }
  55. c.master = &masterInfo{}
  56. var err error
  57. if err = c.prepareDumper(); err != nil {
  58. return nil, errors.Trace(err)
  59. }
  60. if err = c.prepareSyncer(); err != nil {
  61. return nil, errors.Trace(err)
  62. }
  63. if err := c.checkBinlogRowFormat(); err != nil {
  64. return nil, errors.Trace(err)
  65. }
  66. // init table filter
  67. if n := len(c.cfg.IncludeTableRegex); n > 0 {
  68. c.includeTableRegex = make([]*regexp.Regexp, n)
  69. for i, val := range c.cfg.IncludeTableRegex {
  70. reg, err := regexp.Compile(val)
  71. if err != nil {
  72. return nil, errors.Trace(err)
  73. }
  74. c.includeTableRegex[i] = reg
  75. }
  76. }
  77. if n := len(c.cfg.ExcludeTableRegex); n > 0 {
  78. c.excludeTableRegex = make([]*regexp.Regexp, n)
  79. for i, val := range c.cfg.ExcludeTableRegex {
  80. reg, err := regexp.Compile(val)
  81. if err != nil {
  82. return nil, errors.Trace(err)
  83. }
  84. c.excludeTableRegex[i] = reg
  85. }
  86. }
  87. if c.includeTableRegex != nil || c.excludeTableRegex != nil {
  88. c.tableMatchCache = make(map[string]bool)
  89. }
  90. return c, nil
  91. }
  92. func (c *Canal) prepareDumper() error {
  93. var err error
  94. dumpPath := c.cfg.Dump.ExecutionPath
  95. if len(dumpPath) == 0 {
  96. // ignore mysqldump, use binlog only
  97. return nil
  98. }
  99. if c.dumper, err = dump.NewDumper(dumpPath,
  100. c.cfg.Addr, c.cfg.User, c.cfg.Password); err != nil {
  101. return errors.Trace(err)
  102. }
  103. if c.dumper == nil {
  104. //no mysqldump, use binlog only
  105. return nil
  106. }
  107. dbs := c.cfg.Dump.Databases
  108. tables := c.cfg.Dump.Tables
  109. tableDB := c.cfg.Dump.TableDB
  110. if len(tables) == 0 {
  111. c.dumper.AddDatabases(dbs...)
  112. } else {
  113. c.dumper.AddTables(tableDB, tables...)
  114. }
  115. charset := c.cfg.Charset
  116. c.dumper.SetCharset(charset)
  117. c.dumper.SkipMasterData(c.cfg.Dump.SkipMasterData)
  118. c.dumper.SetMaxAllowedPacket(c.cfg.Dump.MaxAllowedPacketMB)
  119. for _, ignoreTable := range c.cfg.Dump.IgnoreTables {
  120. if seps := strings.Split(ignoreTable, ","); len(seps) == 2 {
  121. c.dumper.AddIgnoreTables(seps[0], seps[1])
  122. }
  123. }
  124. if c.cfg.Dump.DiscardErr {
  125. c.dumper.SetErrOut(ioutil.Discard)
  126. } else {
  127. c.dumper.SetErrOut(os.Stderr)
  128. }
  129. return nil
  130. }
  131. // Run will first try to dump all data from MySQL master `mysqldump`,
  132. // then sync from the binlog position in the dump data.
  133. // It will run forever until meeting an error or Canal closed.
  134. func (c *Canal) Run() error {
  135. return c.run()
  136. }
  137. // RunFrom will sync from the binlog position directly, ignore mysqldump.
  138. func (c *Canal) RunFrom(pos mysql.Position) error {
  139. c.useGTID = false
  140. c.master.Update(pos)
  141. return c.Run()
  142. }
  143. func (c *Canal) StartFromGTID(set mysql.GTIDSet) error {
  144. c.useGTID = true
  145. c.master.UpdateGTID(set)
  146. return c.Run()
  147. }
  148. func (c *Canal) run() error {
  149. defer func() {
  150. c.cancel()
  151. }()
  152. err := c.tryDump()
  153. close(c.dumpDoneCh)
  154. if err != nil {
  155. log.Errorf("canal dump mysql err: %v", err)
  156. return errors.Trace(err)
  157. }
  158. if err = c.runSyncBinlog(); err != nil {
  159. log.Errorf("canal start sync binlog err: %v", err)
  160. return errors.Trace(err)
  161. }
  162. return nil
  163. }
  164. func (c *Canal) Close() {
  165. log.Infof("closing canal")
  166. c.m.Lock()
  167. defer c.m.Unlock()
  168. c.cancel()
  169. c.connLock.Lock()
  170. c.conn.Close()
  171. c.conn = nil
  172. c.connLock.Unlock()
  173. c.syncer.Close()
  174. c.eventHandler.OnPosSynced(c.master.Position(), true)
  175. }
  176. func (c *Canal) WaitDumpDone() <-chan struct{} {
  177. return c.dumpDoneCh
  178. }
  179. func (c *Canal) Ctx() context.Context {
  180. return c.ctx
  181. }
  182. func (c *Canal) checkTableMatch(key string) bool {
  183. // no filter, return true
  184. if c.tableMatchCache == nil {
  185. return true
  186. }
  187. c.tableLock.RLock()
  188. rst, ok := c.tableMatchCache[key]
  189. c.tableLock.RUnlock()
  190. if ok {
  191. // cache hit
  192. return rst
  193. }
  194. matchFlag := false
  195. // check include
  196. if c.includeTableRegex != nil {
  197. for _, reg := range c.includeTableRegex {
  198. if reg.MatchString(key) {
  199. matchFlag = true
  200. break
  201. }
  202. }
  203. }
  204. // check exclude
  205. if matchFlag && c.excludeTableRegex != nil {
  206. for _, reg := range c.excludeTableRegex {
  207. if reg.MatchString(key) {
  208. matchFlag = false
  209. break
  210. }
  211. }
  212. }
  213. c.tableLock.Lock()
  214. c.tableMatchCache[key] = matchFlag
  215. c.tableLock.Unlock()
  216. return matchFlag
  217. }
  218. func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
  219. key := fmt.Sprintf("%s.%s", db, table)
  220. // if table is excluded, return error and skip parsing event or dump
  221. if !c.checkTableMatch(key) {
  222. return nil, ErrExcludedTable
  223. }
  224. c.tableLock.RLock()
  225. t, ok := c.tables[key]
  226. c.tableLock.RUnlock()
  227. if ok {
  228. return t, nil
  229. }
  230. if c.cfg.DiscardNoMetaRowEvent {
  231. c.tableLock.RLock()
  232. lastTime, ok := c.errorTablesGetTime[key]
  233. c.tableLock.RUnlock()
  234. if ok && time.Now().Sub(lastTime) < UnknownTableRetryPeriod {
  235. return nil, schema.ErrMissingTableMeta
  236. }
  237. }
  238. t, err := schema.NewTable(c, db, table)
  239. if err != nil {
  240. // check table not exists
  241. if ok, err1 := schema.IsTableExist(c, db, table); err1 == nil && !ok {
  242. return nil, schema.ErrTableNotExist
  243. }
  244. // work around : RDS HAHeartBeat
  245. // ref : https://github.com/alibaba/canal/blob/master/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L385
  246. // issue : https://github.com/alibaba/canal/issues/222
  247. // This is a common error in RDS that canal can't get HAHealthCheckSchema's meta, so we mock a table meta.
  248. // If canal just skip and log error, as RDS HA heartbeat interval is very short, so too many HAHeartBeat errors will be logged.
  249. if key == schema.HAHealthCheckSchema {
  250. // mock ha_health_check meta
  251. ta := &schema.Table{
  252. Schema: db,
  253. Name: table,
  254. Columns: make([]schema.TableColumn, 0, 2),
  255. Indexes: make([]*schema.Index, 0),
  256. }
  257. ta.AddColumn("id", "bigint(20)", "", "")
  258. ta.AddColumn("type", "char(1)", "", "")
  259. c.tableLock.Lock()
  260. c.tables[key] = ta
  261. c.tableLock.Unlock()
  262. return ta, nil
  263. }
  264. // if DiscardNoMetaRowEvent is true, we just log this error
  265. if c.cfg.DiscardNoMetaRowEvent {
  266. c.tableLock.Lock()
  267. c.errorTablesGetTime[key] = time.Now()
  268. c.tableLock.Unlock()
  269. // log error and return ErrMissingTableMeta
  270. log.Errorf("canal get table meta err: %v", errors.Trace(err))
  271. return nil, schema.ErrMissingTableMeta
  272. }
  273. return nil, err
  274. }
  275. c.tableLock.Lock()
  276. c.tables[key] = t
  277. if c.cfg.DiscardNoMetaRowEvent {
  278. // if get table info success, delete this key from errorTablesGetTime
  279. delete(c.errorTablesGetTime, key)
  280. }
  281. c.tableLock.Unlock()
  282. return t, nil
  283. }
  284. // ClearTableCache clear table cache
  285. func (c *Canal) ClearTableCache(db []byte, table []byte) {
  286. key := fmt.Sprintf("%s.%s", db, table)
  287. c.tableLock.Lock()
  288. delete(c.tables, key)
  289. if c.cfg.DiscardNoMetaRowEvent {
  290. delete(c.errorTablesGetTime, key)
  291. }
  292. c.tableLock.Unlock()
  293. }
  294. // Check MySQL binlog row image, must be in FULL, MINIMAL, NOBLOB
  295. func (c *Canal) CheckBinlogRowImage(image string) error {
  296. // need to check MySQL binlog row image? full, minimal or noblob?
  297. // now only log
  298. if c.cfg.Flavor == mysql.MySQLFlavor {
  299. if res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "binlog_row_image"`); err != nil {
  300. return errors.Trace(err)
  301. } else {
  302. // MySQL has binlog row image from 5.6, so older will return empty
  303. rowImage, _ := res.GetString(0, 1)
  304. if rowImage != "" && !strings.EqualFold(rowImage, image) {
  305. return errors.Errorf("MySQL uses %s binlog row image, but we want %s", rowImage, image)
  306. }
  307. }
  308. }
  309. return nil
  310. }
  311. func (c *Canal) checkBinlogRowFormat() error {
  312. res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "binlog_format";`)
  313. if err != nil {
  314. return errors.Trace(err)
  315. } else if f, _ := res.GetString(0, 1); f != "ROW" {
  316. return errors.Errorf("binlog must ROW format, but %s now", f)
  317. }
  318. return nil
  319. }
  320. func (c *Canal) prepareSyncer() error {
  321. seps := strings.Split(c.cfg.Addr, ":")
  322. if len(seps) != 2 {
  323. return errors.Errorf("invalid mysql addr format %s, must host:port", c.cfg.Addr)
  324. }
  325. port, err := strconv.ParseUint(seps[1], 10, 16)
  326. if err != nil {
  327. return errors.Trace(err)
  328. }
  329. cfg := replication.BinlogSyncerConfig{
  330. ServerID: c.cfg.ServerID,
  331. Flavor: c.cfg.Flavor,
  332. Host: seps[0],
  333. Port: uint16(port),
  334. User: c.cfg.User,
  335. Password: c.cfg.Password,
  336. Charset: c.cfg.Charset,
  337. HeartbeatPeriod: c.cfg.HeartbeatPeriod,
  338. ReadTimeout: c.cfg.ReadTimeout,
  339. UseDecimal: c.cfg.UseDecimal,
  340. }
  341. c.syncer = replication.NewBinlogSyncer(cfg)
  342. return nil
  343. }
  344. // Execute a SQL
  345. func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error) {
  346. c.connLock.Lock()
  347. defer c.connLock.Unlock()
  348. retryNum := 3
  349. for i := 0; i < retryNum; i++ {
  350. if c.conn == nil {
  351. c.conn, err = client.Connect(c.cfg.Addr, c.cfg.User, c.cfg.Password, "")
  352. if err != nil {
  353. return nil, errors.Trace(err)
  354. }
  355. }
  356. rr, err = c.conn.Execute(cmd, args...)
  357. if err != nil && !mysql.ErrorEqual(err, mysql.ErrBadConn) {
  358. return
  359. } else if mysql.ErrorEqual(err, mysql.ErrBadConn) {
  360. c.conn.Close()
  361. c.conn = nil
  362. continue
  363. } else {
  364. return
  365. }
  366. }
  367. return
  368. }
  369. func (c *Canal) SyncedPosition() mysql.Position {
  370. return c.master.Position()
  371. }