builder.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package table
  17. import (
  18. "bytes"
  19. "encoding/binary"
  20. "io"
  21. "math"
  22. "github.com/AndreasBriese/bbloom"
  23. "github.com/dgraph-io/badger/y"
  24. )
  25. var (
  26. restartInterval = 100 // Might want to change this to be based on total size instead of numKeys.
  27. )
  28. func newBuffer(sz int) *bytes.Buffer {
  29. b := new(bytes.Buffer)
  30. b.Grow(sz)
  31. return b
  32. }
  33. type header struct {
  34. plen uint16 // Overlap with base key.
  35. klen uint16 // Length of the diff.
  36. vlen uint16 // Length of value.
  37. prev uint32 // Offset for the previous key-value pair. The offset is relative to block base offset.
  38. }
  39. // Encode encodes the header.
  40. func (h header) Encode(b []byte) {
  41. binary.BigEndian.PutUint16(b[0:2], h.plen)
  42. binary.BigEndian.PutUint16(b[2:4], h.klen)
  43. binary.BigEndian.PutUint16(b[4:6], h.vlen)
  44. binary.BigEndian.PutUint32(b[6:10], h.prev)
  45. }
  46. // Decode decodes the header.
  47. func (h *header) Decode(buf []byte) int {
  48. h.plen = binary.BigEndian.Uint16(buf[0:2])
  49. h.klen = binary.BigEndian.Uint16(buf[2:4])
  50. h.vlen = binary.BigEndian.Uint16(buf[4:6])
  51. h.prev = binary.BigEndian.Uint32(buf[6:10])
  52. return h.Size()
  53. }
  54. // Size returns size of the header. Currently it's just a constant.
  55. func (h header) Size() int { return 10 }
  56. // Builder is used in building a table.
  57. type Builder struct {
  58. counter int // Number of keys written for the current block.
  59. // Typically tens or hundreds of meg. This is for one single file.
  60. buf *bytes.Buffer
  61. baseKey []byte // Base key for the current block.
  62. baseOffset uint32 // Offset for the current block.
  63. restarts []uint32 // Base offsets of every block.
  64. // Tracks offset for the previous key-value pair. Offset is relative to block base offset.
  65. prevOffset uint32
  66. keyBuf *bytes.Buffer
  67. keyCount int
  68. }
  69. // NewTableBuilder makes a new TableBuilder.
  70. func NewTableBuilder() *Builder {
  71. return &Builder{
  72. keyBuf: newBuffer(1 << 20),
  73. buf: newBuffer(1 << 20),
  74. prevOffset: math.MaxUint32, // Used for the first element!
  75. }
  76. }
  77. // Close closes the TableBuilder.
  78. func (b *Builder) Close() {}
  79. // Empty returns whether it's empty.
  80. func (b *Builder) Empty() bool { return b.buf.Len() == 0 }
  81. // keyDiff returns a suffix of newKey that is different from b.baseKey.
  82. func (b Builder) keyDiff(newKey []byte) []byte {
  83. var i int
  84. for i = 0; i < len(newKey) && i < len(b.baseKey); i++ {
  85. if newKey[i] != b.baseKey[i] {
  86. break
  87. }
  88. }
  89. return newKey[i:]
  90. }
  91. func (b *Builder) addHelper(key []byte, v y.ValueStruct) {
  92. // Add key to bloom filter.
  93. if len(key) > 0 {
  94. var klen [2]byte
  95. keyNoTs := y.ParseKey(key)
  96. binary.BigEndian.PutUint16(klen[:], uint16(len(keyNoTs)))
  97. b.keyBuf.Write(klen[:])
  98. b.keyBuf.Write(keyNoTs)
  99. b.keyCount++
  100. }
  101. // diffKey stores the difference of key with baseKey.
  102. var diffKey []byte
  103. if len(b.baseKey) == 0 {
  104. // Make a copy. Builder should not keep references. Otherwise, caller has to be very careful
  105. // and will have to make copies of keys every time they add to builder, which is even worse.
  106. b.baseKey = append(b.baseKey[:0], key...)
  107. diffKey = key
  108. } else {
  109. diffKey = b.keyDiff(key)
  110. }
  111. h := header{
  112. plen: uint16(len(key) - len(diffKey)),
  113. klen: uint16(len(diffKey)),
  114. vlen: uint16(v.EncodedSize()),
  115. prev: b.prevOffset, // prevOffset is the location of the last key-value added.
  116. }
  117. b.prevOffset = uint32(b.buf.Len()) - b.baseOffset // Remember current offset for the next Add call.
  118. // Layout: header, diffKey, value.
  119. var hbuf [10]byte
  120. h.Encode(hbuf[:])
  121. b.buf.Write(hbuf[:])
  122. b.buf.Write(diffKey) // We only need to store the key difference.
  123. v.EncodeTo(b.buf)
  124. b.counter++ // Increment number of keys added for this current block.
  125. }
  126. func (b *Builder) finishBlock() {
  127. // When we are at the end of the block and Valid=false, and the user wants to do a Prev,
  128. // we need a dummy header to tell us the offset of the previous key-value pair.
  129. b.addHelper([]byte{}, y.ValueStruct{})
  130. }
  131. // Add adds a key-value pair to the block.
  132. // If doNotRestart is true, we will not restart even if b.counter >= restartInterval.
  133. func (b *Builder) Add(key []byte, value y.ValueStruct) error {
  134. if b.counter >= restartInterval {
  135. b.finishBlock()
  136. // Start a new block. Initialize the block.
  137. b.restarts = append(b.restarts, uint32(b.buf.Len()))
  138. b.counter = 0
  139. b.baseKey = []byte{}
  140. b.baseOffset = uint32(b.buf.Len())
  141. b.prevOffset = math.MaxUint32 // First key-value pair of block has header.prev=MaxInt.
  142. }
  143. b.addHelper(key, value)
  144. return nil // Currently, there is no meaningful error.
  145. }
  146. // TODO: vvv this was the comment on ReachedCapacity.
  147. // FinalSize returns the *rough* final size of the array, counting the header which is not yet written.
  148. // TODO: Look into why there is a discrepancy. I suspect it is because of Write(empty, empty)
  149. // at the end. The diff can vary.
  150. // ReachedCapacity returns true if we... roughly (?) reached capacity?
  151. func (b *Builder) ReachedCapacity(cap int64) bool {
  152. estimateSz := b.buf.Len() + 8 /* empty header */ + 4*len(b.restarts) + 8 // 8 = end of buf offset + len(restarts).
  153. return int64(estimateSz) > cap
  154. }
  155. // blockIndex generates the block index for the table.
  156. // It is mainly a list of all the block base offsets.
  157. func (b *Builder) blockIndex() []byte {
  158. // Store the end offset, so we know the length of the final block.
  159. b.restarts = append(b.restarts, uint32(b.buf.Len()))
  160. // Add 4 because we want to write out number of restarts at the end.
  161. sz := 4*len(b.restarts) + 4
  162. out := make([]byte, sz)
  163. buf := out
  164. for _, r := range b.restarts {
  165. binary.BigEndian.PutUint32(buf[:4], r)
  166. buf = buf[4:]
  167. }
  168. binary.BigEndian.PutUint32(buf[:4], uint32(len(b.restarts)))
  169. return out
  170. }
  171. // Finish finishes the table by appending the index.
  172. func (b *Builder) Finish() []byte {
  173. bf := bbloom.New(float64(b.keyCount), 0.01)
  174. var klen [2]byte
  175. key := make([]byte, 1024)
  176. for {
  177. if _, err := b.keyBuf.Read(klen[:]); err == io.EOF {
  178. break
  179. } else if err != nil {
  180. y.Check(err)
  181. }
  182. kl := int(binary.BigEndian.Uint16(klen[:]))
  183. if cap(key) < kl {
  184. key = make([]byte, 2*int(kl)) // 2 * uint16 will overflow
  185. }
  186. key = key[:kl]
  187. y.Check2(b.keyBuf.Read(key))
  188. bf.Add(key)
  189. }
  190. b.finishBlock() // This will never start a new block.
  191. index := b.blockIndex()
  192. b.buf.Write(index)
  193. // Write bloom filter.
  194. bdata := bf.JSONMarshal()
  195. n, err := b.buf.Write(bdata)
  196. y.Check(err)
  197. var buf [4]byte
  198. binary.BigEndian.PutUint32(buf[:], uint32(n))
  199. b.buf.Write(buf[:])
  200. return b.buf.Bytes()
  201. }