levels.go

/*
 * Copyright 2017 Dgraph Labs, Inc. and Contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package badger

import (
	"fmt"
	"math"
	"math/rand"
	"os"
	"sort"
	"time"

	"golang.org/x/net/trace"

	"github.com/dgraph-io/badger/protos"
	"github.com/dgraph-io/badger/table"
	"github.com/dgraph-io/badger/y"
	"github.com/pkg/errors"
)
type levelsController struct {
	nextFileID uint64 // Atomic
	elog       trace.EventLog

	// The following are initialized once and const.
	levels  []*levelHandler
	kv      *DB
	cstatus compactStatus
}
var (
	// This is for getting timings between stalls.
	lastUnstalled time.Time
)
// revertToManifest checks that all necessary table files exist and removes all table files not
// referenced by the manifest. idMap is a set of table file id's that were read from the directory
// listing.
func revertToManifest(kv *DB, mf *Manifest, idMap map[uint64]struct{}) error {
	// 1. Check all files in manifest exist.
	for id := range mf.Tables {
		if _, ok := idMap[id]; !ok {
			return fmt.Errorf("file does not exist for table %d", id)
		}
	}

	// 2. Delete files that shouldn't exist.
	for id := range idMap {
		if _, ok := mf.Tables[id]; !ok {
			kv.elog.Printf("Table file %d not referenced in MANIFEST\n", id)
			filename := table.NewFilename(id, kv.opt.Dir)
			if err := os.Remove(filename); err != nil {
				return y.Wrapf(err, "While removing table %d", id)
			}
		}
	}
	return nil
}
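// newLevelsController builds the in-memory level hierarchy from the manifest: it removes table
// files not referenced by the manifest, opens every table listed in the manifest at its recorded
// level, and validates that key ranges within each non-zero level do not overlap.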
func newLevelsController(kv *DB, mf *Manifest) (*levelsController, error) {
	y.AssertTrue(kv.opt.NumLevelZeroTablesStall > kv.opt.NumLevelZeroTables)
	s := &levelsController{
		kv:     kv,
		elog:   kv.elog,
		levels: make([]*levelHandler, kv.opt.MaxLevels),
	}
	s.cstatus.levels = make([]*levelCompactStatus, kv.opt.MaxLevels)

	for i := 0; i < kv.opt.MaxLevels; i++ {
		s.levels[i] = newLevelHandler(kv, i)
		if i == 0 {
			// Do nothing.
		} else if i == 1 {
			// Level 1 probably shouldn't be too much bigger than level 0.
			s.levels[i].maxTotalSize = kv.opt.LevelOneSize
		} else {
			s.levels[i].maxTotalSize = s.levels[i-1].maxTotalSize * int64(kv.opt.LevelSizeMultiplier)
		}
		s.cstatus.levels[i] = new(levelCompactStatus)
	}

	// Compare manifest against directory, check for existent/non-existent files, and remove.
	if err := revertToManifest(kv, mf, getIDMap(kv.opt.Dir)); err != nil {
		return nil, err
	}

	// Some files may be deleted. Let's reload.
	tables := make([][]*table.Table, kv.opt.MaxLevels)
	var maxFileID uint64
	for fileID, tableManifest := range mf.Tables {
		fname := table.NewFilename(fileID, kv.opt.Dir)
		var flags uint32 = y.Sync
		if kv.opt.ReadOnly {
			flags |= y.ReadOnly
		}
		fd, err := y.OpenExistingFile(fname, flags)
		if err != nil {
			closeAllTables(tables)
			return nil, errors.Wrapf(err, "Opening file: %q", fname)
		}

		t, err := table.OpenTable(fd, kv.opt.TableLoadingMode)
		if err != nil {
			closeAllTables(tables)
			return nil, errors.Wrapf(err, "Opening table: %q", fname)
		}

		level := tableManifest.Level
		tables[level] = append(tables[level], t)

		if fileID > maxFileID {
			maxFileID = fileID
		}
	}
	s.nextFileID = maxFileID + 1
	for i, tbls := range tables {
		s.levels[i].initTables(tbls)
	}

	// Make sure key ranges do not overlap etc.
	if err := s.validate(); err != nil {
		_ = s.cleanupLevels()
		return nil, errors.Wrap(err, "Level validation")
	}

	// Sync directory (because we have at least removed some files, or previously created the
	// manifest file).
	if err := syncDir(kv.opt.Dir); err != nil {
		_ = s.close()
		return nil, err
	}
	return s, nil
}
// Closes the tables, for cleanup in newLevelsController. (We Close() instead of using DecrRef()
// because that would delete the underlying files.) We ignore errors, which is OK because tables
// are read-only.
func closeAllTables(tables [][]*table.Table) {
	for _, tableSlice := range tables {
		for _, table := range tableSlice {
			_ = table.Close()
		}
	}
}
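// cleanupLevels closes every levelHandler, returning the first error encountered.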
func (s *levelsController) cleanupLevels() error {
	var firstErr error
	for _, l := range s.levels {
		if err := l.close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// This function picks all tables from all levels, creates a manifest changeset,
// applies it, and then decrements the refs of these tables, which would result
// in their deletion. It spares one table from L0, to keep the badgerHead key
// persisted, so we don't lose where we are w.r.t. value log.
// NOTE: This function in itself isn't sufficient to completely delete all the
// data. After this, one would still need to iterate over the KV pairs and mark
// them as deleted.
func (s *levelsController) deleteLSMTree() (int, error) {
	var all []*table.Table
	var keepOne *table.Table
	for _, l := range s.levels {
		l.RLock()
		if l.level == 0 && len(l.tables) > 1 {
			// Skip the last table. We do this to keep the badgerMove key persisted.
			lastIdx := len(l.tables) - 1
			keepOne = l.tables[lastIdx]
			all = append(all, l.tables[:lastIdx]...)
		} else {
			all = append(all, l.tables...)
		}
		l.RUnlock()
	}
	if len(all) == 0 {
		return 0, nil
	}

	// Generate the manifest changes.
	changes := []*protos.ManifestChange{}
	for _, table := range all {
		changes = append(changes, makeTableDeleteChange(table.ID()))
	}
	changeSet := protos.ManifestChangeSet{Changes: changes}
	if err := s.kv.manifest.addChanges(changeSet.Changes); err != nil {
		return 0, err
	}

	for _, l := range s.levels {
		l.Lock()
		l.totalSize = 0
		if l.level == 0 && len(l.tables) > 1 {
			l.tables = []*table.Table{keepOne}
			l.totalSize += keepOne.Size()
		} else {
			l.tables = l.tables[:0]
		}
		l.Unlock()
	}

	// Now allow deletion of tables.
	for _, table := range all {
		if err := table.DecrRef(); err != nil {
			return 0, err
		}
	}
	return len(all), nil
}
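// startCompact launches NumCompactors compaction workers. Only n-1 are added to the closer
// here because the closer already counts one running worker for this controller.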
func (s *levelsController) startCompact(lc *y.Closer) {
	n := s.kv.opt.NumCompactors
	lc.AddRunning(n - 1)
	for i := 0; i < n; i++ {
		go s.runWorker(lc)
	}
}
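// runWorker runs one compaction worker: after a random start-up delay (to de-synchronize the
// workers), it wakes up every second, asks pickCompactLevels for prioritized candidates, and
// compacts the first one it can.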
func (s *levelsController) runWorker(lc *y.Closer) {
	defer lc.Done()
	if s.kv.opt.DoNotCompact {
		return
	}

	time.Sleep(time.Duration(rand.Int31n(1000)) * time.Millisecond)
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		// Can add a done channel or other stuff.
		case <-ticker.C:
			prios := s.pickCompactLevels()
			for _, p := range prios {
				// TODO: Handle error.
				didCompact, _ := s.doCompact(p)
				if didCompact {
					break
				}
			}
		case <-lc.HasBeenClosed():
			return
		}
	}
}
// Returns true if level zero may be compacted, without accounting for compactions that already
// might be happening.
func (s *levelsController) isLevel0Compactable() bool {
	return s.levels[0].numTables() >= s.kv.opt.NumLevelZeroTables
}

// Returns true if the non-zero level may be compacted. delSize provides the size of the tables
// which are currently being compacted so that we treat them as already having started being
// compacted (because they have been, yet their size is already counted in getTotalSize).
func (l *levelHandler) isCompactable(delSize int64) bool {
	return l.getTotalSize()-delSize >= l.maxTotalSize
}
type compactionPriority struct {
	level int
	score float64
}
// pickCompactLevels determines which levels to compact, in priority order.
// Based on: https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
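// For example, with NumLevelZeroTables = 5, seven tables at L0 score 7/5 = 1.4, while a level
// holding 15 MB against a 10 MB maxTotalSize scores 1.5 and would be compacted first.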
func (s *levelsController) pickCompactLevels() (prios []compactionPriority) {
	// This function must use identical criteria for guaranteeing compaction's progress that
	// addLevel0Table uses.

	// cstatus is checked to see if level 0's tables are already being compacted.
	if !s.cstatus.overlapsWith(0, infRange) && s.isLevel0Compactable() {
		pri := compactionPriority{
			level: 0,
			score: float64(s.levels[0].numTables()) / float64(s.kv.opt.NumLevelZeroTables),
		}
		prios = append(prios, pri)
	}

	for i, l := range s.levels[1:] {
		// Don't consider those tables that are already being compacted right now.
		delSize := s.cstatus.delSize(i + 1)

		if l.isCompactable(delSize) {
			pri := compactionPriority{
				level: i + 1,
				score: float64(l.getTotalSize()-delSize) / float64(l.maxTotalSize),
			}
			prios = append(prios, pri)
		}
	}
	sort.Slice(prios, func(i, j int) bool {
		return prios[i].score > prios[j].score
	})
	return prios
}
// compactBuildTables merges topTables and botTables to form a list of new tables.
func (s *levelsController) compactBuildTables(
	l int, cd compactDef) ([]*table.Table, func() error, error) {
	topTables := cd.top
	botTables := cd.bot

	var hasOverlap bool
	{
		kr := getKeyRange(cd.top)
		for i, lh := range s.levels {
			if i <= l { // Skip upper levels.
				continue
			}
			lh.RLock()
			left, right := lh.overlappingTables(levelHandlerRLocked{}, kr)
			lh.RUnlock()
			if right-left > 0 {
				hasOverlap = true
				break
			}
		}
		cd.elog.LazyPrintf("Key range overlaps with lower levels: %v", hasOverlap)
	}

	// Try to collect stats so that we can inform value log about GC. That would help us find which
	// value log file should be GCed.
	discardStats := make(map[uint32]int64)
	updateStats := func(vs y.ValueStruct) {
		if vs.Meta&bitValuePointer > 0 {
			var vp valuePointer
			vp.Decode(vs.Value)
			discardStats[vp.Fid] += int64(vp.Len)
		}
	}

	// Create iterators across all the tables involved first.
	var iters []y.Iterator
	if l == 0 {
		iters = appendIteratorsReversed(iters, topTables, false)
	} else {
		y.AssertTrue(len(topTables) == 1)
		iters = []y.Iterator{topTables[0].NewIterator(false)}
	}

	// Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap.
	iters = append(iters, table.NewConcatIterator(botTables, false))
	it := y.NewMergeIterator(iters, false)
	defer it.Close() // Important to close the iterator to do ref counting.

	it.Rewind()

	// Pick a discard ts, so we can discard versions below this ts. We should
	// never discard any versions starting from above this timestamp, because
	// that would affect the snapshot view guarantee provided by transactions.
	discardTs := s.kv.orc.discardAtOrBelow()

	// Start generating new tables.
	type newTableResult struct {
		table *table.Table
		err   error
	}
	resultCh := make(chan newTableResult)
	var numBuilds, numVersions int
	var lastKey, skipKey []byte

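	// Each iteration of the outer loop builds one new table: the inner loop adds key versions
	// until the builder reaches capacity at a key boundary; the finished table is then written
	// and opened in a goroutine while iteration continues with the next table.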
	for it.Valid() {
		timeStart := time.Now()
		builder := table.NewTableBuilder()
		var numKeys, numSkips uint64
		for ; it.Valid(); it.Next() {
			// See if we need to skip this key.
			if len(skipKey) > 0 {
				if y.SameKey(it.Key(), skipKey) {
					numSkips++
					updateStats(it.Value())
					continue
				} else {
					skipKey = skipKey[:0]
				}
			}

			if !y.SameKey(it.Key(), lastKey) {
				if builder.ReachedCapacity(s.kv.opt.MaxTableSize) {
					// Only break if we are on a different key, and have reached capacity. We want
					// to ensure that all versions of the key are stored in the same sstable, and
					// not divided across multiple tables at the same level.
					break
				}
				lastKey = y.SafeCopy(lastKey, it.Key())
				numVersions = 0
			}

			vs := it.Value()
			version := y.ParseTs(it.Key())
			if version <= discardTs {
				// Keep track of the number of versions encountered for this key. Only consider the
				// versions which are below the minReadTs, otherwise, we might end up discarding the
				// only valid version for a running transaction.
				numVersions++
				lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0
				if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) ||
					numVersions > s.kv.opt.NumVersionsToKeep ||
					lastValidVersion {
					// If this version of the key is deleted or expired, skip all the rest of the
					// versions. Ensure that we're only removing versions below readTs.
					skipKey = y.SafeCopy(skipKey, it.Key())

					if lastValidVersion {
						// Add this key. We have set skipKey, so the following key versions
						// would be skipped.
					} else if hasOverlap {
						// If this key range has overlap with lower levels, then keep the deletion
						// marker with the latest version, discarding the rest. We have set skipKey,
						// so the following key versions would be skipped.
					} else {
						// If no overlap, we can skip all the versions, by continuing here.
						numSkips++
						updateStats(vs)
						continue // Skip adding this key.
					}
				}
			}
			numKeys++
			y.Check(builder.Add(it.Key(), it.Value()))
		}
		// Even though it.Valid() held at least once above, every key may have been skipped via
		// continue, so the builder can still be empty -- hence the Empty() check below.
		cd.elog.LazyPrintf("Added %d keys. Skipped %d keys.", numKeys, numSkips)
		cd.elog.LazyPrintf("LOG Compact. Iteration took: %v\n", time.Since(timeStart))
		if !builder.Empty() {
			numBuilds++
			fileID := s.reserveFileID()
			go func(builder *table.Builder) {
				defer builder.Close()

				fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
				if err != nil {
					resultCh <- newTableResult{nil, errors.Wrapf(err, "While opening new table: %d", fileID)}
					return
				}
				if _, err := fd.Write(builder.Finish()); err != nil {
					resultCh <- newTableResult{nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)}
					return
				}

				tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode)
				// decrRef is added below.
				resultCh <- newTableResult{tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())}
			}(builder)
		}
	}
	newTables := make([]*table.Table, 0, 20)

	// Wait for all table builders to finish.
	var firstErr error
	for x := 0; x < numBuilds; x++ {
		res := <-resultCh
		newTables = append(newTables, res.table)
		if firstErr == nil {
			firstErr = res.err
		}
	}

	if firstErr == nil {
		// Ensure created files' directory entries are visible. We don't mind the extra latency
		// from not doing this ASAP after all file creation has finished because this is a
		// background operation.
		firstErr = syncDir(s.kv.opt.Dir)
	}

	if firstErr != nil {
		// An error happened. Delete all the newly created table files (by calling DecrRef
		// -- we're the only holders of a ref).
		for j := 0; j < numBuilds; j++ {
			if newTables[j] != nil {
				newTables[j].DecrRef()
			}
		}
		errorReturn := errors.Wrapf(firstErr, "While running compaction for: %+v", cd)
		return nil, nil, errorReturn
	}

	sort.Slice(newTables, func(i, j int) bool {
		return y.CompareKeys(newTables[i].Biggest(), newTables[j].Biggest()) < 0
	})
	s.kv.vlog.updateGCStats(discardStats)
	cd.elog.LazyPrintf("Discard stats: %v", discardStats)
	return newTables, func() error { return decrRefs(newTables) }, nil
}
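// buildChangeSet creates a manifest changeset that records the new tables at the next level
// and deletes the compacted tables from both levels.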
func buildChangeSet(cd *compactDef, newTables []*table.Table) protos.ManifestChangeSet {
	changes := []*protos.ManifestChange{}
	for _, table := range newTables {
		changes = append(changes, makeTableCreateChange(table.ID(), cd.nextLevel.level))
	}
	for _, table := range cd.top {
		changes = append(changes, makeTableDeleteChange(table.ID()))
	}
	for _, table := range cd.bot {
		changes = append(changes, makeTableDeleteChange(table.ID()))
	}
	return protos.ManifestChangeSet{Changes: changes}
}
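// compactDef describes one compaction: the source (this) and destination (next) levels, the
// tables picked from each, and the key ranges those tables cover.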
type compactDef struct {
	elog trace.Trace

	thisLevel *levelHandler
	nextLevel *levelHandler

	top []*table.Table
	bot []*table.Table

	thisRange keyRange
	nextRange keyRange

	thisSize int64
}
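// lockLevels acquires read locks on this level and the next, in that order; unlockLevels
// releases them in reverse order.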
func (cd *compactDef) lockLevels() {
	cd.thisLevel.RLock()
	cd.nextLevel.RLock()
}

func (cd *compactDef) unlockLevels() {
	cd.nextLevel.RUnlock()
	cd.thisLevel.RUnlock()
}
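// fillTablesL0 picks every table at level 0 (key ranges there overlap, so they are compacted
// together) plus the next-level tables they overlap, and registers the compaction in cstatus.
// It returns false if level 0 is empty or the registration fails.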
func (s *levelsController) fillTablesL0(cd *compactDef) bool {
	cd.lockLevels()
	defer cd.unlockLevels()

	cd.top = make([]*table.Table, len(cd.thisLevel.tables))
	copy(cd.top, cd.thisLevel.tables)
	if len(cd.top) == 0 {
		return false
	}
	cd.thisRange = infRange

	kr := getKeyRange(cd.top)
	left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, kr)
	cd.bot = make([]*table.Table, right-left)
	copy(cd.bot, cd.nextLevel.tables[left:right])

	if len(cd.bot) == 0 {
		cd.nextRange = kr
	} else {
		cd.nextRange = getKeyRange(cd.bot)
	}

	if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
		return false
	}

	return true
}
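// fillTables picks, for a non-zero level, the biggest table not already being compacted, along
// with the next-level tables its key range overlaps. It returns false if no table can be
// compacted right now.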
func (s *levelsController) fillTables(cd *compactDef) bool {
	cd.lockLevels()
	defer cd.unlockLevels()

	tbls := make([]*table.Table, len(cd.thisLevel.tables))
	copy(tbls, cd.thisLevel.tables)
	if len(tbls) == 0 {
		return false
	}

	// Find the biggest table, and compact that first.
	// TODO: Try other table picking strategies.
	sort.Slice(tbls, func(i, j int) bool {
		return tbls[i].Size() > tbls[j].Size()
	})

	for _, t := range tbls {
		cd.thisSize = t.Size()
		cd.thisRange = keyRange{
			// We pick all the versions of the smallest and the biggest key.
			left: y.KeyWithTs(y.ParseKey(t.Smallest()), math.MaxUint64),
			// Note that version zero would be the rightmost key.
			right: y.KeyWithTs(y.ParseKey(t.Biggest()), 0),
		}
		if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) {
			continue
		}
		cd.top = []*table.Table{t}
		left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, cd.thisRange)

		cd.bot = make([]*table.Table, right-left)
		copy(cd.bot, cd.nextLevel.tables[left:right])

		if len(cd.bot) == 0 {
			cd.bot = []*table.Table{}
			cd.nextRange = cd.thisRange
			if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
				continue
			}
			return true
		}
		cd.nextRange = getKeyRange(cd.bot)

		if s.cstatus.overlapsWith(cd.nextLevel.level, cd.nextRange) {
			continue
		}
		if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
			continue
		}
		return true
	}
	return false
}
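// runCompactDef executes one compaction: it builds the new tables, records the change in the
// manifest, then swaps the new tables into the next level before deleting the compacted tables
// from this one.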
func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) {
	timeStart := time.Now()

	thisLevel := cd.thisLevel
	nextLevel := cd.nextLevel

	// Table should never be moved directly between levels, always be rewritten to allow discarding
	// invalid versions.
	newTables, decr, err := s.compactBuildTables(l, cd)
	if err != nil {
		return err
	}
	defer func() {
		// Only assign to err, if it's not already nil.
		if decErr := decr(); err == nil {
			err = decErr
		}
	}()
	changeSet := buildChangeSet(&cd, newTables)

	// We write to the manifest _before_ we delete files (and after we created files).
	if err := s.kv.manifest.addChanges(changeSet.Changes); err != nil {
		return err
	}

	// See comment earlier in this function about the ordering of these ops, and the order in which
	// we access levels when reading.
	if err := nextLevel.replaceTables(newTables); err != nil {
		return err
	}
	if err := thisLevel.deleteTables(cd.top); err != nil {
		return err
	}

	// Note: For level 0, while doCompact is running, it is possible that new tables are added.
	// However, the tables are added only to the end, so it is ok to just delete the first table.

	cd.elog.LazyPrintf("LOG Compact %d->%d, del %d tables, add %d tables, took %v\n",
		l, l+1, len(cd.top)+len(cd.bot), len(newTables), time.Since(timeStart))
	return nil
}
// doCompact picks some table on level l and compacts it away to the next level.
func (s *levelsController) doCompact(p compactionPriority) (bool, error) {
	l := p.level
	y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.

	cd := compactDef{
		elog:      trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"),
		thisLevel: s.levels[l],
		nextLevel: s.levels[l+1],
	}
	cd.elog.SetMaxEvents(100)
	defer cd.elog.Finish()

	cd.elog.LazyPrintf("Got compaction priority: %+v", p)

	// While picking tables to be compacted, both levels' tables are expected to
	// remain unchanged.
	if l == 0 {
		if !s.fillTablesL0(&cd) {
			cd.elog.LazyPrintf("fillTables failed for level: %d\n", l)
			return false, nil
		}
	} else {
		if !s.fillTables(&cd) {
			cd.elog.LazyPrintf("fillTables failed for level: %d\n", l)
			return false, nil
		}
	}
	defer s.cstatus.delete(cd) // Remove the ranges from compaction status.

	cd.elog.LazyPrintf("Running for level: %d\n", cd.thisLevel.level)
	s.cstatus.toLog(cd.elog)
	if err := s.runCompactDef(l, cd); err != nil {
		// This compaction couldn't be done successfully.
		cd.elog.LazyPrintf("\tLOG Compact FAILED with error: %+v: %+v", err, cd)
		return false, err
	}

	s.cstatus.toLog(cd.elog)
	cd.elog.LazyPrintf("Compaction for level: %d DONE", cd.thisLevel.level)
	return true, nil
}
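// addLevel0Table adds the given table to level 0, stalling the caller until levels 0 and 1
// are healthy again if level 0 is already full.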
func (s *levelsController) addLevel0Table(t *table.Table) error {
	// We update the manifest _before_ the table becomes part of a levelHandler, because at that
	// point it could get used in some compaction. This ensures the manifest file gets updated in
	// the proper order. (That means this update happens before that of some compaction which
	// deletes the table.)
	err := s.kv.manifest.addChanges([]*protos.ManifestChange{
		makeTableCreateChange(t.ID(), 0),
	})
	if err != nil {
		return err
	}

	for !s.levels[0].tryAddLevel0Table(t) {
		// Stall. Make sure all levels are healthy before we unstall.
		var timeStart time.Time
		{
			s.elog.Printf("STALLED STALLED STALLED STALLED STALLED STALLED STALLED STALLED: %v\n",
				time.Since(lastUnstalled))
			s.cstatus.RLock()
			for i := 0; i < s.kv.opt.MaxLevels; i++ {
				s.elog.Printf("level=%d. Status=%s Size=%d\n",
					i, s.cstatus.levels[i].debug(), s.levels[i].getTotalSize())
			}
			s.cstatus.RUnlock()
			timeStart = time.Now()
		}
		// Before we unstall, we need to make sure that level 0 and 1 are healthy. Otherwise, we
		// will very quickly fill up level 0 again, and if the compaction strategy favors level 0,
		// then level 1 is going to be super full.
		for i := 0; ; i++ {
			// Passing 0 for delSize to compactable means we're treating incomplete compactions as
			// not having finished -- we wait for them to finish. Also, it's crucial this behavior
			// replicates pickCompactLevels' behavior in computing compactability in order to
			// guarantee progress.
			if !s.isLevel0Compactable() && !s.levels[1].isCompactable(0) {
				break
			}
			time.Sleep(10 * time.Millisecond)
			if i%100 == 0 {
				prios := s.pickCompactLevels()
				s.elog.Printf("Waiting to add level 0 table. Compaction priorities: %+v\n", prios)
				i = 0
			}
		}
		{
			s.elog.Printf("UNSTALLED UNSTALLED UNSTALLED UNSTALLED UNSTALLED UNSTALLED: %v\n",
				time.Since(timeStart))
			lastUnstalled = time.Now()
		}
	}

	return nil
}
func (s *levelsController) close() error {
	err := s.cleanupLevels()
	return errors.Wrap(err, "levelsController.Close")
}
// get returns the found value if any. If not found, we return a zero-valued y.ValueStruct.
func (s *levelsController) get(key []byte) (y.ValueStruct, error) {
	// It's important that we iterate the levels from 0 on upward. The reason is, if we iterated
	// in opposite order, or in parallel (naively calling all the h.RLock() in some order) we could
	// read level L's tables post-compaction and level L+1's tables pre-compaction. (If we do
	// parallelize this, we will need to call the h.RLock() function by increasing order of level
	// number.)
	for _, h := range s.levels {
		vs, err := h.get(key) // Calls h.RLock() and h.RUnlock().
		if err != nil {
			return y.ValueStruct{}, errors.Wrapf(err, "get key: %q", key)
		}
		if vs.Value == nil && vs.Meta == 0 {
			continue
		}
		return vs, nil
	}
	return y.ValueStruct{}, nil
}
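// appendIteratorsReversed appends iterators for the given tables in reverse order, so that the
// newest level 0 table (appended last to the level) is iterated first and its versions take
// precedence in the merge.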
func appendIteratorsReversed(out []y.Iterator, th []*table.Table, reversed bool) []y.Iterator {
	for i := len(th) - 1; i >= 0; i-- {
		// This will increment the reference of the table handler.
		out = append(out, th[i].NewIterator(reversed))
	}
	return out
}
// appendIterators appends iterators to an array of iterators, for merging.
// Note: This obtains references for the table handlers. Remember to close these iterators.
func (s *levelsController) appendIterators(
	iters []y.Iterator, reversed bool) []y.Iterator {
	// Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing
	// data when there's a compaction.
	for _, level := range s.levels {
		iters = level.appendIterators(iters, reversed)
	}
	return iters
}
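// TableInfo describes a table: its ID, the level it lives on, and its smallest (Left) and
// biggest (Right) keys.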
type TableInfo struct {
	ID    uint64
	Level int
	Left  []byte
	Right []byte
}
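// getTableInfo returns a TableInfo for every table, sorted by level and then by table ID.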
func (s *levelsController) getTableInfo() (result []TableInfo) {
	for _, l := range s.levels {
		for _, t := range l.tables {
			info := TableInfo{
				ID:    t.ID(),
				Level: l.level,
				Left:  t.Smallest(),
				Right: t.Biggest(),
			}
			result = append(result, info)
		}
	}
	sort.Slice(result, func(i, j int) bool {
		if result[i].Level != result[j].Level {
			return result[i].Level < result[j].Level
		}
		return result[i].ID < result[j].ID
	})
	return
}