binlogstreamer.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package replication
  2. import (
  3. "context"
  4. "github.com/juju/errors"
  5. log "github.com/sirupsen/logrus"
  6. )
  // Sentinel errors reported by BinlogStreamer.
  7. var (
  // ErrNeedSyncAgain is returned by GetEvent when the streamer has already
  // recorded an error from an earlier call; the message advises the caller
  // to sync again before asking for more events.
  8. ErrNeedSyncAgain = errors.New("Last sync error or closed, try sync and get event again")
  // ErrSyncClosed is the error published when the streamer is closed
  // without a more specific cause (see close and closeWithError).
  9. ErrSyncClosed = errors.New("Sync was closed")
  10. )
  11. // BinlogStreamer hands binlog events, or the error that ended the stream, to callers of GetEvent.
  12. type BinlogStreamer struct {
  // ch carries the events returned by GetEvent; newBinlogStreamer creates
  // it with a buffer of 10240. The producer is not visible in this file
  // (presumably the syncer; verify against the caller).
  13. ch chan *BinlogEvent
  // ech carries errors: closeWithError does a non-blocking send on it and
  // GetEvent receives from it. newBinlogStreamer gives it a buffer of 4.
  14. ech chan error
  // err holds the error GetEvent last received from ech. While it is
  // non-nil, GetEvent returns ErrNeedSyncAgain without reading any channel.
  15. err error
  16. }
  17. // GetEvent gets the binlog event one by one, it will block until Syncer receives any events from MySQL
  18. // or meets a sync error. You can pass a context (like Cancel or Timeout) to break the block.
  19. func (s *BinlogStreamer) GetEvent(ctx context.Context) (*BinlogEvent, error) {
  20. if s.err != nil {
  21. return nil, ErrNeedSyncAgain
  22. }
  23. select {
  24. case c := <-s.ch:
  25. return c, nil
  26. case s.err = <-s.ech:
  27. return nil, s.err
  28. case <-ctx.Done():
  29. return nil, ctx.Err()
  30. }
  31. }
  // close shuts the streamer down with the generic ErrSyncClosed error by
  // delegating to closeWithError.
  32. func (s *BinlogStreamer) close() {
  33. s.closeWithError(ErrSyncClosed)
  34. }
  35. func (s *BinlogStreamer) closeWithError(err error) {
  36. if err == nil {
  37. err = ErrSyncClosed
  38. }
  39. log.Errorf("close sync with err: %v", err)
  40. select {
  41. case s.ech <- err:
  42. default:
  43. }
  44. }
  45. func newBinlogStreamer() *BinlogStreamer {
  46. s := new(BinlogStreamer)
  47. s.ch = make(chan *BinlogEvent, 10240)
  48. s.ech = make(chan error, 4)
  49. return s
  50. }