backup.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "bufio"
  19. "encoding/binary"
  20. "io"
  21. "log"
  22. "sync"
  23. "github.com/dgraph-io/badger/y"
  24. "github.com/dgraph-io/badger/protos"
  25. )
  26. func writeTo(entry *protos.KVPair, w io.Writer) error {
  27. if err := binary.Write(w, binary.LittleEndian, uint64(entry.Size())); err != nil {
  28. return err
  29. }
  30. buf, err := entry.Marshal()
  31. if err != nil {
  32. return err
  33. }
  34. _, err = w.Write(buf)
  35. return err
  36. }
  37. // Backup dumps a protobuf-encoded list of all entries in the database into the
  38. // given writer, that are newer than the specified version. It returns a
  39. // timestamp indicating when the entries were dumped which can be passed into a
  40. // later invocation to generate an incremental dump, of entries that have been
  41. // added/modified since the last invocation of DB.Backup()
  42. //
  43. // This can be used to backup the data in a database at a given point in time.
  44. func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) {
  45. var tsNew uint64
  46. err := db.View(func(txn *Txn) error {
  47. opts := DefaultIteratorOptions
  48. opts.AllVersions = true
  49. it := txn.NewIterator(opts)
  50. defer it.Close()
  51. for it.Rewind(); it.Valid(); it.Next() {
  52. item := it.Item()
  53. if item.Version() < since {
  54. // Ignore versions less than given timestamp
  55. continue
  56. }
  57. val, err := item.Value()
  58. if err != nil {
  59. log.Printf("Key [%x]. Error while fetching value [%v]\n", item.Key(), err)
  60. continue
  61. }
  62. entry := &protos.KVPair{
  63. Key: y.Copy(item.Key()),
  64. Value: y.Copy(val),
  65. UserMeta: []byte{item.UserMeta()},
  66. Version: item.Version(),
  67. ExpiresAt: item.ExpiresAt(),
  68. }
  69. // Write entries to disk
  70. if err := writeTo(entry, w); err != nil {
  71. return err
  72. }
  73. }
  74. tsNew = txn.readTs
  75. return nil
  76. })
  77. return tsNew, err
  78. }
  79. // Load reads a protobuf-encoded list of all entries from a reader and writes
  80. // them to the database. This can be used to restore the database from a backup
  81. // made by calling DB.Backup().
  82. //
  83. // DB.Load() should be called on a database that is not running any other
  84. // concurrent transactions while it is running.
  85. func (db *DB) Load(r io.Reader) error {
  86. br := bufio.NewReaderSize(r, 16<<10)
  87. unmarshalBuf := make([]byte, 1<<10)
  88. var entries []*Entry
  89. var wg sync.WaitGroup
  90. errChan := make(chan error, 1)
  91. // func to check for pending error before sending off a batch for writing
  92. batchSetAsyncIfNoErr := func(entries []*Entry) error {
  93. select {
  94. case err := <-errChan:
  95. return err
  96. default:
  97. wg.Add(1)
  98. return db.batchSetAsync(entries, func(err error) {
  99. defer wg.Done()
  100. if err != nil {
  101. select {
  102. case errChan <- err:
  103. default:
  104. }
  105. }
  106. })
  107. }
  108. }
  109. for {
  110. var sz uint64
  111. err := binary.Read(br, binary.LittleEndian, &sz)
  112. if err == io.EOF {
  113. break
  114. } else if err != nil {
  115. return err
  116. }
  117. if cap(unmarshalBuf) < int(sz) {
  118. unmarshalBuf = make([]byte, sz)
  119. }
  120. e := &protos.KVPair{}
  121. if _, err = io.ReadFull(br, unmarshalBuf[:sz]); err != nil {
  122. return err
  123. }
  124. if err = e.Unmarshal(unmarshalBuf[:sz]); err != nil {
  125. return err
  126. }
  127. entries = append(entries, &Entry{
  128. Key: y.KeyWithTs(e.Key, e.Version),
  129. Value: e.Value,
  130. UserMeta: e.UserMeta[0],
  131. ExpiresAt: e.ExpiresAt,
  132. })
  133. // Update nextCommit, memtable stores this timestamp in badger head
  134. // when flushed.
  135. if e.Version >= db.orc.commitTs() {
  136. db.orc.nextCommit = e.Version + 1
  137. }
  138. if len(entries) == 1000 {
  139. if err := batchSetAsyncIfNoErr(entries); err != nil {
  140. return err
  141. }
  142. entries = make([]*Entry, 0, 1000)
  143. }
  144. }
  145. if len(entries) > 0 {
  146. if err := batchSetAsyncIfNoErr(entries); err != nil {
  147. return err
  148. }
  149. }
  150. wg.Wait()
  151. select {
  152. case err := <-errChan:
  153. return err
  154. default:
  155. db.orc.curRead = db.orc.commitTs() - 1
  156. return nil
  157. }
  158. }