manifest.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "bufio"
  19. "bytes"
  20. "encoding/binary"
  21. "fmt"
  22. "hash/crc32"
  23. "io"
  24. "os"
  25. "path/filepath"
  26. "sync"
  27. "github.com/dgraph-io/badger/protos"
  28. "github.com/dgraph-io/badger/y"
  29. "github.com/pkg/errors"
  30. )
  31. // Manifest represents the contents of the MANIFEST file in a Badger store.
  32. //
  33. // The MANIFEST file describes the startup state of the db -- all LSM files and what level they're
  34. // at.
  35. //
  36. // It consists of a sequence of ManifestChangeSet objects. Each of these is treated atomically,
  37. // and contains a sequence of ManifestChange's (file creations/deletions) which we use to
  38. // reconstruct the manifest at startup.
  39. type Manifest struct {
  40. Levels []levelManifest
  41. Tables map[uint64]tableManifest
  42. // Contains total number of creation and deletion changes in the manifest -- used to compute
  43. // whether it'd be useful to rewrite the manifest.
  44. Creations int
  45. Deletions int
  46. }
  47. func createManifest() Manifest {
  48. levels := make([]levelManifest, 0)
  49. return Manifest{
  50. Levels: levels,
  51. Tables: make(map[uint64]tableManifest),
  52. }
  53. }
  54. // levelManifest contains information about LSM tree levels
  55. // in the MANIFEST file.
  56. type levelManifest struct {
  57. Tables map[uint64]struct{} // Set of table id's
  58. }
  59. // tableManifest contains information about a specific level
  60. // in the LSM tree.
  61. type tableManifest struct {
  62. Level uint8
  63. }
  64. // manifestFile holds the file pointer (and other info) about the manifest file, which is a log
  65. // file we append to.
  66. type manifestFile struct {
  67. fp *os.File
  68. directory string
  69. // We make this configurable so that unit tests can hit rewrite() code quickly
  70. deletionsRewriteThreshold int
  71. // Guards appends, which includes access to the manifest field.
  72. appendLock sync.Mutex
  73. // Used to track the current state of the manifest, used when rewriting.
  74. manifest Manifest
  75. }
  76. const (
  77. // ManifestFilename is the filename for the manifest file.
  78. ManifestFilename = "MANIFEST"
  79. manifestRewriteFilename = "MANIFEST-REWRITE"
  80. manifestDeletionsRewriteThreshold = 10000
  81. manifestDeletionsRatio = 10
  82. )
  83. // asChanges returns a sequence of changes that could be used to recreate the Manifest in its
  84. // present state.
  85. func (m *Manifest) asChanges() []*protos.ManifestChange {
  86. changes := make([]*protos.ManifestChange, 0, len(m.Tables))
  87. for id, tm := range m.Tables {
  88. changes = append(changes, makeTableCreateChange(id, int(tm.Level)))
  89. }
  90. return changes
  91. }
  92. func (m *Manifest) clone() Manifest {
  93. changeSet := protos.ManifestChangeSet{Changes: m.asChanges()}
  94. ret := createManifest()
  95. y.Check(applyChangeSet(&ret, &changeSet))
  96. return ret
  97. }
  98. // openOrCreateManifestFile opens a Badger manifest file if it exists, or creates on if
  99. // one doesn’t.
  100. func openOrCreateManifestFile(dir string, readOnly bool) (ret *manifestFile, result Manifest, err error) {
  101. return helpOpenOrCreateManifestFile(dir, readOnly, manifestDeletionsRewriteThreshold)
  102. }
  103. func helpOpenOrCreateManifestFile(dir string, readOnly bool, deletionsThreshold int) (ret *manifestFile, result Manifest, err error) {
  104. path := filepath.Join(dir, ManifestFilename)
  105. var flags uint32
  106. if readOnly {
  107. flags |= y.ReadOnly
  108. }
  109. fp, err := y.OpenExistingFile(path, flags) // We explicitly sync in addChanges, outside the lock.
  110. if err != nil {
  111. if !os.IsNotExist(err) {
  112. return nil, Manifest{}, err
  113. }
  114. if readOnly {
  115. return nil, Manifest{}, fmt.Errorf("no manifest found, required for read-only db")
  116. }
  117. m := createManifest()
  118. fp, netCreations, err := helpRewrite(dir, &m)
  119. if err != nil {
  120. return nil, Manifest{}, err
  121. }
  122. y.AssertTrue(netCreations == 0)
  123. mf := &manifestFile{
  124. fp: fp,
  125. directory: dir,
  126. manifest: m.clone(),
  127. deletionsRewriteThreshold: deletionsThreshold,
  128. }
  129. return mf, m, nil
  130. }
  131. manifest, truncOffset, err := ReplayManifestFile(fp)
  132. if err != nil {
  133. _ = fp.Close()
  134. return nil, Manifest{}, err
  135. }
  136. if !readOnly {
  137. // Truncate file so we don't have a half-written entry at the end.
  138. if err := fp.Truncate(truncOffset); err != nil {
  139. _ = fp.Close()
  140. return nil, Manifest{}, err
  141. }
  142. }
  143. if _, err = fp.Seek(0, io.SeekEnd); err != nil {
  144. _ = fp.Close()
  145. return nil, Manifest{}, err
  146. }
  147. mf := &manifestFile{
  148. fp: fp,
  149. directory: dir,
  150. manifest: manifest.clone(),
  151. deletionsRewriteThreshold: deletionsThreshold,
  152. }
  153. return mf, manifest, nil
  154. }
  155. func (mf *manifestFile) close() error {
  156. return mf.fp.Close()
  157. }
  158. // addChanges writes a batch of changes, atomically, to the file. By "atomically" that means when
  159. // we replay the MANIFEST file, we'll either replay all the changes or none of them. (The truth of
  160. // this depends on the filesystem -- some might append garbage data if a system crash happens at
  161. // the wrong time.)
  162. func (mf *manifestFile) addChanges(changesParam []*protos.ManifestChange) error {
  163. changes := protos.ManifestChangeSet{Changes: changesParam}
  164. buf, err := changes.Marshal()
  165. if err != nil {
  166. return err
  167. }
  168. // Maybe we could use O_APPEND instead (on certain file systems)
  169. mf.appendLock.Lock()
  170. if err := applyChangeSet(&mf.manifest, &changes); err != nil {
  171. mf.appendLock.Unlock()
  172. return err
  173. }
  174. // Rewrite manifest if it'd shrink by 1/10 and it's big enough to care
  175. if mf.manifest.Deletions > mf.deletionsRewriteThreshold &&
  176. mf.manifest.Deletions > manifestDeletionsRatio*(mf.manifest.Creations-mf.manifest.Deletions) {
  177. if err := mf.rewrite(); err != nil {
  178. mf.appendLock.Unlock()
  179. return err
  180. }
  181. } else {
  182. var lenCrcBuf [8]byte
  183. binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(buf)))
  184. binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(buf, y.CastagnoliCrcTable))
  185. buf = append(lenCrcBuf[:], buf...)
  186. if _, err := mf.fp.Write(buf); err != nil {
  187. mf.appendLock.Unlock()
  188. return err
  189. }
  190. }
  191. mf.appendLock.Unlock()
  192. return mf.fp.Sync()
  193. }
  194. // Has to be 4 bytes. The value can never change, ever, anyway.
  195. var magicText = [4]byte{'B', 'd', 'g', 'r'}
  196. // The magic version number.
  197. const magicVersion = 4
  198. func helpRewrite(dir string, m *Manifest) (*os.File, int, error) {
  199. rewritePath := filepath.Join(dir, manifestRewriteFilename)
  200. // We explicitly sync.
  201. fp, err := y.OpenTruncFile(rewritePath, false)
  202. if err != nil {
  203. return nil, 0, err
  204. }
  205. buf := make([]byte, 8)
  206. copy(buf[0:4], magicText[:])
  207. binary.BigEndian.PutUint32(buf[4:8], magicVersion)
  208. netCreations := len(m.Tables)
  209. changes := m.asChanges()
  210. set := protos.ManifestChangeSet{Changes: changes}
  211. changeBuf, err := set.Marshal()
  212. if err != nil {
  213. fp.Close()
  214. return nil, 0, err
  215. }
  216. var lenCrcBuf [8]byte
  217. binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(changeBuf)))
  218. binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(changeBuf, y.CastagnoliCrcTable))
  219. buf = append(buf, lenCrcBuf[:]...)
  220. buf = append(buf, changeBuf...)
  221. if _, err := fp.Write(buf); err != nil {
  222. fp.Close()
  223. return nil, 0, err
  224. }
  225. if err := fp.Sync(); err != nil {
  226. fp.Close()
  227. return nil, 0, err
  228. }
  229. // In Windows the files should be closed before doing a Rename.
  230. if err = fp.Close(); err != nil {
  231. return nil, 0, err
  232. }
  233. manifestPath := filepath.Join(dir, ManifestFilename)
  234. if err := os.Rename(rewritePath, manifestPath); err != nil {
  235. return nil, 0, err
  236. }
  237. fp, err = y.OpenExistingFile(manifestPath, 0)
  238. if err != nil {
  239. return nil, 0, err
  240. }
  241. if _, err := fp.Seek(0, io.SeekEnd); err != nil {
  242. fp.Close()
  243. return nil, 0, err
  244. }
  245. if err := syncDir(dir); err != nil {
  246. fp.Close()
  247. return nil, 0, err
  248. }
  249. return fp, netCreations, nil
  250. }
  251. // Must be called while appendLock is held.
  252. func (mf *manifestFile) rewrite() error {
  253. // In Windows the files should be closed before doing a Rename.
  254. if err := mf.fp.Close(); err != nil {
  255. return err
  256. }
  257. fp, netCreations, err := helpRewrite(mf.directory, &mf.manifest)
  258. if err != nil {
  259. return err
  260. }
  261. mf.fp = fp
  262. mf.manifest.Creations = netCreations
  263. mf.manifest.Deletions = 0
  264. return nil
  265. }
  266. type countingReader struct {
  267. wrapped *bufio.Reader
  268. count int64
  269. }
  270. func (r *countingReader) Read(p []byte) (n int, err error) {
  271. n, err = r.wrapped.Read(p)
  272. r.count += int64(n)
  273. return
  274. }
  275. func (r *countingReader) ReadByte() (b byte, err error) {
  276. b, err = r.wrapped.ReadByte()
  277. if err == nil {
  278. r.count++
  279. }
  280. return
  281. }
  282. var (
  283. errBadMagic = errors.New("manifest has bad magic")
  284. )
  285. // ReplayManifestFile reads the manifest file and constructs two manifest objects. (We need one
  286. // immutable copy and one mutable copy of the manifest. Easiest way is to construct two of them.)
  287. // Also, returns the last offset after a completely read manifest entry -- the file must be
  288. // truncated at that point before further appends are made (if there is a partial entry after
  289. // that). In normal conditions, truncOffset is the file size.
  290. func ReplayManifestFile(fp *os.File) (ret Manifest, truncOffset int64, err error) {
  291. r := countingReader{wrapped: bufio.NewReader(fp)}
  292. var magicBuf [8]byte
  293. if _, err := io.ReadFull(&r, magicBuf[:]); err != nil {
  294. return Manifest{}, 0, errBadMagic
  295. }
  296. if !bytes.Equal(magicBuf[0:4], magicText[:]) {
  297. return Manifest{}, 0, errBadMagic
  298. }
  299. version := binary.BigEndian.Uint32(magicBuf[4:8])
  300. if version != magicVersion {
  301. return Manifest{}, 0,
  302. fmt.Errorf("manifest has unsupported version: %d (we support %d)", version, magicVersion)
  303. }
  304. build := createManifest()
  305. var offset int64
  306. for {
  307. offset = r.count
  308. var lenCrcBuf [8]byte
  309. _, err := io.ReadFull(&r, lenCrcBuf[:])
  310. if err != nil {
  311. if err == io.EOF || err == io.ErrUnexpectedEOF {
  312. break
  313. }
  314. return Manifest{}, 0, err
  315. }
  316. length := binary.BigEndian.Uint32(lenCrcBuf[0:4])
  317. var buf = make([]byte, length)
  318. if _, err := io.ReadFull(&r, buf); err != nil {
  319. if err == io.EOF || err == io.ErrUnexpectedEOF {
  320. break
  321. }
  322. return Manifest{}, 0, err
  323. }
  324. if crc32.Checksum(buf, y.CastagnoliCrcTable) != binary.BigEndian.Uint32(lenCrcBuf[4:8]) {
  325. break
  326. }
  327. var changeSet protos.ManifestChangeSet
  328. if err := changeSet.Unmarshal(buf); err != nil {
  329. return Manifest{}, 0, err
  330. }
  331. if err := applyChangeSet(&build, &changeSet); err != nil {
  332. return Manifest{}, 0, err
  333. }
  334. }
  335. return build, offset, err
  336. }
  337. func applyManifestChange(build *Manifest, tc *protos.ManifestChange) error {
  338. switch tc.Op {
  339. case protos.ManifestChange_CREATE:
  340. if _, ok := build.Tables[tc.Id]; ok {
  341. return fmt.Errorf("MANIFEST invalid, table %d exists", tc.Id)
  342. }
  343. build.Tables[tc.Id] = tableManifest{
  344. Level: uint8(tc.Level),
  345. }
  346. for len(build.Levels) <= int(tc.Level) {
  347. build.Levels = append(build.Levels, levelManifest{make(map[uint64]struct{})})
  348. }
  349. build.Levels[tc.Level].Tables[tc.Id] = struct{}{}
  350. build.Creations++
  351. case protos.ManifestChange_DELETE:
  352. tm, ok := build.Tables[tc.Id]
  353. if !ok {
  354. return fmt.Errorf("MANIFEST removes non-existing table %d", tc.Id)
  355. }
  356. delete(build.Levels[tm.Level].Tables, tc.Id)
  357. delete(build.Tables, tc.Id)
  358. build.Deletions++
  359. default:
  360. return fmt.Errorf("MANIFEST file has invalid manifestChange op")
  361. }
  362. return nil
  363. }
  364. // This is not a "recoverable" error -- opening the KV store fails because the MANIFEST file is
  365. // just plain broken.
  366. func applyChangeSet(build *Manifest, changeSet *protos.ManifestChangeSet) error {
  367. for _, change := range changeSet.Changes {
  368. if err := applyManifestChange(build, change); err != nil {
  369. return err
  370. }
  371. }
  372. return nil
  373. }
  374. func makeTableCreateChange(id uint64, level int) *protos.ManifestChange {
  375. return &protos.ManifestChange{
  376. Id: id,
  377. Op: protos.ManifestChange_CREATE,
  378. Level: uint32(level),
  379. }
  380. }
  381. func makeTableDeleteChange(id uint64) *protos.ManifestChange {
  382. return &protos.ManifestChange{
  383. Id: id,
  384. Op: protos.ManifestChange_DELETE,
  385. }
  386. }