iterator.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package table
  17. import (
  18. "bytes"
  19. "io"
  20. "math"
  21. "sort"
  22. "github.com/dgraph-io/badger/y"
  23. "github.com/pkg/errors"
  24. )
  25. type blockIterator struct {
  26. data []byte
  27. pos uint32
  28. err error
  29. baseKey []byte
  30. key []byte
  31. val []byte
  32. init bool
  33. last header // The last header we saw.
  34. }
  35. func (itr *blockIterator) Reset() {
  36. itr.pos = 0
  37. itr.err = nil
  38. itr.baseKey = []byte{}
  39. itr.key = []byte{}
  40. itr.val = []byte{}
  41. itr.init = false
  42. itr.last = header{}
  43. }
  44. func (itr *blockIterator) Init() {
  45. if !itr.init {
  46. itr.Next()
  47. }
  48. }
  49. func (itr *blockIterator) Valid() bool {
  50. return itr != nil && itr.err == nil
  51. }
// Error returns the iterator's current error: nil while positioned on an
// entry, io.EOF once iteration has moved past either end of the block, or the
// error recorded while parsing an entry.
func (itr *blockIterator) Error() error {
	return itr.err
}
// Close is a no-op.
func (itr *blockIterator) Close() {}
  56. var (
  57. origin = 0
  58. current = 1
  59. )
  60. // Seek brings us to the first block element that is >= input key.
  61. func (itr *blockIterator) Seek(key []byte, whence int) {
  62. itr.err = nil
  63. switch whence {
  64. case origin:
  65. itr.Reset()
  66. case current:
  67. }
  68. var done bool
  69. for itr.Init(); itr.Valid(); itr.Next() {
  70. k := itr.Key()
  71. if y.CompareKeys(k, key) >= 0 {
  72. // We are done as k is >= key.
  73. done = true
  74. break
  75. }
  76. }
  77. if !done {
  78. itr.err = io.EOF
  79. }
  80. }
  81. func (itr *blockIterator) SeekToFirst() {
  82. itr.err = nil
  83. itr.Init()
  84. }
  85. // SeekToLast brings us to the last element. Valid should return true.
  86. func (itr *blockIterator) SeekToLast() {
  87. itr.err = nil
  88. for itr.Init(); itr.Valid(); itr.Next() {
  89. }
  90. itr.Prev()
  91. }
  92. // parseKV would allocate a new byte slice for key and for value.
  93. func (itr *blockIterator) parseKV(h header) {
  94. if cap(itr.key) < int(h.plen+h.klen) {
  95. sz := int(h.plen) + int(h.klen) // Convert to int before adding to avoid uint16 overflow.
  96. itr.key = make([]byte, 2*sz)
  97. }
  98. itr.key = itr.key[:h.plen+h.klen]
  99. copy(itr.key, itr.baseKey[:h.plen])
  100. copy(itr.key[h.plen:], itr.data[itr.pos:itr.pos+uint32(h.klen)])
  101. itr.pos += uint32(h.klen)
  102. if itr.pos+uint32(h.vlen) > uint32(len(itr.data)) {
  103. itr.err = errors.Errorf("Value exceeded size of block: %d %d %d %d %v",
  104. itr.pos, h.klen, h.vlen, len(itr.data), h)
  105. return
  106. }
  107. itr.val = y.SafeCopy(itr.val, itr.data[itr.pos:itr.pos+uint32(h.vlen)])
  108. itr.pos += uint32(h.vlen)
  109. }
  110. func (itr *blockIterator) Next() {
  111. itr.init = true
  112. itr.err = nil
  113. if itr.pos >= uint32(len(itr.data)) {
  114. itr.err = io.EOF
  115. return
  116. }
  117. var h header
  118. itr.pos += uint32(h.Decode(itr.data[itr.pos:]))
  119. itr.last = h // Store the last header.
  120. if h.klen == 0 && h.plen == 0 {
  121. // Last entry in the table.
  122. itr.err = io.EOF
  123. return
  124. }
  125. // Populate baseKey if it isn't set yet. This would only happen for the first Next.
  126. if len(itr.baseKey) == 0 {
  127. // This should be the first Next() for this block. Hence, prefix length should be zero.
  128. y.AssertTrue(h.plen == 0)
  129. itr.baseKey = itr.data[itr.pos : itr.pos+uint32(h.klen)]
  130. }
  131. itr.parseKV(h)
  132. }
  133. func (itr *blockIterator) Prev() {
  134. if !itr.init {
  135. return
  136. }
  137. itr.err = nil
  138. if itr.last.prev == math.MaxUint32 {
  139. // This is the first element of the block!
  140. itr.err = io.EOF
  141. itr.pos = 0
  142. return
  143. }
  144. // Move back using current header's prev.
  145. itr.pos = itr.last.prev
  146. var h header
  147. y.AssertTruef(itr.pos < uint32(len(itr.data)), "%d %d", itr.pos, len(itr.data))
  148. itr.pos += uint32(h.Decode(itr.data[itr.pos:]))
  149. itr.parseKV(h)
  150. itr.last = h
  151. }
  152. func (itr *blockIterator) Key() []byte {
  153. if itr.err != nil {
  154. return nil
  155. }
  156. return itr.key
  157. }
  158. func (itr *blockIterator) Value() []byte {
  159. if itr.err != nil {
  160. return nil
  161. }
  162. return itr.val
  163. }
