dump.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package dump
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "os/exec"
  7. "strings"
  8. "github.com/juju/errors"
  9. . "github.com/siddontang/go-mysql/mysql"
  10. )
  11. // Unlick mysqldump, Dumper is designed for parsing and syning data easily.
  12. type Dumper struct {
  13. // mysqldump execution path, like mysqldump or /usr/bin/mysqldump, etc...
  14. ExecutionPath string
  15. Addr string
  16. User string
  17. Password string
  18. // Will override Databases
  19. Tables []string
  20. TableDB string
  21. Databases []string
  22. Charset string
  23. IgnoreTables map[string][]string
  24. ErrOut io.Writer
  25. masterDataSkipped bool
  26. maxAllowedPacket int
  27. }
  28. func NewDumper(executionPath string, addr string, user string, password string) (*Dumper, error) {
  29. if len(executionPath) == 0 {
  30. return nil, nil
  31. }
  32. path, err := exec.LookPath(executionPath)
  33. if err != nil {
  34. return nil, errors.Trace(err)
  35. }
  36. d := new(Dumper)
  37. d.ExecutionPath = path
  38. d.Addr = addr
  39. d.User = user
  40. d.Password = password
  41. d.Tables = make([]string, 0, 16)
  42. d.Databases = make([]string, 0, 16)
  43. d.Charset = DEFAULT_CHARSET
  44. d.IgnoreTables = make(map[string][]string)
  45. d.masterDataSkipped = false
  46. d.ErrOut = os.Stderr
  47. return d, nil
  48. }
  49. func (d *Dumper) SetCharset(charset string) {
  50. d.Charset = charset
  51. }
  52. func (d *Dumper) SetErrOut(o io.Writer) {
  53. d.ErrOut = o
  54. }
  55. // In some cloud MySQL, we have no privilege to use `--master-data`.
  56. func (d *Dumper) SkipMasterData(v bool) {
  57. d.masterDataSkipped = v
  58. }
  59. func (d *Dumper) SetMaxAllowedPacket(i int) {
  60. d.maxAllowedPacket = i
  61. }
  62. func (d *Dumper) AddDatabases(dbs ...string) {
  63. d.Databases = append(d.Databases, dbs...)
  64. }
  65. func (d *Dumper) AddTables(db string, tables ...string) {
  66. if d.TableDB != db {
  67. d.TableDB = db
  68. d.Tables = d.Tables[0:0]
  69. }
  70. d.Tables = append(d.Tables, tables...)
  71. }
  72. func (d *Dumper) AddIgnoreTables(db string, tables ...string) {
  73. t, _ := d.IgnoreTables[db]
  74. t = append(t, tables...)
  75. d.IgnoreTables[db] = t
  76. }
  77. func (d *Dumper) Reset() {
  78. d.Tables = d.Tables[0:0]
  79. d.TableDB = ""
  80. d.IgnoreTables = make(map[string][]string)
  81. d.Databases = d.Databases[0:0]
  82. }
  83. func (d *Dumper) Dump(w io.Writer) error {
  84. args := make([]string, 0, 16)
  85. // Common args
  86. seps := strings.Split(d.Addr, ":")
  87. args = append(args, fmt.Sprintf("--host=%s", seps[0]))
  88. if len(seps) > 1 {
  89. args = append(args, fmt.Sprintf("--port=%s", seps[1]))
  90. }
  91. args = append(args, fmt.Sprintf("--user=%s", d.User))
  92. args = append(args, fmt.Sprintf("--password=%s", d.Password))
  93. if !d.masterDataSkipped {
  94. args = append(args, "--master-data")
  95. }
  96. if d.maxAllowedPacket > 0 {
  97. // mysqldump param should be --max-allowed-packet=%dM not be --max_allowed_packet=%dM
  98. args = append(args, fmt.Sprintf("--max-allowed-packet=%dM", d.maxAllowedPacket))
  99. }
  100. args = append(args, "--single-transaction")
  101. args = append(args, "--skip-lock-tables")
  102. // Disable uncessary data
  103. args = append(args, "--compact")
  104. args = append(args, "--skip-opt")
  105. args = append(args, "--quick")
  106. // We only care about data
  107. args = append(args, "--no-create-info")
  108. // Multi row is easy for us to parse the data
  109. args = append(args, "--skip-extended-insert")
  110. for db, tables := range d.IgnoreTables {
  111. for _, table := range tables {
  112. args = append(args, fmt.Sprintf("--ignore-table=%s.%s", db, table))
  113. }
  114. }
  115. if len(d.Tables) == 0 && len(d.Databases) == 0 {
  116. args = append(args, "--all-databases")
  117. } else if len(d.Tables) == 0 {
  118. args = append(args, "--databases")
  119. args = append(args, d.Databases...)
  120. } else {
  121. args = append(args, d.TableDB)
  122. args = append(args, d.Tables...)
  123. // If we only dump some tables, the dump data will not have database name
  124. // which makes us hard to parse, so here we add it manually.
  125. w.Write([]byte(fmt.Sprintf("USE `%s`;\n", d.TableDB)))
  126. }
  127. if len(d.Charset) != 0 {
  128. args = append(args, fmt.Sprintf("--default-character-set=%s", d.Charset))
  129. }
  130. cmd := exec.Command(d.ExecutionPath, args...)
  131. cmd.Stderr = d.ErrOut
  132. cmd.Stdout = w
  133. return cmd.Run()
  134. }
  135. // Dump MySQL and parse immediately
  136. func (d *Dumper) DumpAndParse(h ParseHandler) error {
  137. r, w := io.Pipe()
  138. done := make(chan error, 1)
  139. go func() {
  140. err := Parse(r, h, !d.masterDataSkipped)
  141. r.CloseWithError(err)
  142. done <- err
  143. }()
  144. err := d.Dump(w)
  145. w.CloseWithError(err)
  146. err = <-done
  147. return errors.Trace(err)
  148. }