123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- /*
- * Copyright 2017 Dgraph Labs, Inc. and Contributors
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package y
- import (
- "bytes"
- "container/heap"
- "encoding/binary"
- "github.com/pkg/errors"
- )
// ValueStruct is the value half of a key/value entry. Besides the user-visible
// Value and UserMeta it carries the internal Meta byte and an expiry time.
//
// Wire format (see Encode / EncodeTo / Decode):
//
//	[Meta][UserMeta][uvarint(ExpiresAt)][Value...]
type ValueStruct struct {
	Meta      byte   // internal metadata byte
	UserMeta  byte   // metadata byte owned by the user
	ExpiresAt uint64 // expiry, stored as a uvarint; 0 still occupies one byte
	Value     []byte // payload; its length is implied by the encoded length

	Version uint64 // This field is not serialized. Only for internal usage.
}

// sizeVarint returns how many bytes binary.PutUvarint uses to encode x,
// i.e. the number of 7-bit groups needed, with a minimum of one.
func sizeVarint(x uint64) (n int) {
	for n = 1; x >= 0x80; n++ {
		x >>= 7
	}
	return n
}

// EncodedSize is the size of the ValueStruct when encoded: two header bytes,
// the uvarint form of ExpiresAt, and the Value bytes.
//
// NOTE(review): the result is converted to uint16, so it silently wraps for
// encodings longer than 65535 bytes; callers presumably keep values smaller.
func (v *ValueStruct) EncodedSize() uint16 {
	// sizeVarint(0) == 1, so a zero ExpiresAt needs no special case.
	total := 2 + sizeVarint(v.ExpiresAt) + len(v.Value)
	return uint16(total)
}

// Decode fills v from b, which must hold a complete encoding produced by
// Encode or EncodeTo. The Value length is inferred from len(b), and v.Value
// aliases b (no copy is made). Version is not touched.
//
// NOTE(review): binary.Uvarint reports n <= 0 for a truncated or overflowing
// varint; that result is not checked here, so malformed input is not detected.
func (v *ValueStruct) Decode(b []byte) {
	v.Meta = b[0]
	v.UserMeta = b[1]
	expiresAt, n := binary.Uvarint(b[2:])
	v.ExpiresAt = expiresAt
	v.Value = b[2+n:]
}

// Encode writes v into b, which must be at least v.EncodedSize() bytes long.
func (v *ValueStruct) Encode(b []byte) {
	b[0] = v.Meta
	b[1] = v.UserMeta
	tail := b[2:]
	n := binary.PutUvarint(tail, v.ExpiresAt)
	copy(tail[n:], v.Value)
}

// EncodeTo appends the same bytes Encode would produce to buf. It must stay in
// sync with Encode; it exists so table/builder.go can serialize entries
// without allocating a byte slice per key-value pair.
func (v *ValueStruct) EncodeTo(buf *bytes.Buffer) {
	var scratch [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(scratch[:], v.ExpiresAt)
	buf.WriteByte(v.Meta)
	buf.WriteByte(v.UserMeta)
	buf.Write(scratch[:n])
	buf.Write(v.Value)
}
- // Iterator is an interface for a basic iterator.
- type Iterator interface {
- Next()
- Rewind()
- Seek(key []byte)
- Key() []byte
- Value() ValueStruct
- Valid() bool
- // All iterators should be closed so that file garbage collection works.
- Close() error
- }
// elem is one entry of the merge heap: a source iterator plus the data that
// elemHeap.Less needs to order it.
type elem struct {
	itr      Iterator
	nice     int  // index of itr in MergeIterator.all; on equal keys the lower nice wins
	reversed bool // copied from MergeIterator.reversed; flips the key order in Less
}
- type elemHeap []*elem
- func (eh elemHeap) Len() int { return len(eh) }
- func (eh elemHeap) Swap(i, j int) { eh[i], eh[j] = eh[j], eh[i] }
- func (eh *elemHeap) Push(x interface{}) { *eh = append(*eh, x.(*elem)) }
- func (eh *elemHeap) Pop() interface{} {
- // Remove the last element, because Go has already swapped 0th elem <-> last.
- old := *eh
- n := len(old)
- x := old[n-1]
- *eh = old[0 : n-1]
- return x
- }
- func (eh elemHeap) Less(i, j int) bool {
- cmp := CompareKeys(eh[i].itr.Key(), eh[j].itr.Key())
- if cmp < 0 {
- return !eh[i].reversed
- }
- if cmp > 0 {
- return eh[i].reversed
- }
- // The keys are equal. In this case, lower nice take precedence. This is important.
- return eh[i].nice < eh[j].nice
- }
// MergeIterator merges multiple iterators into a single ordered stream,
// surfacing each distinct key once.
// NOTE: MergeIterator owns the array of iterators and is responsible for closing them.
type MergeIterator struct {
	h        elemHeap   // still-valid source iterators, ordered by elemHeap.Less
	curKey   []byte     // private copy of the key last surfaced; Next skips entries equal to it
	reversed bool       // when true, keys are produced from largest to smallest
	all      []Iterator // every source iterator; its index becomes elem.nice (lower wins ties)
}
- // NewMergeIterator returns a new MergeIterator from a list of Iterators.
- func NewMergeIterator(iters []Iterator, reversed bool) *MergeIterator {
- m := &MergeIterator{all: iters, reversed: reversed}
- m.h = make(elemHeap, 0, len(iters))
- m.initHeap()
- return m
- }
- func (s *MergeIterator) storeKey(smallest Iterator) {
- if cap(s.curKey) < len(smallest.Key()) {
- s.curKey = make([]byte, 2*len(smallest.Key()))
- }
- s.curKey = s.curKey[:len(smallest.Key())]
- copy(s.curKey, smallest.Key())
- }
- // initHeap checks all iterators and initializes our heap and array of keys.
- // Whenever we reverse direction, we need to run this.
- func (s *MergeIterator) initHeap() {
- s.h = s.h[:0]
- for idx, itr := range s.all {
- if !itr.Valid() {
- continue
- }
- e := &elem{itr: itr, nice: idx, reversed: s.reversed}
- s.h = append(s.h, e)
- }
- heap.Init(&s.h)
- for len(s.h) > 0 {
- it := s.h[0].itr
- if it == nil || !it.Valid() {
- heap.Pop(&s.h)
- continue
- }
- s.storeKey(s.h[0].itr)
- break
- }
- }
- // Valid returns whether the MergeIterator is at a valid element.
- func (s *MergeIterator) Valid() bool {
- if s == nil {
- return false
- }
- if len(s.h) == 0 {
- return false
- }
- return s.h[0].itr.Valid()
- }
- // Key returns the key associated with the current iterator
- func (s *MergeIterator) Key() []byte {
- if len(s.h) == 0 {
- return nil
- }
- return s.h[0].itr.Key()
- }
- // Value returns the value associated with the iterator.
- func (s *MergeIterator) Value() ValueStruct {
- if len(s.h) == 0 {
- return ValueStruct{}
- }
- return s.h[0].itr.Value()
- }
- // Next returns the next element. If it is the same as the current key, ignore it.
- func (s *MergeIterator) Next() {
- if len(s.h) == 0 {
- return
- }
- smallest := s.h[0].itr
- smallest.Next()
- for len(s.h) > 0 {
- smallest = s.h[0].itr
- if !smallest.Valid() {
- heap.Pop(&s.h)
- continue
- }
- heap.Fix(&s.h, 0)
- smallest = s.h[0].itr
- if smallest.Valid() {
- if !bytes.Equal(smallest.Key(), s.curKey) {
- break
- }
- smallest.Next()
- }
- }
- if !smallest.Valid() {
- return
- }
- s.storeKey(smallest)
- }
- // Rewind seeks to first element (or last element for reverse iterator).
- func (s *MergeIterator) Rewind() {
- for _, itr := range s.all {
- itr.Rewind()
- }
- s.initHeap()
- }
- // Seek brings us to element with key >= given key.
- func (s *MergeIterator) Seek(key []byte) {
- for _, itr := range s.all {
- itr.Seek(key)
- }
- s.initHeap()
- }
- // Close implements y.Iterator
- func (s *MergeIterator) Close() error {
- for _, itr := range s.all {
- if err := itr.Close(); err != nil {
- return errors.Wrap(err, "MergeIterator")
- }
- }
- return nil
- }
|