// Iterator is an iterator for a Table. It walks the table block by block,
// delegating iteration inside a block to a blockIterator.
type Iterator struct {
	t    *Table         // Table being iterated.
	bpos int            // Index of the current block in t.blockIndex.
	bi   *blockIterator // Iterator over the current block; nil until that block is loaded.
	err  error          // nil while valid; io.EOF at either end, or a block-loading error.
	// Internally, Iterator is bidirectional. However, we only expose the
	// unidirectional functionality for now.
	reversed bool
}
  174. // NewIterator returns a new iterator of the Table
  175. func (t *Table) NewIterator(reversed bool) *Iterator {
  176. t.IncrRef() // Important.
  177. ti := &Iterator{t: t, reversed: reversed}
  178. ti.next()
  179. return ti
  180. }
// Close closes the iterator (and it must be called). It decrements the
// table's reference count and returns the result of that call.
func (itr *Iterator) Close() error {
	return itr.t.DecrRef()
}
  185. func (itr *Iterator) reset() {
  186. itr.bpos = 0
  187. itr.err = nil
  188. }
// Valid follows the y.Iterator interface. It reports whether the iterator
// currently holds no error (io.EOF counts as an error here).
func (itr *Iterator) Valid() bool {
	return itr.err == nil
}
  193. func (itr *Iterator) seekToFirst() {
  194. numBlocks := len(itr.t.blockIndex)
  195. if numBlocks == 0 {
  196. itr.err = io.EOF
  197. return
  198. }
  199. itr.bpos = 0
  200. block, err := itr.t.block(itr.bpos)
  201. if err != nil {
  202. itr.err = err
  203. return
  204. }
  205. itr.bi = block.NewIterator()
  206. itr.bi.SeekToFirst()
  207. itr.err = itr.bi.Error()
  208. }
  209. func (itr *Iterator) seekToLast() {
  210. numBlocks := len(itr.t.blockIndex)
  211. if numBlocks == 0 {
  212. itr.err = io.EOF
  213. return
  214. }
  215. itr.bpos = numBlocks - 1
  216. block, err := itr.t.block(itr.bpos)
  217. if err != nil {
  218. itr.err = err
  219. return
  220. }
  221. itr.bi = block.NewIterator()
  222. itr.bi.SeekToLast()
  223. itr.err = itr.bi.Error()
  224. }
  225. func (itr *Iterator) seekHelper(blockIdx int, key []byte) {
  226. itr.bpos = blockIdx
  227. block, err := itr.t.block(blockIdx)
  228. if err != nil {
  229. itr.err = err
  230. return
  231. }
  232. itr.bi = block.NewIterator()
  233. itr.bi.Seek(key, origin)
  234. itr.err = itr.bi.Error()
  235. }
  236. // seekFrom brings us to a key that is >= input key.
  237. func (itr *Iterator) seekFrom(key []byte, whence int) {
  238. itr.err = nil
  239. switch whence {
  240. case origin:
  241. itr.reset()
  242. case current:
  243. }
  244. idx := sort.Search(len(itr.t.blockIndex), func(idx int) bool {
  245. ko := itr.t.blockIndex[idx]
  246. return y.CompareKeys(ko.key, key) > 0
  247. })
  248. if idx == 0 {
  249. // The smallest key in our table is already strictly > key. We can return that.
  250. // This is like a SeekToFirst.
  251. itr.seekHelper(0, key)
  252. return
  253. }
  254. // block[idx].smallest is > key.
  255. // Since idx>0, we know block[idx-1].smallest is <= key.
  256. // There are two cases.
  257. // 1) Everything in block[idx-1] is strictly < key. In this case, we should go to the first
  258. // element of block[idx].
  259. // 2) Some element in block[idx-1] is >= key. We should go to that element.
  260. itr.seekHelper(idx-1, key)
  261. if itr.err == io.EOF {
  262. // Case 1. Need to visit block[idx].
  263. if idx == len(itr.t.blockIndex) {
  264. // If idx == len(itr.t.blockIndex), then input key is greater than ANY element of table.
  265. // There's nothing we can do. Valid() should return false as we seek to end of table.
  266. return
  267. }
  268. // Since block[idx].smallest is > key. This is essentially a block[idx].SeekToFirst.
  269. itr.seekHelper(idx, key)
  270. }
  271. // Case 2: No need to do anything. We already did the seek in block[idx-1].
  272. }
