managed_db.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "math"
  19. "sync"
  20. "sync/atomic"
  21. "time"
  22. "github.com/dgraph-io/badger/y"
  23. "github.com/pkg/errors"
  24. )
  25. // ManagedDB allows end users to manage the transactions themselves. Transaction
  26. // start and commit timestamps are set by end-user.
  27. //
  28. // This is only useful for databases built on top of Badger (like Dgraph), and
  29. // can be ignored by most users.
  30. //
  31. // WARNING: This is an experimental feature and may be changed significantly in
  32. // a future release. So please proceed with caution.
  33. type ManagedDB struct {
  34. *DB
  35. }
  36. // OpenManaged returns a new ManagedDB, which allows more control over setting
  37. // transaction timestamps.
  38. //
  39. // This is only useful for databases built on top of Badger (like Dgraph), and
  40. // can be ignored by most users.
  41. func OpenManaged(opts Options) (*ManagedDB, error) {
  42. opts.managedTxns = true
  43. db, err := Open(opts)
  44. if err != nil {
  45. return nil, err
  46. }
  47. return &ManagedDB{db}, nil
  48. }
  49. // NewTransaction overrides DB.NewTransaction() and panics when invoked. Use
  50. // NewTransactionAt() instead.
  51. func (db *ManagedDB) NewTransaction(update bool) {
  52. panic("Cannot use NewTransaction() for ManagedDB. Use NewTransactionAt() instead.")
  53. }
  54. // NewTransactionAt follows the same logic as DB.NewTransaction(), but uses the
  55. // provided read timestamp.
  56. //
  57. // This is only useful for databases built on top of Badger (like Dgraph), and
  58. // can be ignored by most users.
  59. func (db *ManagedDB) NewTransactionAt(readTs uint64, update bool) *Txn {
  60. txn := db.DB.NewTransaction(update)
  61. txn.readTs = readTs
  62. return txn
  63. }
  64. // CommitAt commits the transaction, following the same logic as Commit(), but
  65. // at the given commit timestamp. This will panic if not used with ManagedDB.
  66. //
  67. // This is only useful for databases built on top of Badger (like Dgraph), and
  68. // can be ignored by most users.
  69. func (txn *Txn) CommitAt(commitTs uint64, callback func(error)) error {
  70. if !txn.db.opt.managedTxns {
  71. return ErrManagedTxn
  72. }
  73. txn.commitTs = commitTs
  74. return txn.Commit(callback)
  75. }
  76. // GetSequence is not supported on ManagedDB. Calling this would result
  77. // in a panic.
  78. func (db *ManagedDB) GetSequence(_ []byte, _ uint64) (*Sequence, error) {
  79. panic("Cannot use GetSequence for ManagedDB.")
  80. }
  81. // SetDiscardTs sets a timestamp at or below which, any invalid or deleted
  82. // versions can be discarded from the LSM tree, and thence from the value log to
  83. // reclaim disk space.
  84. func (db *ManagedDB) SetDiscardTs(ts uint64) {
  85. db.orc.setDiscardTs(ts)
  86. }
  87. var errDone = errors.New("Done deleting keys")
  88. // DropAll would drop all the data stored in Badger. It does this in the following way.
  89. // - Stop accepting new writes.
  90. // - Pause the compactions.
  91. // - Pick all tables from all levels, create a changeset to delete all these
  92. // tables and apply it to manifest. DO not pick up the latest table from level
  93. // 0, to preserve the (persistent) badgerHead key.
  94. // - Iterate over the KVs in Level 0, and run deletes on them via transactions.
  95. // - The deletions are done at the same timestamp as the latest version of the
  96. // key. Thus, we could write the keys back at the same timestamp as before.
  97. func (db *ManagedDB) DropAll() error {
  98. // Stop accepting new writes.
  99. atomic.StoreInt32(&db.blockWrites, 1)
  100. // Wait for writeCh to reach size of zero. This is not ideal, but a very
  101. // simple way to allow writeCh to flush out, before we proceed.
  102. tick := time.NewTicker(100 * time.Millisecond)
  103. for range tick.C {
  104. if len(db.writeCh) == 0 {
  105. break
  106. }
  107. }
  108. tick.Stop()
  109. // Stop the compactions.
  110. if db.closers.compactors != nil {
  111. db.closers.compactors.SignalAndWait()
  112. }
  113. _, err := db.lc.deleteLSMTree()
  114. // Allow writes so that we can run transactions. Ideally, the user must ensure that they're not
  115. // doing more writes concurrently while this operation is happening.
  116. atomic.StoreInt32(&db.blockWrites, 0)
  117. // Need compactions to happen so deletes below can be flushed out.
  118. if db.closers.compactors != nil {
  119. db.closers.compactors = y.NewCloser(1)
  120. db.lc.startCompact(db.closers.compactors)
  121. }
  122. if err != nil {
  123. return err
  124. }
  125. type KV struct {
  126. key []byte
  127. version uint64
  128. }
  129. var kvs []KV
  130. getKeys := func() error {
  131. txn := db.NewTransactionAt(math.MaxUint64, false)
  132. defer txn.Discard()
  133. opts := DefaultIteratorOptions
  134. opts.PrefetchValues = false
  135. itr := txn.NewIterator(opts)
  136. defer itr.Close()
  137. for itr.Rewind(); itr.Valid(); itr.Next() {
  138. item := itr.Item()
  139. kvs = append(kvs, KV{item.KeyCopy(nil), item.Version()})
  140. }
  141. return nil
  142. }
  143. if err := getKeys(); err != nil {
  144. return err
  145. }
  146. var wg sync.WaitGroup
  147. errCh := make(chan error, 1)
  148. for _, kv := range kvs {
  149. wg.Add(1)
  150. txn := db.NewTransactionAt(math.MaxUint64, true)
  151. if err := txn.Delete(kv.key); err != nil {
  152. return err
  153. }
  154. if err := txn.CommitAt(kv.version, func(rerr error) {
  155. if rerr != nil {
  156. select {
  157. case errCh <- rerr:
  158. default:
  159. }
  160. }
  161. wg.Done()
  162. }); err != nil {
  163. return err
  164. }
  165. }
  166. wg.Wait()
  167. select {
  168. case err := <-errCh:
  169. return err
  170. default:
  171. return nil
  172. }
  173. }