backup.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package replication
  2. import (
  3. "context"
  4. "io"
  5. "os"
  6. "path"
  7. "time"
  8. "github.com/juju/errors"
  9. . "github.com/siddontang/go-mysql/mysql"
  10. )
  11. // Like mysqlbinlog remote raw backup
  12. // Backup remote binlog from position (filename, offset) and write in backupDir
  13. func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error {
  14. if timeout == 0 {
  15. // a very long timeout here
  16. timeout = 30 * 3600 * 24 * time.Second
  17. }
  18. // Force use raw mode
  19. b.parser.SetRawMode(true)
  20. os.MkdirAll(backupDir, 0755)
  21. s, err := b.StartSync(p)
  22. if err != nil {
  23. return errors.Trace(err)
  24. }
  25. var filename string
  26. var offset uint32
  27. var f *os.File
  28. defer func() {
  29. if f != nil {
  30. f.Close()
  31. }
  32. }()
  33. for {
  34. ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
  35. e, err := s.GetEvent(ctx)
  36. cancel()
  37. if err == context.DeadlineExceeded {
  38. return nil
  39. }
  40. if err != nil {
  41. return errors.Trace(err)
  42. }
  43. offset = e.Header.LogPos
  44. if e.Header.EventType == ROTATE_EVENT {
  45. rotateEvent := e.Event.(*RotateEvent)
  46. filename = string(rotateEvent.NextLogName)
  47. if e.Header.Timestamp == 0 || offset == 0 {
  48. // fake rotate event
  49. continue
  50. }
  51. } else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT {
  52. // FormateDescriptionEvent is the first event in binlog, we will close old one and create a new
  53. if f != nil {
  54. f.Close()
  55. }
  56. if len(filename) == 0 {
  57. return errors.Errorf("empty binlog filename for FormateDescriptionEvent")
  58. }
  59. f, err = os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
  60. if err != nil {
  61. return errors.Trace(err)
  62. }
  63. // write binlog header fe'bin'
  64. if _, err = f.Write(BinLogFileHeader); err != nil {
  65. return errors.Trace(err)
  66. }
  67. }
  68. if n, err := f.Write(e.RawData); err != nil {
  69. return errors.Trace(err)
  70. } else if n != len(e.RawData) {
  71. return errors.Trace(io.ErrShortWrite)
  72. }
  73. }
  74. return nil
  75. }