iterator.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "bytes"
  19. "fmt"
  20. "sync"
  21. "sync/atomic"
  22. "time"
  23. "github.com/dgraph-io/badger/options"
  24. "github.com/dgraph-io/badger/y"
  25. farm "github.com/dgryski/go-farm"
  26. )
// prefetchStatus records whether an Item's value has already been fetched into the Item.
// The zero value means no prefetch has completed.
type prefetchStatus uint8

const (
	// prefetched is set on an Item by prefetchValue once its val and err fields are filled in.
	prefetched prefetchStatus = iota + 1
)
// Item is returned during iteration. Both the Key() and Value() output is only valid until
// iterator.Next() is called.
type Item struct {
	status    prefetchStatus // Set to prefetched once prefetchValue has run.
	err       error          // Error recorded by prefetchValue; returned by Value/ValueCopy.
	wg        sync.WaitGroup // Tracks the prefetch goroutine started by Iterator.fill.
	db        *DB            // Used to read from the value log and to look up moved keys.
	key       []byte         // Key as returned by Key(); filled from y.ParseKey by Iterator.fill.
	vptr      []byte         // Raw stored value: inline bytes, or an encoded valuePointer.
	meta      byte           // We need to store meta to know about bitValuePointer.
	userMeta  byte           // Returned by UserMeta().
	expiresAt uint64         // Returned by ExpiresAt(); 0 is treated as "never expires".
	val       []byte         // Value stored by prefetchValue.
	slice     *y.Slice       // Scratch buffer used by yieldItemValue and prefetchValue.
	next      *Item          // Link used by the intrusive list type.
	version   uint64         // Returned by Version(); filled from y.ParseTs by Iterator.fill.
	txn       *Txn           // Transaction on which Value() registers release callbacks.
}
  49. // String returns a string representation of Item
  50. func (item *Item) String() string {
  51. return fmt.Sprintf("key=%q, version=%d, meta=%x", item.Key(), item.Version(), item.meta)
  52. }
