iterator.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package y
  17. import (
  18. "bytes"
  19. "container/heap"
  20. "encoding/binary"
  21. "github.com/pkg/errors"
  22. )
// ValueStruct represents the value info that can be associated with a key, but also the internal
// Meta field.
//
// Encode and EncodeTo serialize the fields in declaration order: Meta, UserMeta, ExpiresAt (as a
// uvarint) and finally the raw Value bytes. Version is never written.
type ValueStruct struct {
	Meta      byte   // Internal meta byte; first byte of the encoded form.
	UserMeta  byte   // Second byte of the encoded form. Presumably caller-defined metadata -- TODO confirm.
	ExpiresAt uint64 // Written as a uvarint after the two meta bytes. Presumably an expiry time; units are not visible here.
	Value     []byte // Payload; occupies the remainder of the encoded form.
	Version   uint64 // This field is not serialized. Only for internal usage.
}
  32. func sizeVarint(x uint64) (n int) {
  33. for {
  34. n++
  35. x >>= 7
  36. if x == 0 {
  37. break
  38. }
  39. }
  40. return n
  41. }
  42. // EncodedSize is the size of the ValueStruct when encoded
  43. func (v *ValueStruct) EncodedSize() uint16 {
  44. sz := len(v.Value) + 2 // meta, usermeta.
  45. if v.ExpiresAt == 0 {
  46. return uint16(sz + 1)
  47. }
  48. enc := sizeVarint(v.ExpiresAt)
  49. return uint16(sz + enc)
  50. }
  51. // Decode uses the length of the slice to infer the length of the Value field.
  52. func (v *ValueStruct) Decode(b []byte) {
  53. v.Meta = b[0]
  54. v.UserMeta = b[1]
  55. var sz int
  56. v.ExpiresAt, sz = binary.Uvarint(b[2:])
  57. v.Value = b[2+sz:]
  58. }
  59. // Encode expects a slice of length at least v.EncodedSize().
  60. func (v *ValueStruct) Encode(b []byte) {
  61. b[0] = v.Meta
  62. b[1] = v.UserMeta
  63. sz := binary.PutUvarint(b[2:], v.ExpiresAt)
  64. copy(b[2+sz:], v.Value)
  65. }
  66. // EncodeTo should be kept in sync with the Encode function above. The reason
  67. // this function exists is to avoid creating byte arrays per key-value pair in
  68. // table/builder.go.
  69. func (v *ValueStruct) EncodeTo(buf *bytes.Buffer) {
  70. buf.WriteByte(v.Meta)
  71. buf.WriteByte(v.UserMeta)
  72. var enc [binary.MaxVarintLen64]byte
  73. sz := binary.PutUvarint(enc[:], v.ExpiresAt)
  74. buf.Write(enc[:sz])
  75. buf.Write(v.Value)
  76. }
// Iterator is an interface for a basic iterator.
type Iterator interface {
	// Next advances the iterator by one element.
	Next()
	// Rewind repositions the iterator at its starting element. For
	// MergeIterator that is the first element, or the last one when reversed.
	Rewind()
	// Seek repositions the iterator relative to key. MergeIterator documents
	// this as moving to the element with key >= the given key.
	Seek(key []byte)
	// Key returns the key at the current position.
	Key() []byte
	// Value returns the value at the current position.
	Value() ValueStruct
	// Valid reports whether the iterator is positioned at an element.
	Valid() bool
	// All iterators should be closed so that file garbage collection works.
	Close() error
}
// elem is one entry of elemHeap: a source iterator together with the data
// Less needs to order it.
type elem struct {
	itr      Iterator // Source iterator; Less orders elements by itr.Key().
	nice     int      // Tie-breaker for equal keys: the lower value sorts first. initHeap sets it to the iterator's index.
	reversed bool     // When true, Less inverts the key ordering.
}
  93. type elemHeap []*elem
  94. func (eh elemHeap) Len() int { return len(eh) }
  95. func (eh elemHeap) Swap(i, j int) { eh[i], eh[j] = eh[j], eh[i] }
  96. func (eh *elemHeap) Push(x interface{}) { *eh = append(*eh, x.(*elem)) }
  97. func (eh *elemHeap) Pop() interface{} {
  98. // Remove the last element, because Go has already swapped 0th elem <-> last.
  99. old := *eh
  100. n := len(old)
  101. x := old[n-1]
  102. *eh = old[0 : n-1]
  103. return x
  104. }
  105. func (eh elemHeap) Less(i, j int) bool {
  106. cmp := CompareKeys(eh[i].itr.Key(), eh[j].itr.Key())
  107. if cmp < 0 {
  108. return !eh[i].reversed
  109. }
  110. if cmp > 0 {
  111. return eh[i].reversed
  112. }
  113. // The keys are equal. In this case, lower nice take precedence. This is important.
  114. return eh[i].nice < eh[j].nice
  115. }
