table.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package table
  17. import (
  18. "encoding/binary"
  19. "fmt"
  20. "os"
  21. "path"
  22. "path/filepath"
  23. "strconv"
  24. "strings"
  25. "sync"
  26. "sync/atomic"
  27. "github.com/AndreasBriese/bbloom"
  28. "github.com/dgraph-io/badger/options"
  29. "github.com/dgraph-io/badger/y"
  30. "github.com/pkg/errors"
  31. )
  // fileSuffix is the extension that ParseFileID requires and IDToFilename appends
  // when converting between table IDs and file names.
  const (
  	fileSuffix = ".sst"
  )
  // keyOffset is one entry of a table's block index. It records where a block lives
  // in the table file and the first key stored in that block.
  type keyOffset struct {
  	// key is the first key of the block; readIndex fills it in after the offsets are known.
  	key []byte
  	// offset is the byte position where the block starts; len is the block's size in bytes.
  	offset, len int
  }
  38. // Table represents a loaded table file with the info we have about it
  39. type Table struct {
  40. sync.Mutex
  41. fd *os.File // Own fd.
  42. tableSize int // Initialized in OpenTable, using fd.Stat().
  43. blockIndex []keyOffset
  44. ref int32 // For file garbage collection. Atomic.
  45. loadingMode options.FileLoadingMode
  46. mmap []byte // Memory mapped.
  47. // The following are initialized once and const.
  48. smallest, biggest []byte // Smallest and largest keys.
  49. id uint64 // file id, part of filename
  50. bf bbloom.Bloom
  51. }
  52. // IncrRef increments the refcount (having to do with whether the file should be deleted)
  53. func (t *Table) IncrRef() {
  54. atomic.AddInt32(&t.ref, 1)
  55. }
  56. // DecrRef decrements the refcount and possibly deletes the table
  57. func (t *Table) DecrRef() error {
  58. newRef := atomic.AddInt32(&t.ref, -1)
  59. if newRef == 0 {
  60. // We can safely delete this file, because for all the current files, we always have
  61. // at least one reference pointing to them.
  62. // It's necessary to delete windows files
  63. if t.loadingMode == options.MemoryMap {
  64. y.Munmap(t.mmap)
  65. }
  66. if err := t.fd.Truncate(0); err != nil {
  67. // This is very important to let the FS know that the file is deleted.
  68. return err
  69. }
  70. filename := t.fd.Name()
  71. if err := t.fd.Close(); err != nil {
  72. return err
  73. }
  74. if err := os.Remove(filename); err != nil {
  75. return err
  76. }
  77. }
  78. return nil
  79. }
  // block is the raw content of one table block plus the position it was read from.
  type block struct {
  	// offset is the byte position of the block inside the table file.
  	offset int
  	// data holds the block's bytes as returned by Table.read.
  	data []byte
  }
  84. func (b block) NewIterator() *blockIterator {
  85. return &blockIterator{data: b.data}
  86. }
  87. // OpenTable assumes file has only one table and opens it. Takes ownership of fd upon function
  88. // entry. Returns a table with one reference count on it (decrementing which may delete the file!
  89. // -- consider t.Close() instead). The fd has to writeable because we call Truncate on it before
  90. // deleting.
  91. func OpenTable(fd *os.File, loadingMode options.FileLoadingMode) (*Table, error) {
  92. fileInfo, err := fd.Stat()
  93. if err != nil {
  94. // It's OK to ignore fd.Close() errs in this function because we have only read
  95. // from the file.
  96. _ = fd.Close()
  97. return nil, y.Wrap(err)
  98. }
  99. filename := fileInfo.Name()
  100. id, ok := ParseFileID(filename)
  101. if !ok {
  102. _ = fd.Close()
  103. return nil, errors.Errorf("Invalid filename: %s", filename)
  104. }
  105. t := &Table{
  106. fd: fd,
  107. ref: 1, // Caller is given one reference.
  108. id: id,
  109. loadingMode: loadingMode,
  110. }
  111. t.tableSize = int(fileInfo.Size())
  112. if loadingMode == options.MemoryMap {
  113. t.mmap, err = y.Mmap(fd, false, fileInfo.Size())
  114. if err != nil {
  115. _ = fd.Close()
  116. return nil, y.Wrapf(err, "Unable to map file")
  117. }
  118. } else if loadingMode == options.LoadToRAM {
  119. err = t.loadToRAM()
  120. if err != nil {
  121. _ = fd.Close()
  122. return nil, y.Wrap(err)
  123. }
  124. }
  125. if err := t.readIndex(); err != nil {
  126. return nil, y.Wrap(err)
  127. }
  128. it := t.NewIterator(false)
  129. defer it.Close()
  130. it.Rewind()
  131. if it.Valid() {
  132. t.smallest = it.Key()
  133. }
  134. it2 := t.NewIterator(true)
  135. defer it2.Close()
  136. it2.Rewind()
  137. if it2.Valid() {
  138. t.biggest = it2.Key()
  139. }
  140. return t, nil
  141. }
  142. // Close closes the open table. (Releases resources back to the OS.)
  143. func (t *Table) Close() error {
  144. if t.loadingMode == options.MemoryMap {
  145. y.Munmap(t.mmap)
  146. }
  147. return t.fd.Close()
  148. }
  149. func (t *Table) read(off int, sz int) ([]byte, error) {
  150. if len(t.mmap) > 0 {
  151. if len(t.mmap[off:]) < sz {
  152. return nil, y.ErrEOF
  153. }
  154. return t.mmap[off : off+sz], nil
  155. }
  156. res := make([]byte, sz)
  157. nbr, err := t.fd.ReadAt(res, int64(off))
  158. y.NumReads.Add(1)
  159. y.NumBytesRead.Add(int64(nbr))
  160. return res, err
  161. }
  162. func (t *Table) readNoFail(off int, sz int) []byte {
  163. res, err := t.read(off, sz)
  164. y.Check(err)
  165. return res
  166. }
  167. func (t *Table) readIndex() error {
  168. readPos := t.tableSize
  169. // Read bloom filter.
  170. readPos -= 4
  171. buf := t.readNoFail(readPos, 4)
  172. bloomLen := int(binary.BigEndian.Uint32(buf))
  173. readPos -= bloomLen
  174. data := t.readNoFail(readPos, bloomLen)
  175. t.bf = bbloom.JSONUnmarshal(data)
  176. readPos -= 4
  177. buf = t.readNoFail(readPos, 4)
  178. restartsLen := int(binary.BigEndian.Uint32(buf))
  179. readPos -= 4 * restartsLen
  180. buf = t.readNoFail(readPos, 4*restartsLen)
  181. offsets := make([]int, restartsLen)
  182. for i := 0; i < restartsLen; i++ {
  183. offsets[i] = int(binary.BigEndian.Uint32(buf[:4]))
  184. buf = buf[4:]
  185. }
  186. // The last offset stores the end of the last block.
  187. for i := 0; i < len(offsets); i++ {
  188. var o int
  189. if i == 0 {
  190. o = 0
  191. } else {
  192. o = offsets[i-1]
  193. }
  194. ko := keyOffset{
  195. offset: o,
  196. len: offsets[i] - o,
  197. }
  198. t.blockIndex = append(t.blockIndex, ko)
  199. }
  200. che := make(chan error, len(t.blockIndex))
  201. blocks := make(chan int, len(t.blockIndex))
  202. for i := 0; i < len(t.blockIndex); i++ {
  203. blocks <- i
  204. }
  205. for i := 0; i < 64; i++ { // Run 64 goroutines.
  206. go func() {
  207. var h header
  208. for index := range blocks {
  209. ko := &t.blockIndex[index]
  210. offset := ko.offset
  211. buf, err := t.read(offset, h.Size())
  212. if err != nil {
  213. che <- errors.Wrap(err, "While reading first header in block")
  214. continue
  215. }
  216. h.Decode(buf)
  217. y.AssertTruef(h.plen == 0, "Key offset: %+v, h.plen = %d", *ko, h.plen)
  218. offset += h.Size()
  219. buf = make([]byte, h.klen)
  220. var out []byte
  221. if out, err = t.read(offset, int(h.klen)); err != nil {
  222. che <- errors.Wrap(err, "While reading first key in block")
  223. continue
  224. }
  225. y.AssertTrue(len(buf) == copy(buf, out))
  226. ko.key = buf
  227. che <- nil
  228. }
  229. }()
  230. }
  231. close(blocks) // to stop reading goroutines
  232. var readError error
  233. for i := 0; i < len(t.blockIndex); i++ {
  234. if err := <-che; err != nil && readError == nil {
  235. readError = err
  236. }
  237. }
  238. if readError != nil {
  239. return readError
  240. }
  241. return nil
  242. }
  243. func (t *Table) block(idx int) (block, error) {
  244. y.AssertTruef(idx >= 0, "idx=%d", idx)
  245. if idx >= len(t.blockIndex) {
  246. return block{}, errors.New("block out of index")
  247. }
  248. ko := t.blockIndex[idx]
  249. blk := block{
  250. offset: ko.offset,
  251. }
  252. var err error
  253. blk.data, err = t.read(blk.offset, ko.len)
  254. return blk, err
  255. }
  256. // Size is its file size in bytes
  257. func (t *Table) Size() int64 { return int64(t.tableSize) }
  258. // Smallest is its smallest key, or nil if there are none
  259. func (t *Table) Smallest() []byte { return t.smallest }
  260. // Biggest is its biggest key, or nil if there are none
  261. func (t *Table) Biggest() []byte { return t.biggest }
  262. // Filename is NOT the file name. Just kidding, it is.
  263. func (t *Table) Filename() string { return t.fd.Name() }
  264. // ID is the table's ID number (used to make the file name).
  265. func (t *Table) ID() uint64 { return t.id }
  266. // DoesNotHave returns true if (but not "only if") the table does not have the key. It does a
  267. // bloom filter lookup.
  268. func (t *Table) DoesNotHave(key []byte) bool { return !t.bf.Has(key) }
  269. // ParseFileID reads the file id out of a filename.
  270. func ParseFileID(name string) (uint64, bool) {
  271. name = path.Base(name)
  272. if !strings.HasSuffix(name, fileSuffix) {
  273. return 0, false
  274. }
  275. // suffix := name[len(fileSuffix):]
  276. name = strings.TrimSuffix(name, fileSuffix)
  277. id, err := strconv.Atoi(name)
  278. if err != nil {
  279. return 0, false
  280. }
  281. y.AssertTrue(id >= 0)
  282. return uint64(id), true
  283. }
  284. // IDToFilename does the inverse of ParseFileID
  285. func IDToFilename(id uint64) string {
  286. return fmt.Sprintf("%06d", id) + fileSuffix
  287. }
  288. // NewFilename should be named TableFilepath -- it combines the dir with the ID to make a table
  289. // filepath.
  290. func NewFilename(id uint64, dir string) string {
  291. return filepath.Join(dir, IDToFilename(id))
  292. }
  293. func (t *Table) loadToRAM() error {
  294. t.mmap = make([]byte, t.tableSize)
  295. read, err := t.fd.ReadAt(t.mmap, 0)
  296. if err != nil || read != t.tableSize {
  297. return y.Wrapf(err, "Unable to load file in memory. Table file: %s", t.Filename())
  298. }
  299. y.NumReads.Add(1)
  300. y.NumBytesRead.Add(int64(read))
  301. return nil
  302. }