// ToString returns a string representation of Item. It simply forwards to String.
//
// Deprecated: Use String instead.
func (item *Item) ToString() string {
	return item.String()
}
  58. // Key returns the key.
  59. //
  60. // Key is only valid as long as item is valid, or transaction is valid. If you need to use it
  61. // outside its validity, please use KeyCopy
  62. func (item *Item) Key() []byte {
  63. return item.key
  64. }
  65. // KeyCopy returns a copy of the key of the item, writing it to dst slice.
  66. // If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
  67. // returned.
  68. func (item *Item) KeyCopy(dst []byte) []byte {
  69. return y.SafeCopy(dst, item.key)
  70. }
  71. // Version returns the commit timestamp of the item.
  72. func (item *Item) Version() uint64 {
  73. return item.version
  74. }
  75. // Value retrieves the value of the item from the value log.
  76. //
  77. // This method must be called within a transaction. Calling it outside a
  78. // transaction is considered undefined behavior. If an iterator is being used,
  79. // then Item.Value() is defined in the current iteration only, because items are
  80. // reused.
  81. //
  82. // If you need to use a value outside a transaction, please use Item.ValueCopy
  83. // instead, or copy it yourself. Value might change once discard or commit is called.
  84. // Use ValueCopy if you want to do a Set after Get.
  85. func (item *Item) Value() ([]byte, error) {
  86. item.wg.Wait()
  87. if item.status == prefetched {
  88. return item.val, item.err
  89. }
  90. buf, cb, err := item.yieldItemValue()
  91. if cb != nil {
  92. item.txn.callbacks = append(item.txn.callbacks, cb)
  93. }
  94. return buf, err
  95. }
  96. // ValueCopy returns a copy of the value of the item from the value log, writing it to dst slice.
  97. // If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
  98. // returned. Tip: It might make sense to reuse the returned slice as dst argument for the next call.
  99. //
  100. // This function is useful in long running iterate/update transactions to avoid a write deadlock.
  101. // See Github issue: https://github.com/dgraph-io/badger/issues/315
  102. func (item *Item) ValueCopy(dst []byte) ([]byte, error) {
  103. item.wg.Wait()
  104. if item.status == prefetched {
  105. return y.SafeCopy(dst, item.val), item.err
  106. }
  107. buf, cb, err := item.yieldItemValue()
  108. defer runCallback(cb)
  109. return y.SafeCopy(dst, buf), err
  110. }
  111. func (item *Item) hasValue() bool {
  112. if item.meta == 0 && item.vptr == nil {
  113. // key not found
  114. return false
  115. }
  116. return true
  117. }
  118. // IsDeletedOrExpired returns true if item contains deleted or expired value.
  119. func (item *Item) IsDeletedOrExpired() bool {
  120. return isDeletedOrExpired(item.meta, item.expiresAt)
  121. }
  122. func (item *Item) DiscardEarlierVersions() bool {
  123. return item.meta&bitDiscardEarlierVersions > 0
  124. }
  125. func (item *Item) yieldItemValue() ([]byte, func(), error) {
  126. key := item.Key() // No need to copy.
  127. for {
  128. if !item.hasValue() {
  129. return nil, nil, nil
  130. }
  131. if item.slice == nil {
  132. item.slice = new(y.Slice)
  133. }
  134. if (item.meta & bitValuePointer) == 0 {
  135. val := item.slice.Resize(len(item.vptr))
  136. copy(val, item.vptr)
  137. return val, nil, nil
  138. }
  139. var vp valuePointer
  140. vp.Decode(item.vptr)
  141. result, cb, err := item.db.vlog.Read(vp, item.slice)
  142. if err != ErrRetry || bytes.HasPrefix(key, badgerMove) {
  143. // The error is not retry, or we have already searched the move keyspace.
  144. return result, cb, err
  145. }
  146. // The value pointer is pointing to a deleted value log. Look for the
  147. // move key and read that instead.
  148. runCallback(cb)
  149. // Do not put badgerMove on the left in append. It seems to cause some sort of manipulation.
  150. key = append([]byte{}, badgerMove...)
  151. key = append(key, y.KeyWithTs(item.Key(), item.Version())...)
  152. // Note that we can't set item.key to move key, because that would
  153. // change the key user sees before and after this call. Also, this move
  154. // logic is internal logic and should not impact the external behavior
  155. // of the retrieval.
  156. vs, err := item.db.get(key)
  157. if err != nil {
  158. return nil, nil, err
  159. }
  160. if vs.Version != item.Version() {
  161. return nil, nil, nil
  162. }
  163. // Bug fix: Always copy the vs.Value into vptr here. Otherwise, when item is reused this
  164. // slice gets overwritten.
  165. item.vptr = y.SafeCopy(item.vptr, vs.Value)
  166. item.meta &^= bitValuePointer // Clear the value pointer bit.
  167. if vs.Meta&bitValuePointer > 0 {
  168. item.meta |= bitValuePointer // This meta would only be about value pointer.
  169. }
  170. }
  171. }
  172. func runCallback(cb func()) {
  173. if cb != nil {
  174. cb()
  175. }
  176. }
  177. func (item *Item) prefetchValue() {
  178. val, cb, err := item.yieldItemValue()
  179. defer runCallback(cb)
  180. item.err = err
  181. item.status = prefetched
  182. if val == nil {
  183. return
  184. }
  185. if item.db.opt.ValueLogLoadingMode == options.MemoryMap {
  186. buf := item.slice.Resize(len(val))
  187. copy(buf, val)
  188. item.val = buf
  189. } else {
  190. item.val = val
  191. }
  192. }
  193. // EstimatedSize returns approximate size of the key-value pair.
  194. //
  195. // This can be called while iterating through a store to quickly estimate the
  196. // size of a range of key-value pairs (without fetching the corresponding
  197. // values).
  198. func (item *Item) EstimatedSize() int64 {
  199. if !item.hasValue() {
  200. return 0
  201. }
  202. if (item.meta & bitValuePointer) == 0 {
  203. return int64(len(item.key) + len(item.vptr))
  204. }
  205. var vp valuePointer
  206. vp.Decode(item.vptr)
  207. return int64(vp.Len) // includes key length.
  208. }
  209. // UserMeta returns the userMeta set by the user. Typically, this byte, optionally set by the user
  210. // is used to interpret the value.
  211. func (item *Item) UserMeta() byte {
  212. return item.userMeta
  213. }
  214. // ExpiresAt returns a Unix time value indicating when the item will be
  215. // considered expired. 0 indicates that the item will never expire.
  216. func (item *Item) ExpiresAt() uint64 {
  217. return item.expiresAt
  218. }