// MergeIterator merges multiple iterators.
// NOTE: MergeIterator owns the array of iterators and is responsible for closing them.
type MergeIterator struct {
	h        elemHeap   // Heap over the iterators that were valid when it was built; h[0] is the current position.
	curKey   []byte     // Private copy of the key at the current position; Next uses it to skip duplicate keys.
	reversed bool       // Direction flag, copied into every heap element by initHeap.
	all      []Iterator // Every iterator handed to NewMergeIterator, valid or not.
}
  124. // NewMergeIterator returns a new MergeIterator from a list of Iterators.
  125. func NewMergeIterator(iters []Iterator, reversed bool) *MergeIterator {
  126. m := &MergeIterator{all: iters, reversed: reversed}
  127. m.h = make(elemHeap, 0, len(iters))
  128. m.initHeap()
  129. return m
  130. }
  131. func (s *MergeIterator) storeKey(smallest Iterator) {
  132. if cap(s.curKey) < len(smallest.Key()) {
  133. s.curKey = make([]byte, 2*len(smallest.Key()))
  134. }
  135. s.curKey = s.curKey[:len(smallest.Key())]
  136. copy(s.curKey, smallest.Key())
  137. }
  138. // initHeap checks all iterators and initializes our heap and array of keys.
  139. // Whenever we reverse direction, we need to run this.
  140. func (s *MergeIterator) initHeap() {
  141. s.h = s.h[:0]
  142. for idx, itr := range s.all {
  143. if !itr.Valid() {
  144. continue
  145. }
  146. e := &elem{itr: itr, nice: idx, reversed: s.reversed}
  147. s.h = append(s.h, e)
  148. }
  149. heap.Init(&s.h)
  150. for len(s.h) > 0 {
  151. it := s.h[0].itr
  152. if it == nil || !it.Valid() {
  153. heap.Pop(&s.h)
  154. continue
  155. }
  156. s.storeKey(s.h[0].itr)
  157. break
  158. }
  159. }
  160. // Valid returns whether the MergeIterator is at a valid element.
  161. func (s *MergeIterator) Valid() bool {
  162. if s == nil {
  163. return false
  164. }
  165. if len(s.h) == 0 {
  166. return false
  167. }
  168. return s.h[0].itr.Valid()
  169. }
  170. // Key returns the key associated with the current iterator
  171. func (s *MergeIterator) Key() []byte {
  172. if len(s.h) == 0 {
  173. return nil
  174. }
  175. return s.h[0].itr.Key()
  176. }
  177. // Value returns the value associated with the iterator.
  178. func (s *MergeIterator) Value() ValueStruct {
  179. if len(s.h) == 0 {
  180. return ValueStruct{}
  181. }
  182. return s.h[0].itr.Value()
  183. }
  184. // Next returns the next element. If it is the same as the current key, ignore it.
  185. func (s *MergeIterator) Next() {
  186. if len(s.h) == 0 {
  187. return
  188. }
  189. smallest := s.h[0].itr
  190. smallest.Next()
  191. for len(s.h) > 0 {
  192. smallest = s.h[0].itr
  193. if !smallest.Valid() {
  194. heap.Pop(&s.h)
  195. continue
  196. }
  197. heap.Fix(&s.h, 0)
  198. smallest = s.h[0].itr
  199. if smallest.Valid() {
  200. if !bytes.Equal(smallest.Key(), s.curKey) {
  201. break
  202. }
  203. smallest.Next()
  204. }
  205. }
  206. if !smallest.Valid() {
  207. return
  208. }
  209. s.storeKey(smallest)
  210. }
  211. // Rewind seeks to first element (or last element for reverse iterator).
  212. func (s *MergeIterator) Rewind() {
  213. for _, itr := range s.all {
  214. itr.Rewind()
  215. }
  216. s.initHeap()
  217. }
  218. // Seek brings us to element with key >= given key.
  219. func (s *MergeIterator) Seek(key []byte) {
  220. for _, itr := range s.all {
  221. itr.Seek(key)
  222. }
  223. s.initHeap()
  224. }
  225. // Close implements y.Iterator
  226. func (s *MergeIterator) Close() error {
  227. for _, itr := range s.all {
  228. if err := itr.Close(); err != nil {
  229. return errors.Wrap(err, "MergeIterator")
  230. }
  231. }
  232. return nil
  233. }