// seek will reset iterator and seek to >= key. It is seekFrom with whence set
// to origin.
func (itr *Iterator) seek(key []byte) {
	itr.seekFrom(key, origin)
}
  277. // seekForPrev will reset iterator and seek to <= key.
  278. func (itr *Iterator) seekForPrev(key []byte) {
  279. // TODO: Optimize this. We shouldn't have to take a Prev step.
  280. itr.seekFrom(key, origin)
  281. if !bytes.Equal(itr.Key(), key) {
  282. itr.prev()
  283. }
  284. }
  285. func (itr *Iterator) next() {
  286. itr.err = nil
  287. if itr.bpos >= len(itr.t.blockIndex) {
  288. itr.err = io.EOF
  289. return
  290. }
  291. if itr.bi == nil {
  292. block, err := itr.t.block(itr.bpos)
  293. if err != nil {
  294. itr.err = err
  295. return
  296. }
  297. itr.bi = block.NewIterator()
  298. itr.bi.SeekToFirst()
  299. itr.err = itr.bi.Error()
  300. return
  301. }
  302. itr.bi.Next()
  303. if !itr.bi.Valid() {
  304. itr.bpos++
  305. itr.bi = nil
  306. itr.next()
  307. return
  308. }
  309. }
  310. func (itr *Iterator) prev() {
  311. itr.err = nil
  312. if itr.bpos < 0 {
  313. itr.err = io.EOF
  314. return
  315. }
  316. if itr.bi == nil {
  317. block, err := itr.t.block(itr.bpos)
  318. if err != nil {
  319. itr.err = err
  320. return
  321. }
  322. itr.bi = block.NewIterator()
  323. itr.bi.SeekToLast()
  324. itr.err = itr.bi.Error()
  325. return
  326. }
  327. itr.bi.Prev()
  328. if !itr.bi.Valid() {
  329. itr.bpos--
  330. itr.bi = nil
  331. itr.prev()
  332. return
  333. }
  334. }
// Key follows the y.Iterator interface. It returns the current block
// iterator's key, which is nil when that block iterator holds an error.
// NOTE(review): itr.bi is used without a nil check; callers presumably check
// Valid() first — confirm.
func (itr *Iterator) Key() []byte {
	return itr.bi.Key()
}
  339. // Value follows the y.Iterator interface
  340. func (itr *Iterator) Value() (ret y.ValueStruct) {
  341. ret.Decode(itr.bi.Value())
  342. return
  343. }
  344. // Next follows the y.Iterator interface
  345. func (itr *Iterator) Next() {
  346. if !itr.reversed {
  347. itr.next()
  348. } else {
  349. itr.prev()
  350. }
  351. }
  352. // Rewind follows the y.Iterator interface
  353. func (itr *Iterator) Rewind() {
  354. if !itr.reversed {
  355. itr.seekToFirst()
  356. } else {
  357. itr.seekToLast()
  358. }
  359. }
  360. // Seek follows the y.Iterator interface
  361. func (itr *Iterator) Seek(key []byte) {
  362. if !itr.reversed {
  363. itr.seek(key)
  364. } else {
  365. itr.seekForPrev(key)
  366. }
  367. }
