123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593 |
- /*
- * Copyright 2017 Dgraph Labs, Inc. and Contributors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package badger
- import (
- "bytes"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- "github.com/dgraph-io/badger/options"
- "github.com/dgraph-io/badger/y"
- farm "github.com/dgryski/go-farm"
- )
// prefetchStatus records whether an Item's value has already been loaded by a
// background prefetch. The zero value means "not prefetched".
type prefetchStatus uint8

// prefetched marks an Item whose val and err fields were populated by prefetchValue.
const prefetched prefetchStatus = 1
// Item is returned during iteration. Both the Key() and Value() output is only valid until
// iterator.Next() is called.
//
// Items are recycled by the Iterator (see newItem / Next), so every field may be overwritten
// once the item has been handed back.
type Item struct {
	status    prefetchStatus // Set to prefetched once prefetchValue has filled val/err.
	err       error          // Error recorded by prefetchValue; returned by Value/ValueCopy.
	wg        sync.WaitGroup // Tracks an in-flight prefetch goroutine started by Iterator.fill.
	db        *DB            // Used to read from the value log and to consult DB options.
	key       []byte         // User key as filled by Iterator.fill (timestamp parsed off).
	vptr      []byte         // Inline value, or an encoded valuePointer when meta has bitValuePointer.
	meta      byte           // We need to store meta to know about bitValuePointer.
	userMeta  byte           // Opaque byte set by the user; returned by UserMeta.
	expiresAt uint64         // Unix time after which the entry counts as expired; 0 = never.
	val       []byte         // Prefetched value; only meaningful when status == prefetched.
	slice     *y.Slice       // Scratch buffer reused by yieldItemValue and prefetchValue.
	next      *Item          // Link used by the iterator's internal list.
	version   uint64         // Commit timestamp of this entry.
	txn       *Txn           // Owning transaction; Value appends value-log callbacks to it.
}
- // String returns a string representation of Item
- func (item *Item) String() string {
- return fmt.Sprintf("key=%q, version=%d, meta=%x", item.Key(), item.Version(), item.meta)
- }
- // Deprecated
- // ToString returns a string representation of Item
- func (item *Item) ToString() string {
- return item.String()
- }
- // Key returns the key.
- //
- // Key is only valid as long as item is valid, or transaction is valid. If you need to use it
- // outside its validity, please use KeyCopy
- func (item *Item) Key() []byte {
- return item.key
- }
- // KeyCopy returns a copy of the key of the item, writing it to dst slice.
- // If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
- // returned.
- func (item *Item) KeyCopy(dst []byte) []byte {
- return y.SafeCopy(dst, item.key)
- }
- // Version returns the commit timestamp of the item.
- func (item *Item) Version() uint64 {
- return item.version
- }
- // Value retrieves the value of the item from the value log.
- //
- // This method must be called within a transaction. Calling it outside a
- // transaction is considered undefined behavior. If an iterator is being used,
- // then Item.Value() is defined in the current iteration only, because items are
- // reused.
- //
- // If you need to use a value outside a transaction, please use Item.ValueCopy
- // instead, or copy it yourself. Value might change once discard or commit is called.
- // Use ValueCopy if you want to do a Set after Get.
- func (item *Item) Value() ([]byte, error) {
- item.wg.Wait()
- if item.status == prefetched {
- return item.val, item.err
- }
- buf, cb, err := item.yieldItemValue()
- if cb != nil {
- item.txn.callbacks = append(item.txn.callbacks, cb)
- }
- return buf, err
- }
- // ValueCopy returns a copy of the value of the item from the value log, writing it to dst slice.
- // If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
- // returned. Tip: It might make sense to reuse the returned slice as dst argument for the next call.
- //
- // This function is useful in long running iterate/update transactions to avoid a write deadlock.
- // See Github issue: https://github.com/dgraph-io/badger/issues/315
- func (item *Item) ValueCopy(dst []byte) ([]byte, error) {
- item.wg.Wait()
- if item.status == prefetched {
- return y.SafeCopy(dst, item.val), item.err
- }
- buf, cb, err := item.yieldItemValue()
- defer runCallback(cb)
- return y.SafeCopy(dst, buf), err
- }
- func (item *Item) hasValue() bool {
- if item.meta == 0 && item.vptr == nil {
- // key not found
- return false
- }
- return true
- }
- // IsDeletedOrExpired returns true if item contains deleted or expired value.
- func (item *Item) IsDeletedOrExpired() bool {
- return isDeletedOrExpired(item.meta, item.expiresAt)
- }
- func (item *Item) DiscardEarlierVersions() bool {
- return item.meta&bitDiscardEarlierVersions > 0
- }
// yieldItemValue resolves the item's value.
//
// It returns (nil, nil, nil) when the item carries no value (see hasValue), or when the
// relocated "move" entry no longer holds the item's version. Inline values (meta without
// bitValuePointer) are copied into item.slice and returned with no callback. Otherwise vptr is
// decoded as a valuePointer and read from the value log. A non-nil callback returned from the
// value log is run by callers only after they are done with the bytes: Value hands it to the
// transaction, ValueCopy runs it after copying.
//
// If the value log answers ErrRetry, the pointer refers to a deleted value log. The lookup is
// then retried at most once against the badgerMove-prefixed key. Before looping again, the
// item's vptr/meta are replaced with what that key holds.
func (item *Item) yieldItemValue() ([]byte, func(), error) {
	key := item.Key() // No need to copy.
	for {
		if !item.hasValue() {
			return nil, nil, nil
		}

		// Lazily allocate the scratch buffer; recycled items already have one.
		if item.slice == nil {
			item.slice = new(y.Slice)
		}

		// Value stored inline: hand back a copy in the item's scratch buffer.
		if (item.meta & bitValuePointer) == 0 {
			val := item.slice.Resize(len(item.vptr))
			copy(val, item.vptr)
			return val, nil, nil
		}

		var vp valuePointer
		vp.Decode(item.vptr)
		result, cb, err := item.db.vlog.Read(vp, item.slice)
		if err != ErrRetry || bytes.HasPrefix(key, badgerMove) {
			// The error is not retry, or we have already searched the move keyspace.
			return result, cb, err
		}

		// The value pointer is pointing to a deleted value log. Look for the
		// move key and read that instead.
		runCallback(cb)
		// Do not put badgerMove on the left in append. It seems to cause some sort of manipulation.
		key = append([]byte{}, badgerMove...)
		key = append(key, y.KeyWithTs(item.Key(), item.Version())...)
		// Note that we can't set item.key to move key, because that would
		// change the key user sees before and after this call. Also, this move
		// logic is internal logic and should not impact the external behavior
		// of the retrieval.
		vs, err := item.db.get(key)
		if err != nil {
			return nil, nil, err
		}
		// The move entry belongs to a different version: treat as no value.
		if vs.Version != item.Version() {
			return nil, nil, nil
		}
		// Bug fix: Always copy the vs.Value into vptr here. Otherwise, when item is reused this
		// slice gets overwritten.
		item.vptr = y.SafeCopy(item.vptr, vs.Value)
		item.meta &^= bitValuePointer // Clear the value pointer bit.
		if vs.Meta&bitValuePointer > 0 {
			item.meta |= bitValuePointer // This meta would only be about value pointer.
		}
		// Loop again; because key now carries the badgerMove prefix, a second
		// ErrRetry is returned to the caller instead of retrying forever.
	}
}
// runCallback invokes cb, treating a nil callback as a no-op.
func runCallback(cb func()) {
	if cb == nil {
		return
	}
	cb()
}
- func (item *Item) prefetchValue() {
- val, cb, err := item.yieldItemValue()
- defer runCallback(cb)
- item.err = err
- item.status = prefetched
- if val == nil {
- return
- }
- if item.db.opt.ValueLogLoadingMode == options.MemoryMap {
- buf := item.slice.Resize(len(val))
- copy(buf, val)
- item.val = buf
- } else {
- item.val = val
- }
- }
- // EstimatedSize returns approximate size of the key-value pair.
- //
- // This can be called while iterating through a store to quickly estimate the
- // size of a range of key-value pairs (without fetching the corresponding
- // values).
- func (item *Item) EstimatedSize() int64 {
- if !item.hasValue() {
- return 0
- }
- if (item.meta & bitValuePointer) == 0 {
- return int64(len(item.key) + len(item.vptr))
- }
- var vp valuePointer
- vp.Decode(item.vptr)
- return int64(vp.Len) // includes key length.
- }
- // UserMeta returns the userMeta set by the user. Typically, this byte, optionally set by the user
- // is used to interpret the value.
- func (item *Item) UserMeta() byte {
- return item.userMeta
- }
- // ExpiresAt returns a Unix time value indicating when the item will be
- // considered expired. 0 indicates that the item will never expire.
- func (item *Item) ExpiresAt() uint64 {
- return item.expiresAt
- }
// list is a minimal singly-linked FIFO queue of *Item, threaded through
// Item.next: push appends at the tail and pop removes from the head.
// The zero value is an empty list.
//
// TODO: Switch this to use linked list container in Go.
type list struct {
	head *Item // Next item to be popped; nil when empty.
	tail *Item // Most recently pushed item; nil when empty.
}
- func (l *list) push(i *Item) {
- i.next = nil
- if l.tail == nil {
- l.head = i
- l.tail = i
- return
- }
- l.tail.next = i
- l.tail = i
- }
- func (l *list) pop() *Item {
- if l.head == nil {
- return nil
- }
- i := l.head
- if l.head == l.tail {
- l.tail = nil
- l.head = nil
- } else {
- l.head = i.next
- }
- i.next = nil
- return i
- }
// IteratorOptions is used to set options when iterating over Badger key-value
// stores.
//
// This package provides DefaultIteratorOptions which contains options that
// should work for most applications. Consider using that as a starting point
// before customizing it for your own needs.
type IteratorOptions struct {
	// Indicates whether we should prefetch values during iteration and store them.
	PrefetchValues bool
	// How many KV pairs to prefetch while iterating. Valid only if PrefetchValues is true.
	// When PrefetchValues is false, or PrefetchSize <= 1, the iterator buffers 2 items instead.
	PrefetchSize   int
	Reverse        bool // Direction of iteration. False is forward, true is backward.
	AllVersions    bool // Fetch all valid versions of the same key.
	internalAccess bool // Used to allow internal access to badger keys (badgerPrefix).
}
- // DefaultIteratorOptions contains default options when iterating over Badger key-value stores.
- var DefaultIteratorOptions = IteratorOptions{
- PrefetchValues: true,
- PrefetchSize: 100,
- Reverse: false,
- AllVersions: false,
- }
// Iterator helps iterating over the KV pairs in a lexicographically sorted order.
//
// Produced items flow item <- data <- parseItem. Items that are finished with are
// parked on waste so newItem can reuse them.
type Iterator struct {
	iitr    *y.MergeIterator // Merged view over pending writes, memtables and levels (built in NewIterator).
	txn     *Txn             // Owning transaction.
	readTs  uint64           // Entries with a version above this are skipped by parseItem.
	opt     IteratorOptions  // Options supplied to NewIterator.
	item    *Item            // Current item; nil when the iterator is not Valid.
	data    list             // Items already produced by parseItem, queued behind item.
	waste   list             // Recycled items, handed out again by newItem.
	lastKey []byte           // Used to skip over multiple versions of the same key.
}
- // NewIterator returns a new iterator. Depending upon the options, either only keys, or both
- // key-value pairs would be fetched. The keys are returned in lexicographically sorted order.
- // Using prefetch is highly recommended if you're doing a long running iteration.
- // Avoid long running iterations in update transactions.
- func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
- if atomic.AddInt32(&txn.numIterators, 1) > 1 {
- panic("Only one iterator can be active at one time.")
- }
- tables, decr := txn.db.getMemTables()
- defer decr()
- txn.db.vlog.incrIteratorCount()
- var iters []y.Iterator
- if itr := txn.newPendingWritesIterator(opt.Reverse); itr != nil {
- iters = append(iters, itr)
- }
- for i := 0; i < len(tables); i++ {
- iters = append(iters, tables[i].NewUniIterator(opt.Reverse))
- }
- iters = txn.db.lc.appendIterators(iters, opt.Reverse) // This will increment references.
- res := &Iterator{
- txn: txn,
- iitr: y.NewMergeIterator(iters, opt.Reverse),
- opt: opt,
- readTs: txn.readTs,
- }
- return res
- }
- func (it *Iterator) newItem() *Item {
- item := it.waste.pop()
- if item == nil {
- item = &Item{slice: new(y.Slice), db: it.txn.db, txn: it.txn}
- }
- return item
- }
- // Item returns pointer to the current key-value pair.
- // This item is only valid until it.Next() gets called.
- func (it *Iterator) Item() *Item {
- tx := it.txn
- if tx.update {
- // Track reads if this is an update txn.
- tx.reads = append(tx.reads, farm.Fingerprint64(it.item.Key()))
- }
- return it.item
- }
- // Valid returns false when iteration is done.
- func (it *Iterator) Valid() bool { return it.item != nil }
- // ValidForPrefix returns false when iteration is done
- // or when the current key is not prefixed by the specified prefix.
- func (it *Iterator) ValidForPrefix(prefix []byte) bool {
- return it.item != nil && bytes.HasPrefix(it.item.key, prefix)
- }
- // Close would close the iterator. It is important to call this when you're done with iteration.
- func (it *Iterator) Close() {
- it.iitr.Close()
- // It is important to wait for the fill goroutines to finish. Otherwise, we might leave zombie
- // goroutines behind, which are waiting to acquire file read locks after DB has been closed.
- waitFor := func(l list) {
- item := l.pop()
- for item != nil {
- item.wg.Wait()
- item = l.pop()
- }
- }
- waitFor(it.waste)
- waitFor(it.data)
- // TODO: We could handle this error.
- _ = it.txn.db.vlog.decrIteratorCount()
- atomic.AddInt32(&it.txn.numIterators, -1)
- }
- // Next would advance the iterator by one. Always check it.Valid() after a Next()
- // to ensure you have access to a valid it.Item().
- func (it *Iterator) Next() {
- // Reuse current item
- it.item.wg.Wait() // Just cleaner to wait before pushing to avoid doing ref counting.
- it.waste.push(it.item)
- // Set next item to current
- it.item = it.data.pop()
- for it.iitr.Valid() {
- if it.parseItem() {
- // parseItem calls one extra next.
- // This is used to deal with the complexity of reverse iteration.
- break
- }
- }
- }
- func isDeletedOrExpired(meta byte, expiresAt uint64) bool {
- if meta&bitDelete > 0 {
- return true
- }
- if expiresAt == 0 {
- return false
- }
- return expiresAt <= uint64(time.Now().Unix())
- }
// parseItem is a complex function because it needs to handle both forward and reverse iteration
// implementation. We store keys such that their versions are sorted in descending order. This makes
// forward iteration efficient, but reverse iteration complicated. This tradeoff is better because
// forward iteration is more common than reverse.
//
// This function advances the iterator.
//
// It returns true when it produced an item. The first produced item becomes it.item; later
// ones are queued on it.data. It returns false when the entry under the cursor was skipped
// (internal key, version above readTs, an older version already covered, or a deleted/expired
// entry). Either way, mi.Next() has been called at least once.
func (it *Iterator) parseItem() bool {
	mi := it.iitr
	key := mi.Key()

	// setItem makes the produced item current if there is none yet, else queues it.
	setItem := func(item *Item) {
		if it.item == nil {
			it.item = item
		} else {
			it.data.push(item)
		}
	}

	// Skip badger keys.
	if !it.opt.internalAccess && bytes.HasPrefix(key, badgerPrefix) {
		mi.Next()
		return false
	}

	// Skip any versions which are beyond the readTs.
	version := y.ParseTs(key)
	if version > it.readTs {
		mi.Next()
		return false
	}

	if it.opt.AllVersions {
		// Return deleted or expired values also, otherwise user can't figure out
		// whether the key was deleted.
		item := it.newItem()
		it.fill(item)
		setItem(item)
		mi.Next()
		return true
	}

	// If iterating in forward direction, then just checking the last key against current key would
	// be sufficient.
	if !it.opt.Reverse {
		if y.SameKey(it.lastKey, key) {
			mi.Next()
			return false
		}
		// Only track in forward direction.
		// We should update lastKey as soon as we find a different key in our snapshot.
		// Consider keys: a 5, b 7 (del), b 5. When iterating, lastKey = a.
		// Then we see b 7, which is deleted. If we don't store lastKey = b, we'll then return b 5,
		// which is wrong. Therefore, update lastKey here.
		it.lastKey = y.SafeCopy(it.lastKey, mi.Key())
	}

FILL:
	// If deleted, advance and return.
	vs := mi.Value()
	if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
		mi.Next()
		return false
	}

	item := it.newItem()
	it.fill(item)
	// fill item based on current cursor position. All Next calls have returned, so reaching here
	// means no Next was called.

	mi.Next()                           // Advance but no fill item yet.
	if !it.opt.Reverse || !mi.Valid() { // Forward direction, or invalid.
		setItem(item)
		return true
	}

	// Reverse direction.
	// Because versions are stored newest-first, reverse iteration reaches the oldest version of a
	// key first. Keep moving while the next entry is the same key at a version visible at
	// readTs, so that the newest visible version is the one that is kept.
	nextTs := y.ParseTs(mi.Key())
	mik := y.ParseKey(mi.Key())
	if nextTs <= it.readTs && bytes.Equal(mik, item.key) {
		// This is a valid potential candidate.
		// NOTE(review): the item filled above is dropped here without being pushed to it.waste
		// or waited on. If PrefetchValues is set, its prefetch goroutine is no longer tracked
		// by any list that Close drains — confirm whether this should recycle it.
		goto FILL
	}
	// Ignore the next candidate. Return the current one.
	setItem(item)
	return true
}
- func (it *Iterator) fill(item *Item) {
- vs := it.iitr.Value()
- item.meta = vs.Meta
- item.userMeta = vs.UserMeta
- item.expiresAt = vs.ExpiresAt
- item.version = y.ParseTs(it.iitr.Key())
- item.key = y.SafeCopy(item.key, y.ParseKey(it.iitr.Key()))
- item.vptr = y.SafeCopy(item.vptr, vs.Value)
- item.val = nil
- if it.opt.PrefetchValues {
- item.wg.Add(1)
- go func() {
- // FIXME we are not handling errors here.
- item.prefetchValue()
- item.wg.Done()
- }()
- }
- }
- func (it *Iterator) prefetch() {
- prefetchSize := 2
- if it.opt.PrefetchValues && it.opt.PrefetchSize > 1 {
- prefetchSize = it.opt.PrefetchSize
- }
- i := it.iitr
- var count int
- it.item = nil
- for i.Valid() {
- if !it.parseItem() {
- continue
- }
- count++
- if count == prefetchSize {
- break
- }
- }
- }
- // Seek would seek to the provided key if present. If absent, it would seek to the next smallest key
- // greater than provided if iterating in the forward direction. Behavior would be reversed is
- // iterating backwards.
- func (it *Iterator) Seek(key []byte) {
- for i := it.data.pop(); i != nil; i = it.data.pop() {
- i.wg.Wait()
- it.waste.push(i)
- }
- it.lastKey = it.lastKey[:0]
- if len(key) == 0 {
- it.iitr.Rewind()
- it.prefetch()
- return
- }
- if !it.opt.Reverse {
- key = y.KeyWithTs(key, it.txn.readTs)
- } else {
- key = y.KeyWithTs(key, 0)
- }
- it.iitr.Seek(key)
- it.prefetch()
- }
- // Rewind would rewind the iterator cursor all the way to zero-th position, which would be the
- // smallest key if iterating forward, and largest if iterating backward. It does not keep track of
- // whether the cursor started with a Seek().
- func (it *Iterator) Rewind() {
- i := it.data.pop()
- for i != nil {
- i.wg.Wait() // Just cleaner to wait before pushing. No ref counting needed.
- it.waste.push(i)
- i = it.data.pop()
- }
- it.lastKey = it.lastKey[:0]
- it.iitr.Rewind()
- it.prefetch()
- }
|