// list is a minimal singly linked FIFO queue of Items, chained through Item.next.
// push appends at the tail and pop removes from the head.
//
// TODO: Switch this to use linked list container in Go.
type list struct {
	head *Item // First item, returned by the next pop; nil when the list is empty.
	tail *Item // Last item, the one push appends after; nil when the list is empty.
}
  224. func (l *list) push(i *Item) {
  225. i.next = nil
  226. if l.tail == nil {
  227. l.head = i
  228. l.tail = i
  229. return
  230. }
  231. l.tail.next = i
  232. l.tail = i
  233. }
  234. func (l *list) pop() *Item {
  235. if l.head == nil {
  236. return nil
  237. }
  238. i := l.head
  239. if l.head == l.tail {
  240. l.tail = nil
  241. l.head = nil
  242. } else {
  243. l.head = i.next
  244. }
  245. i.next = nil
  246. return i
  247. }
// IteratorOptions is used to set options when iterating over Badger key-value
// stores.
//
// This package provides DefaultIteratorOptions which contains options that
// should work for most applications. Consider using that as a starting point
// before customizing it for your own needs.
type IteratorOptions struct {
	// Indicates whether we should prefetch values during iteration and store them.
	PrefetchValues bool
	// How many KV pairs to prefetch while iterating. Valid only if PrefetchValues is true.
	// Values of 1 or less are ignored by the iterator, which then prefetches 2 items.
	PrefetchSize   int
	Reverse        bool // Direction of iteration. False is forward, true is backward.
	AllVersions    bool // Fetch all valid versions of the same key.
	internalAccess bool // Used to allow internal access to badger keys.
}
// DefaultIteratorOptions contains default options when iterating over Badger key-value stores.
// With these defaults values are prefetched with a prefetch size of 100, iteration runs in
// the forward direction, and AllVersions is disabled.
var DefaultIteratorOptions = IteratorOptions{
	PrefetchValues: true,
	PrefetchSize:   100,
	Reverse:        false,
	AllVersions:    false,
}
// Iterator helps iterating over the KV pairs in a lexicographically sorted order.
type Iterator struct {
	iitr   *y.MergeIterator // Underlying merged iterator built by NewIterator.
	txn    *Txn             // Transaction that created this iterator.
	readTs uint64           // Copied from txn.readTs; versions above it are skipped.
	opt    IteratorOptions  // Options this iterator was created with.

	item  *Item // Current item; nil when the iterator is not valid.
	data  list  // Items already parsed ahead of the current one, in iteration order.
	waste list  // Items no longer in use, kept for reuse by newItem.

	lastKey []byte // Used to skip over multiple versions of the same key.
}
  281. // NewIterator returns a new iterator. Depending upon the options, either only keys, or both
  282. // key-value pairs would be fetched. The keys are returned in lexicographically sorted order.
  283. // Using prefetch is highly recommended if you're doing a long running iteration.
  284. // Avoid long running iterations in update transactions.
  285. func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
  286. if atomic.AddInt32(&txn.numIterators, 1) > 1 {
  287. panic("Only one iterator can be active at one time.")
  288. }
  289. tables, decr := txn.db.getMemTables()
  290. defer decr()
  291. txn.db.vlog.incrIteratorCount()
  292. var iters []y.Iterator
  293. if itr := txn.newPendingWritesIterator(opt.Reverse); itr != nil {
  294. iters = append(iters, itr)
  295. }
  296. for i := 0; i < len(tables); i++ {
  297. iters = append(iters, tables[i].NewUniIterator(opt.Reverse))
  298. }
  299. iters = txn.db.lc.appendIterators(iters, opt.Reverse) // This will increment references.
  300. res := &Iterator{
  301. txn: txn,
  302. iitr: y.NewMergeIterator(iters, opt.Reverse),
  303. opt: opt,
  304. readTs: txn.readTs,
  305. }
  306. return res
  307. }
  308. func (it *Iterator) newItem() *Item {
  309. item := it.waste.pop()
  310. if item == nil {
  311. item = &Item{slice: new(y.Slice), db: it.txn.db, txn: it.txn}
  312. }
  313. return item
  314. }
  315. // Item returns pointer to the current key-value pair.
  316. // This item is only valid until it.Next() gets called.
  317. func (it *Iterator) Item() *Item {
  318. tx := it.txn
  319. if tx.update {
  320. // Track reads if this is an update txn.
  321. tx.reads = append(tx.reads, farm.Fingerprint64(it.item.Key()))
  322. }
  323. return it.item
  324. }
  325. // Valid returns false when iteration is done.
  326. func (it *Iterator) Valid() bool { return it.item != nil }
  327. // ValidForPrefix returns false when iteration is done
  328. // or when the current key is not prefixed by the specified prefix.
  329. func (it *Iterator) ValidForPrefix(prefix []byte) bool {
  330. return it.item != nil && bytes.HasPrefix(it.item.key, prefix)
  331. }
  332. // Close would close the iterator. It is important to call this when you're done with iteration.
  333. func (it *Iterator) Close() {
  334. it.iitr.Close()
  335. // It is important to wait for the fill goroutines to finish. Otherwise, we might leave zombie
  336. // goroutines behind, which are waiting to acquire file read locks after DB has been closed.
  337. waitFor := func(l list) {
  338. item := l.pop()
  339. for item != nil {
  340. item.wg.Wait()
  341. item = l.pop()
  342. }
  343. }
  344. waitFor(it.waste)
  345. waitFor(it.data)
  346. // TODO: We could handle this error.
  347. _ = it.txn.db.vlog.decrIteratorCount()
  348. atomic.AddInt32(&it.txn.numIterators, -1)
  349. }
  350. // Next would advance the iterator by one. Always check it.Valid() after a Next()
  351. // to ensure you have access to a valid it.Item().
  352. func (it *Iterator) Next() {
  353. // Reuse current item
  354. it.item.wg.Wait() // Just cleaner to wait before pushing to avoid doing ref counting.
  355. it.waste.push(it.item)
  356. // Set next item to current
  357. it.item = it.data.pop()
  358. for it.iitr.Valid() {
  359. if it.parseItem() {
  360. // parseItem calls one extra next.
  361. // This is used to deal with the complexity of reverse iteration.
  362. break
  363. }
  364. }
  365. }
  366. func isDeletedOrExpired(meta byte, expiresAt uint64) bool {
  367. if meta&bitDelete > 0 {
  368. return true
  369. }
  370. if expiresAt == 0 {
  371. return false
  372. }
  373. return expiresAt <= uint64(time.Now().Unix())
  374. }
  375. // parseItem is a complex function because it needs to handle both forward and reverse iteration
  376. // implementation. We store keys such that their versions are sorted in descending order. This makes
  377. // forward iteration efficient, but revese iteration complicated. This tradeoff is better because
  378. // forward iteration is more common than reverse.
  379. //
  380. // This function advances the iterator.
  381. func (it *Iterator) parseItem() bool {
  382. mi := it.iitr
  383. key := mi.Key()
  384. setItem := func(item *Item) {
  385. if it.item == nil {
  386. it.item = item
  387. } else {
  388. it.data.push(item)
  389. }
  390. }
  391. // Skip badger keys.
  392. if !it.opt.internalAccess && bytes.HasPrefix(key, badgerPrefix) {
  393. mi.Next()
  394. return false
  395. }
  396. // Skip any versions which are beyond the readTs.
  397. version := y.ParseTs(key)
  398. if version > it.readTs {
  399. mi.Next()
  400. return false
  401. }
  402. if it.opt.AllVersions {
  403. // Return deleted or expired values also, otherwise user can't figure out
  404. // whether the key was deleted.
  405. item := it.newItem()
  406. it.fill(item)
  407. setItem(item)
  408. mi.Next()
  409. return true
  410. }
  411. // If iterating in forward direction, then just checking the last key against current key would
  412. // be sufficient.
  413. if !it.opt.Reverse {
  414. if y.SameKey(it.lastKey, key) {
  415. mi.Next()
  416. return false
  417. }
  418. // Only track in forward direction.
  419. // We should update lastKey as soon as we find a different key in our snapshot.
  420. // Consider keys: a 5, b 7 (del), b 5. When iterating, lastKey = a.
  421. // Then we see b 7, which is deleted. If we don't store lastKey = b, we'll then return b 5,
  422. // which is wrong. Therefore, update lastKey here.
  423. it.lastKey = y.SafeCopy(it.lastKey, mi.Key())
  424. }
  425. FILL:
  426. // If deleted, advance and return.
  427. vs := mi.Value()
  428. if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
  429. mi.Next()
  430. return false
  431. }
  432. item := it.newItem()
  433. it.fill(item)
  434. // fill item based on current cursor position. All Next calls have returned, so reaching here
  435. // means no Next was called.
  436. mi.Next() // Advance but no fill item yet.
  437. if !it.opt.Reverse || !mi.Valid() { // Forward direction, or invalid.
  438. setItem(item)
  439. return true
  440. }
  441. // Reverse direction.
  442. nextTs := y.ParseTs(mi.Key())
  443. mik := y.ParseKey(mi.Key())
  444. if nextTs <= it.readTs && bytes.Equal(mik, item.key) {
  445. // This is a valid potential candidate.
  446. goto FILL
  447. }
  448. // Ignore the next candidate. Return the current one.
  449. setItem(item)
  450. return true
  451. }
  452. func (it *Iterator) fill(item *Item) {
  453. vs := it.iitr.Value()
  454. item.meta = vs.Meta
  455. item.userMeta = vs.UserMeta
  456. item.expiresAt = vs.ExpiresAt
  457. item.version = y.ParseTs(it.iitr.Key())
  458. item.key = y.SafeCopy(item.key, y.ParseKey(it.iitr.Key()))
  459. item.vptr = y.SafeCopy(item.vptr, vs.Value)
  460. item.val = nil
  461. if it.opt.PrefetchValues {
  462. item.wg.Add(1)
  463. go func() {
  464. // FIXME we are not handling errors here.
  465. item.prefetchValue()
  466. item.wg.Done()
  467. }()
  468. }
  469. }
  470. func (it *Iterator) prefetch() {
  471. prefetchSize := 2
  472. if it.opt.PrefetchValues && it.opt.PrefetchSize > 1 {
  473. prefetchSize = it.opt.PrefetchSize
  474. }
  475. i := it.iitr
  476. var count int
  477. it.item = nil
  478. for i.Valid() {
  479. if !it.parseItem() {
  480. continue
  481. }
  482. count++
  483. if count == prefetchSize {
  484. break
  485. }
  486. }
  487. }
  488. // Seek would seek to the provided key if present. If absent, it would seek to the next smallest key
  489. // greater than provided if iterating in the forward direction. Behavior would be reversed is
  490. // iterating backwards.
  491. func (it *Iterator) Seek(key []byte) {
  492. for i := it.data.pop(); i != nil; i = it.data.pop() {
  493. i.wg.Wait()
  494. it.waste.push(i)
  495. }
  496. it.lastKey = it.lastKey[:0]
  497. if len(key) == 0 {
  498. it.iitr.Rewind()
  499. it.prefetch()
  500. return
  501. }
  502. if !it.opt.Reverse {
  503. key = y.KeyWithTs(key, it.txn.readTs)
  504. } else {
  505. key = y.KeyWithTs(key, 0)
  506. }
  507. it.iitr.Seek(key)
  508. it.prefetch()
  509. }
  510. // Rewind would rewind the iterator cursor all the way to zero-th position, which would be the
  511. // smallest key if iterating forward, and largest if iterating backward. It does not keep track of
  512. // whether the cursor started with a Seek().
  513. func (it *Iterator) Rewind() {
  514. i := it.data.pop()
  515. for i != nil {
  516. i.wg.Wait() // Just cleaner to wait before pushing. No ref counting needed.
  517. it.waste.push(i)
  518. i = it.data.pop()
  519. }
  520. it.lastKey = it.lastKey[:0]
  521. it.iitr.Rewind()
  522. it.prefetch()
  523. }