// ConcatIterator concatenates the sequences defined by several iterators. (It only works with
// TableIterators, probably just because it's faster to not be so generic.)
type ConcatIterator struct {
	idx      int         // Which iterator is active now.
	cur      *Iterator   // iters[idx], or nil when idx is out of range.
	iters    []*Iterator // Corresponds to tables.
	tables   []*Table    // Disregarding reversed, this is in ascending order.
	reversed bool        // When true, iteration runs from the last table towards the first.
}
  377. // NewConcatIterator creates a new concatenated iterator
  378. func NewConcatIterator(tbls []*Table, reversed bool) *ConcatIterator {
  379. iters := make([]*Iterator, len(tbls))
  380. for i := 0; i < len(tbls); i++ {
  381. iters[i] = tbls[i].NewIterator(reversed)
  382. }
  383. return &ConcatIterator{
  384. reversed: reversed,
  385. iters: iters,
  386. tables: tbls,
  387. idx: -1, // Not really necessary because s.it.Valid()=false, but good to have.
  388. }
  389. }
  390. func (s *ConcatIterator) setIdx(idx int) {
  391. s.idx = idx
  392. if idx < 0 || idx >= len(s.iters) {
  393. s.cur = nil
  394. } else {
  395. s.cur = s.iters[s.idx]
  396. }
  397. }
  398. // Rewind implements y.Interface
  399. func (s *ConcatIterator) Rewind() {
  400. if len(s.iters) == 0 {
  401. return
  402. }
  403. if !s.reversed {
  404. s.setIdx(0)
  405. } else {
  406. s.setIdx(len(s.iters) - 1)
  407. }
  408. s.cur.Rewind()
  409. }
  410. // Valid implements y.Interface
  411. func (s *ConcatIterator) Valid() bool {
  412. return s.cur != nil && s.cur.Valid()
  413. }
// Key implements y.Interface. It returns the active table iterator's key.
// NOTE(review): s.cur is used without a nil check; callers presumably check
// Valid() first — confirm.
func (s *ConcatIterator) Key() []byte {
	return s.cur.Key()
}
// Value implements y.Interface. It returns the active table iterator's value.
// NOTE(review): s.cur is used without a nil check; callers presumably check
// Valid() first — confirm.
func (s *ConcatIterator) Value() y.ValueStruct {
	return s.cur.Value()
}
  422. // Seek brings us to element >= key if reversed is false. Otherwise, <= key.
  423. func (s *ConcatIterator) Seek(key []byte) {
  424. var idx int
  425. if !s.reversed {
  426. idx = sort.Search(len(s.tables), func(i int) bool {
  427. return y.CompareKeys(s.tables[i].Biggest(), key) >= 0
  428. })
  429. } else {
  430. n := len(s.tables)
  431. idx = n - 1 - sort.Search(n, func(i int) bool {
  432. return y.CompareKeys(s.tables[n-1-i].Smallest(), key) <= 0
  433. })
  434. }
  435. if idx >= len(s.tables) || idx < 0 {
  436. s.setIdx(-1)
  437. return
  438. }
  439. // For reversed=false, we know s.tables[i-1].Biggest() < key. Thus, the
  440. // previous table cannot possibly contain key.
  441. s.setIdx(idx)
  442. s.cur.Seek(key)
  443. }
  444. // Next advances our concat iterator.
  445. func (s *ConcatIterator) Next() {
  446. s.cur.Next()
  447. if s.cur.Valid() {
  448. // Nothing to do. Just stay with the current table.
  449. return
  450. }
  451. for { // In case there are empty tables.
  452. if !s.reversed {
  453. s.setIdx(s.idx + 1)
  454. } else {
  455. s.setIdx(s.idx - 1)
  456. }
  457. if s.cur == nil {
  458. // End of list. Valid will become false.
  459. return
  460. }
  461. s.cur.Rewind()
  462. if s.cur.Valid() {
  463. break
  464. }
  465. }
  466. }
  467. // Close implements y.Interface.
  468. func (s *ConcatIterator) Close() error {
  469. for _, it := range s.iters {
  470. if err := it.Close(); err != nil {
  471. return errors.Wrap(err, "ConcatIterator")
  472. }
  473. }
  474. return nil
  475. }