binlogsyncer.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742
  1. package replication
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "encoding/binary"
  6. "fmt"
  7. "net"
  8. "os"
  9. "sync"
  10. "time"
  11. "github.com/juju/errors"
  12. "github.com/satori/go.uuid"
  13. "github.com/siddontang/go-mysql/client"
  14. . "github.com/siddontang/go-mysql/mysql"
  15. log "github.com/sirupsen/logrus"
  16. )
  17. var (
  18. errSyncRunning = errors.New("Sync is running, must Close first")
  19. )
  20. // BinlogSyncerConfig is the configuration for BinlogSyncer.
  21. type BinlogSyncerConfig struct {
  22. // ServerID is the unique ID in cluster.
  23. ServerID uint32
  24. // Flavor is "mysql" or "mariadb", if not set, use "mysql" default.
  25. Flavor string
  26. // Host is for MySQL server host.
  27. Host string
  28. // Port is for MySQL server port.
  29. Port uint16
  30. // User is for MySQL user.
  31. User string
  32. // Password is for MySQL password.
  33. Password string
  34. // Localhost is local hostname if register salve.
  35. // If not set, use os.Hostname() instead.
  36. Localhost string
  37. // Charset is for MySQL client character set
  38. Charset string
  39. // SemiSyncEnabled enables semi-sync or not.
  40. SemiSyncEnabled bool
  41. // RawModeEnabled is for not parsing binlog event.
  42. RawModeEnabled bool
  43. // If not nil, use the provided tls.Config to connect to the database using TLS/SSL.
  44. TLSConfig *tls.Config
  45. // Use replication.Time structure for timestamp and datetime.
  46. // We will use Local location for timestamp and UTC location for datatime.
  47. ParseTime bool
  48. // Use decimal.Decimal structure for decimals.
  49. UseDecimal bool
  50. // RecvBufferSize sets the size in bytes of the operating system's receive buffer associated with the connection.
  51. RecvBufferSize int
  52. // master heartbeat period
  53. HeartbeatPeriod time.Duration
  54. // read timeout
  55. ReadTimeout time.Duration
  56. }
  57. // BinlogSyncer syncs binlog event from server.
  58. type BinlogSyncer struct {
  59. m sync.RWMutex
  60. cfg BinlogSyncerConfig
  61. c *client.Conn
  62. wg sync.WaitGroup
  63. parser *BinlogParser
  64. nextPos Position
  65. useGTID bool
  66. gset GTIDSet
  67. running bool
  68. ctx context.Context
  69. cancel context.CancelFunc
  70. lastConnectionID uint32
  71. }
  72. // NewBinlogSyncer creates the BinlogSyncer with cfg.
  73. func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
  74. // Clear the Password to avoid outputing it in log.
  75. pass := cfg.Password
  76. cfg.Password = ""
  77. log.Infof("create BinlogSyncer with config %v", cfg)
  78. cfg.Password = pass
  79. b := new(BinlogSyncer)
  80. b.cfg = cfg
  81. b.parser = NewBinlogParser()
  82. b.parser.SetRawMode(b.cfg.RawModeEnabled)
  83. b.parser.SetParseTime(b.cfg.ParseTime)
  84. b.parser.SetUseDecimal(b.cfg.UseDecimal)
  85. b.useGTID = false
  86. b.running = false
  87. b.ctx, b.cancel = context.WithCancel(context.Background())
  88. return b
  89. }
  90. // Close closes the BinlogSyncer.
  91. func (b *BinlogSyncer) Close() {
  92. b.m.Lock()
  93. defer b.m.Unlock()
  94. b.close()
  95. }
  96. func (b *BinlogSyncer) close() {
  97. if b.isClosed() {
  98. return
  99. }
  100. log.Info("syncer is closing...")
  101. b.running = false
  102. b.cancel()
  103. if b.c != nil {
  104. b.c.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
  105. }
  106. b.wg.Wait()
  107. if b.c != nil {
  108. b.c.Close()
  109. }
  110. log.Info("syncer is closed")
  111. }
  112. func (b *BinlogSyncer) isClosed() bool {
  113. select {
  114. case <-b.ctx.Done():
  115. return true
  116. default:
  117. return false
  118. }
  119. }
  120. func (b *BinlogSyncer) registerSlave() error {
  121. if b.c != nil {
  122. b.c.Close()
  123. }
  124. log.Infof("register slave for master server %s:%d", b.cfg.Host, b.cfg.Port)
  125. var err error
  126. b.c, err = client.Connect(fmt.Sprintf("%s:%d", b.cfg.Host, b.cfg.Port), b.cfg.User, b.cfg.Password, "", func(c *client.Conn) {
  127. c.TLSConfig = b.cfg.TLSConfig
  128. })
  129. if err != nil {
  130. return errors.Trace(err)
  131. }
  132. if len(b.cfg.Charset) != 0 {
  133. b.c.SetCharset(b.cfg.Charset)
  134. }
  135. //set read timeout
  136. if b.cfg.ReadTimeout > 0 {
  137. b.c.SetReadDeadline(time.Now().Add(b.cfg.ReadTimeout))
  138. }
  139. if b.cfg.RecvBufferSize > 0 {
  140. if tcp, ok := b.c.Conn.Conn.(*net.TCPConn); ok {
  141. tcp.SetReadBuffer(b.cfg.RecvBufferSize)
  142. }
  143. }
  144. // kill last connection id
  145. if b.lastConnectionID > 0 {
  146. cmd := fmt.Sprintf("KILL %d", b.lastConnectionID)
  147. if _, err := b.c.Execute(cmd); err != nil {
  148. log.Errorf("kill connection %d error %v", b.lastConnectionID, err)
  149. // Unknown thread id
  150. if code := ErrorCode(err.Error()); code != ER_NO_SUCH_THREAD {
  151. return errors.Trace(err)
  152. }
  153. }
  154. log.Infof("kill last connection id %d", b.lastConnectionID)
  155. }
  156. // save last last connection id for kill
  157. b.lastConnectionID = b.c.GetConnectionID()
  158. //for mysql 5.6+, binlog has a crc32 checksum
  159. //before mysql 5.6, this will not work, don't matter.:-)
  160. if r, err := b.c.Execute("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'"); err != nil {
  161. return errors.Trace(err)
  162. } else {
  163. s, _ := r.GetString(0, 1)
  164. if s != "" {
  165. // maybe CRC32 or NONE
  166. // mysqlbinlog.cc use NONE, see its below comments:
  167. // Make a notice to the server that this client
  168. // is checksum-aware. It does not need the first fake Rotate
  169. // necessary checksummed.
  170. // That preference is specified below.
  171. if _, err = b.c.Execute(`SET @master_binlog_checksum='NONE'`); err != nil {
  172. return errors.Trace(err)
  173. }
  174. // if _, err = b.c.Execute(`SET @master_binlog_checksum=@@global.binlog_checksum`); err != nil {
  175. // return errors.Trace(err)
  176. // }
  177. }
  178. }
  179. if b.cfg.Flavor == MariaDBFlavor {
  180. // Refer https://github.com/alibaba/canal/wiki/BinlogChange(MariaDB5&10)
  181. // Tell the server that we understand GTIDs by setting our slave capability
  182. // to MARIA_SLAVE_CAPABILITY_GTID = 4 (MariaDB >= 10.0.1).
  183. if _, err := b.c.Execute("SET @mariadb_slave_capability=4"); err != nil {
  184. return errors.Errorf("failed to set @mariadb_slave_capability=4: %v", err)
  185. }
  186. }
  187. if b.cfg.HeartbeatPeriod > 0 {
  188. _, err = b.c.Execute(fmt.Sprintf("SET @master_heartbeat_period=%d;", b.cfg.HeartbeatPeriod))
  189. if err != nil {
  190. log.Error("failed to set @master_heartbeat_period=%d", b.cfg.HeartbeatPeriod, err)
  191. return errors.Trace(err)
  192. }
  193. }
  194. if err = b.writeRegisterSlaveCommand(); err != nil {
  195. return errors.Trace(err)
  196. }
  197. if _, err = b.c.ReadOKPacket(); err != nil {
  198. return errors.Trace(err)
  199. }
  200. return nil
  201. }
  202. func (b *BinlogSyncer) enableSemiSync() error {
  203. if !b.cfg.SemiSyncEnabled {
  204. return nil
  205. }
  206. if r, err := b.c.Execute("SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled';"); err != nil {
  207. return errors.Trace(err)
  208. } else {
  209. s, _ := r.GetString(0, 1)
  210. if s != "ON" {
  211. log.Errorf("master does not support semi synchronous replication, use no semi-sync")
  212. b.cfg.SemiSyncEnabled = false
  213. return nil
  214. }
  215. }
  216. _, err := b.c.Execute(`SET @rpl_semi_sync_slave = 1;`)
  217. if err != nil {
  218. return errors.Trace(err)
  219. }
  220. return nil
  221. }
  222. func (b *BinlogSyncer) prepare() error {
  223. if b.isClosed() {
  224. return errors.Trace(ErrSyncClosed)
  225. }
  226. if err := b.registerSlave(); err != nil {
  227. return errors.Trace(err)
  228. }
  229. if err := b.enableSemiSync(); err != nil {
  230. return errors.Trace(err)
  231. }
  232. return nil
  233. }
  234. func (b *BinlogSyncer) startDumpStream() *BinlogStreamer {
  235. b.running = true
  236. s := newBinlogStreamer()
  237. b.wg.Add(1)
  238. go b.onStream(s)
  239. return s
  240. }
  241. // GetNextPosition returns the next position of the syncer
  242. func (b *BinlogSyncer) GetNextPosition() Position {
  243. return b.nextPos
  244. }
  245. // StartSync starts syncing from the `pos` position.
  246. func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
  247. log.Infof("begin to sync binlog from position %s", pos)
  248. b.m.Lock()
  249. defer b.m.Unlock()
  250. if b.running {
  251. return nil, errors.Trace(errSyncRunning)
  252. }
  253. if err := b.prepareSyncPos(pos); err != nil {
  254. return nil, errors.Trace(err)
  255. }
  256. return b.startDumpStream(), nil
  257. }
  258. // StartSyncGTID starts syncing from the `gset` GTIDSet.
  259. func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
  260. log.Infof("begin to sync binlog from GTID %s", gset)
  261. b.useGTID = true
  262. b.gset = gset
  263. b.m.Lock()
  264. defer b.m.Unlock()
  265. if b.running {
  266. return nil, errors.Trace(errSyncRunning)
  267. }
  268. if err := b.prepare(); err != nil {
  269. return nil, errors.Trace(err)
  270. }
  271. var err error
  272. if b.cfg.Flavor != MariaDBFlavor {
  273. // default use MySQL
  274. err = b.writeBinlogDumpMysqlGTIDCommand(gset)
  275. } else {
  276. err = b.writeBinlogDumpMariadbGTIDCommand(gset)
  277. }
  278. if err != nil {
  279. return nil, err
  280. }
  281. return b.startDumpStream(), nil
  282. }
  283. func (b *BinlogSyncer) writeBinlogDumpCommand(p Position) error {
  284. b.c.ResetSequence()
  285. data := make([]byte, 4+1+4+2+4+len(p.Name))
  286. pos := 4
  287. data[pos] = COM_BINLOG_DUMP
  288. pos++
  289. binary.LittleEndian.PutUint32(data[pos:], p.Pos)
  290. pos += 4
  291. binary.LittleEndian.PutUint16(data[pos:], BINLOG_DUMP_NEVER_STOP)
  292. pos += 2
  293. binary.LittleEndian.PutUint32(data[pos:], b.cfg.ServerID)
  294. pos += 4
  295. copy(data[pos:], p.Name)
  296. return b.c.WritePacket(data)
  297. }
  298. func (b *BinlogSyncer) writeBinlogDumpMysqlGTIDCommand(gset GTIDSet) error {
  299. p := Position{"", 4}
  300. gtidData := gset.Encode()
  301. b.c.ResetSequence()
  302. data := make([]byte, 4+1+2+4+4+len(p.Name)+8+4+len(gtidData))
  303. pos := 4
  304. data[pos] = COM_BINLOG_DUMP_GTID
  305. pos++
  306. binary.LittleEndian.PutUint16(data[pos:], 0)
  307. pos += 2
  308. binary.LittleEndian.PutUint32(data[pos:], b.cfg.ServerID)
  309. pos += 4
  310. binary.LittleEndian.PutUint32(data[pos:], uint32(len(p.Name)))
  311. pos += 4
  312. n := copy(data[pos:], p.Name)
  313. pos += n
  314. binary.LittleEndian.PutUint64(data[pos:], uint64(p.Pos))
  315. pos += 8
  316. binary.LittleEndian.PutUint32(data[pos:], uint32(len(gtidData)))
  317. pos += 4
  318. n = copy(data[pos:], gtidData)
  319. pos += n
  320. data = data[0:pos]
  321. return b.c.WritePacket(data)
  322. }
  323. func (b *BinlogSyncer) writeBinlogDumpMariadbGTIDCommand(gset GTIDSet) error {
  324. // Copy from vitess
  325. startPos := gset.String()
  326. // Set the slave_connect_state variable before issuing COM_BINLOG_DUMP to
  327. // provide the start position in GTID form.
  328. query := fmt.Sprintf("SET @slave_connect_state='%s'", startPos)
  329. if _, err := b.c.Execute(query); err != nil {
  330. return errors.Errorf("failed to set @slave_connect_state='%s': %v", startPos, err)
  331. }
  332. // Real slaves set this upon connecting if their gtid_strict_mode option was
  333. // enabled. We always use gtid_strict_mode because we need it to make our
  334. // internal GTID comparisons safe.
  335. if _, err := b.c.Execute("SET @slave_gtid_strict_mode=1"); err != nil {
  336. return errors.Errorf("failed to set @slave_gtid_strict_mode=1: %v", err)
  337. }
  338. // Since we use @slave_connect_state, the file and position here are ignored.
  339. return b.writeBinlogDumpCommand(Position{"", 0})
  340. }
  341. // localHostname returns the hostname that register slave would register as.
  342. func (b *BinlogSyncer) localHostname() string {
  343. if len(b.cfg.Localhost) == 0 {
  344. h, _ := os.Hostname()
  345. return h
  346. }
  347. return b.cfg.Localhost
  348. }
  349. func (b *BinlogSyncer) writeRegisterSlaveCommand() error {
  350. b.c.ResetSequence()
  351. hostname := b.localHostname()
  352. // This should be the name of slave host not the host we are connecting to.
  353. data := make([]byte, 4+1+4+1+len(hostname)+1+len(b.cfg.User)+1+len(b.cfg.Password)+2+4+4)
  354. pos := 4
  355. data[pos] = COM_REGISTER_SLAVE
  356. pos++
  357. binary.LittleEndian.PutUint32(data[pos:], b.cfg.ServerID)
  358. pos += 4
  359. // This should be the name of slave hostname not the host we are connecting to.
  360. data[pos] = uint8(len(hostname))
  361. pos++
  362. n := copy(data[pos:], hostname)
  363. pos += n
  364. data[pos] = uint8(len(b.cfg.User))
  365. pos++
  366. n = copy(data[pos:], b.cfg.User)
  367. pos += n
  368. data[pos] = uint8(len(b.cfg.Password))
  369. pos++
  370. n = copy(data[pos:], b.cfg.Password)
  371. pos += n
  372. binary.LittleEndian.PutUint16(data[pos:], b.cfg.Port)
  373. pos += 2
  374. //replication rank, not used
  375. binary.LittleEndian.PutUint32(data[pos:], 0)
  376. pos += 4
  377. // master ID, 0 is OK
  378. binary.LittleEndian.PutUint32(data[pos:], 0)
  379. return b.c.WritePacket(data)
  380. }
  381. func (b *BinlogSyncer) replySemiSyncACK(p Position) error {
  382. b.c.ResetSequence()
  383. data := make([]byte, 4+1+8+len(p.Name))
  384. pos := 4
  385. // semi sync indicator
  386. data[pos] = SemiSyncIndicator
  387. pos++
  388. binary.LittleEndian.PutUint64(data[pos:], uint64(p.Pos))
  389. pos += 8
  390. copy(data[pos:], p.Name)
  391. err := b.c.WritePacket(data)
  392. if err != nil {
  393. return errors.Trace(err)
  394. }
  395. return nil
  396. }
  397. func (b *BinlogSyncer) retrySync() error {
  398. b.m.Lock()
  399. defer b.m.Unlock()
  400. b.parser.Reset()
  401. if b.useGTID {
  402. log.Infof("begin to re-sync from %s", b.gset.String())
  403. if err := b.prepareSyncGTID(b.gset); err != nil {
  404. return errors.Trace(err)
  405. }
  406. } else {
  407. log.Infof("begin to re-sync from %s", b.nextPos)
  408. if err := b.prepareSyncPos(b.nextPos); err != nil {
  409. return errors.Trace(err)
  410. }
  411. }
  412. return nil
  413. }
  414. func (b *BinlogSyncer) prepareSyncPos(pos Position) error {
  415. // always start from position 4
  416. if pos.Pos < 4 {
  417. pos.Pos = 4
  418. }
  419. if err := b.prepare(); err != nil {
  420. return errors.Trace(err)
  421. }
  422. if err := b.writeBinlogDumpCommand(pos); err != nil {
  423. return errors.Trace(err)
  424. }
  425. return nil
  426. }
  427. func (b *BinlogSyncer) prepareSyncGTID(gset GTIDSet) error {
  428. var err error
  429. if err = b.prepare(); err != nil {
  430. return errors.Trace(err)
  431. }
  432. if b.cfg.Flavor != MariaDBFlavor {
  433. // default use MySQL
  434. err = b.writeBinlogDumpMysqlGTIDCommand(gset)
  435. } else {
  436. err = b.writeBinlogDumpMariadbGTIDCommand(gset)
  437. }
  438. if err != nil {
  439. return err
  440. }
  441. return nil
  442. }
  443. func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
  444. defer func() {
  445. if e := recover(); e != nil {
  446. s.closeWithError(fmt.Errorf("Err: %v\n Stack: %s", e, Pstack()))
  447. }
  448. b.wg.Done()
  449. }()
  450. for {
  451. data, err := b.c.ReadPacket()
  452. if err != nil {
  453. log.Error(err)
  454. // we meet connection error, should re-connect again with
  455. // last nextPos or nextGTID we got.
  456. if len(b.nextPos.Name) == 0 && b.gset == nil {
  457. // we can't get the correct position, close.
  458. s.closeWithError(err)
  459. return
  460. }
  461. // TODO: add a max retry count.
  462. for {
  463. select {
  464. case <-b.ctx.Done():
  465. s.close()
  466. return
  467. case <-time.After(time.Second):
  468. if err = b.retrySync(); err != nil {
  469. log.Errorf("retry sync err: %v, wait 1s and retry again", err)
  470. continue
  471. }
  472. }
  473. break
  474. }
  475. // we connect the server and begin to re-sync again.
  476. continue
  477. }
  478. //set read timeout
  479. if b.cfg.ReadTimeout > 0 {
  480. b.c.SetReadDeadline(time.Now().Add(b.cfg.ReadTimeout))
  481. }
  482. switch data[0] {
  483. case OK_HEADER:
  484. if err = b.parseEvent(s, data); err != nil {
  485. s.closeWithError(err)
  486. return
  487. }
  488. case ERR_HEADER:
  489. err = b.c.HandleErrorPacket(data)
  490. s.closeWithError(err)
  491. return
  492. case EOF_HEADER:
  493. // Refer http://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html
  494. // In the MySQL client/server protocol, EOF and OK packets serve the same purpose.
  495. // Some users told me that they received EOF packet here, but I don't know why.
  496. // So we only log a message and retry ReadPacket.
  497. log.Info("receive EOF packet, retry ReadPacket")
  498. continue
  499. default:
  500. log.Errorf("invalid stream header %c", data[0])
  501. continue
  502. }
  503. }
  504. }
  505. func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
  506. //skip OK byte, 0x00
  507. data = data[1:]
  508. needACK := false
  509. if b.cfg.SemiSyncEnabled && (data[0] == SemiSyncIndicator) {
  510. needACK = (data[1] == 0x01)
  511. //skip semi sync header
  512. data = data[2:]
  513. }
  514. e, err := b.parser.Parse(data)
  515. if err != nil {
  516. return errors.Trace(err)
  517. }
  518. if e.Header.LogPos > 0 {
  519. // Some events like FormatDescriptionEvent return 0, ignore.
  520. b.nextPos.Pos = e.Header.LogPos
  521. }
  522. switch event := e.Event.(type) {
  523. case *RotateEvent:
  524. b.nextPos.Name = string(event.NextLogName)
  525. b.nextPos.Pos = uint32(event.Position)
  526. log.Infof("rotate to %s", b.nextPos)
  527. case *GTIDEvent:
  528. if !b.useGTID {
  529. break
  530. }
  531. u, _ := uuid.FromBytes(event.SID)
  532. err := b.gset.Update(fmt.Sprintf("%s:%d", u.String(), event.GNO))
  533. if err != nil {
  534. return errors.Trace(err)
  535. }
  536. case *MariadbGTIDEvent:
  537. if !b.useGTID {
  538. break
  539. }
  540. GTID := event.GTID
  541. err := b.gset.Update(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber))
  542. if err != nil {
  543. return errors.Trace(err)
  544. }
  545. case *XIDEvent:
  546. event.GSet = b.getGtidSet()
  547. case *QueryEvent:
  548. event.GSet = b.getGtidSet()
  549. }
  550. needStop := false
  551. select {
  552. case s.ch <- e:
  553. case <-b.ctx.Done():
  554. needStop = true
  555. }
  556. if needACK {
  557. err := b.replySemiSyncACK(b.nextPos)
  558. if err != nil {
  559. return errors.Trace(err)
  560. }
  561. }
  562. if needStop {
  563. return errors.New("sync is been closing...")
  564. }
  565. return nil
  566. }
  567. func (b *BinlogSyncer) getGtidSet() GTIDSet {
  568. var gtidSet GTIDSet
  569. if !b.useGTID {
  570. return nil
  571. }
  572. if b.cfg.Flavor != MariaDBFlavor {
  573. gtidSet, _ = ParseGTIDSet(MySQLFlavor, b.gset.String())
  574. } else {
  575. gtidSet, _ = ParseGTIDSet(MariaDBFlavor, b.gset.String())
  576. }
  577. return gtidSet
  578. }
  579. // LastConnectionID returns last connectionID.
  580. func (b *BinlogSyncer) LastConnectionID() uint32 {
  581. return b.lastConnectionID
  582. }