db.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "encoding/binary"
  19. "expvar"
  20. "log"
  21. "math"
  22. "os"
  23. "path/filepath"
  24. "strconv"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "github.com/dgraph-io/badger/options"
  29. "golang.org/x/net/trace"
  30. "github.com/dgraph-io/badger/skl"
  31. "github.com/dgraph-io/badger/table"
  32. "github.com/dgraph-io/badger/y"
  33. "github.com/pkg/errors"
  34. )
  35. var (
  36. badgerPrefix = []byte("!badger!") // Prefix for internal keys used by badger.
  37. head = []byte("!badger!head") // For storing value offset for replay.
  38. txnKey = []byte("!badger!txn") // For indicating end of entries in txn.
  39. badgerMove = []byte("!badger!move") // For key-value pairs which got moved during GC.
  40. )
  41. type closers struct {
  42. updateSize *y.Closer
  43. compactors *y.Closer
  44. memtable *y.Closer
  45. writes *y.Closer
  46. valueGC *y.Closer
  47. }
  48. // DB provides the various functions required to interact with Badger.
  49. // DB is thread-safe.
  50. type DB struct {
  51. sync.RWMutex // Guards list of inmemory tables, not individual reads and writes.
  52. dirLockGuard *directoryLockGuard
  53. // nil if Dir and ValueDir are the same
  54. valueDirGuard *directoryLockGuard
  55. closers closers
  56. elog trace.EventLog
  57. mt *skl.Skiplist // Our latest (actively written) in-memory table
  58. imm []*skl.Skiplist // Add here only AFTER pushing to flushChan.
  59. opt Options
  60. manifest *manifestFile
  61. lc *levelsController
  62. vlog valueLog
  63. vptr valuePointer // less than or equal to a pointer to the last vlog value put into mt
  64. writeCh chan *request
  65. flushChan chan flushTask // For flushing memtables.
  66. blockWrites int32
  67. orc *oracle
  68. }
  69. const (
  70. kvWriteChCapacity = 1000
  71. )
  72. func replayFunction(out *DB) func(Entry, valuePointer) error {
  73. type txnEntry struct {
  74. nk []byte
  75. v y.ValueStruct
  76. }
  77. var txn []txnEntry
  78. var lastCommit uint64
  79. toLSM := func(nk []byte, vs y.ValueStruct) {
  80. for err := out.ensureRoomForWrite(); err != nil; err = out.ensureRoomForWrite() {
  81. out.elog.Printf("Replay: Making room for writes")
  82. time.Sleep(10 * time.Millisecond)
  83. }
  84. out.mt.Put(nk, vs)
  85. }
  86. first := true
  87. return func(e Entry, vp valuePointer) error { // Function for replaying.
  88. if first {
  89. out.elog.Printf("First key=%q\n", e.Key)
  90. }
  91. first = false
  92. if out.orc.curRead < y.ParseTs(e.Key) {
  93. out.orc.curRead = y.ParseTs(e.Key)
  94. }
  95. nk := make([]byte, len(e.Key))
  96. copy(nk, e.Key)
  97. var nv []byte
  98. meta := e.meta
  99. if out.shouldWriteValueToLSM(e) {
  100. nv = make([]byte, len(e.Value))
  101. copy(nv, e.Value)
  102. } else {
  103. nv = make([]byte, vptrSize)
  104. vp.Encode(nv)
  105. meta = meta | bitValuePointer
  106. }
  107. v := y.ValueStruct{
  108. Value: nv,
  109. Meta: meta,
  110. UserMeta: e.UserMeta,
  111. }
  112. if e.meta&bitFinTxn > 0 {
  113. txnTs, err := strconv.ParseUint(string(e.Value), 10, 64)
  114. if err != nil {
  115. return errors.Wrapf(err, "Unable to parse txn fin: %q", e.Value)
  116. }
  117. y.AssertTrue(lastCommit == txnTs)
  118. y.AssertTrue(len(txn) > 0)
  119. // Got the end of txn. Now we can store them.
  120. for _, t := range txn {
  121. toLSM(t.nk, t.v)
  122. }
  123. txn = txn[:0]
  124. lastCommit = 0
  125. } else if e.meta&bitTxn == 0 {
  126. // This entry is from a rewrite.
  127. toLSM(nk, v)
  128. // We shouldn't get this entry in the middle of a transaction.
  129. y.AssertTrue(lastCommit == 0)
  130. y.AssertTrue(len(txn) == 0)
  131. } else {
  132. txnTs := y.ParseTs(nk)
  133. if lastCommit == 0 {
  134. lastCommit = txnTs
  135. }
  136. y.AssertTrue(lastCommit == txnTs)
  137. te := txnEntry{nk: nk, v: v}
  138. txn = append(txn, te)
  139. }
  140. return nil
  141. }
  142. }
  143. // Open returns a new DB object.
  144. func Open(opt Options) (db *DB, err error) {
  145. opt.maxBatchSize = (15 * opt.MaxTableSize) / 100
  146. opt.maxBatchCount = opt.maxBatchSize / int64(skl.MaxNodeSize)
  147. if opt.ValueThreshold > math.MaxUint16-16 {
  148. return nil, ErrValueThreshold
  149. }
  150. if opt.ReadOnly {
  151. // Can't truncate if the DB is read only.
  152. opt.Truncate = false
  153. }
  154. for _, path := range []string{opt.Dir, opt.ValueDir} {
  155. dirExists, err := exists(path)
  156. if err != nil {
  157. return nil, y.Wrapf(err, "Invalid Dir: %q", path)
  158. }
  159. if !dirExists {
  160. if opt.ReadOnly {
  161. return nil, y.Wrapf(err, "Cannot find Dir for read-only open: %q", path)
  162. }
  163. // Try to create the directory
  164. err = os.Mkdir(path, 0700)
  165. if err != nil {
  166. return nil, y.Wrapf(err, "Error Creating Dir: %q", path)
  167. }
  168. }
  169. }
  170. absDir, err := filepath.Abs(opt.Dir)
  171. if err != nil {
  172. return nil, err
  173. }
  174. absValueDir, err := filepath.Abs(opt.ValueDir)
  175. if err != nil {
  176. return nil, err
  177. }
  178. var dirLockGuard, valueDirLockGuard *directoryLockGuard
  179. dirLockGuard, err = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly)
  180. if err != nil {
  181. return nil, err
  182. }
  183. defer func() {
  184. if dirLockGuard != nil {
  185. _ = dirLockGuard.release()
  186. }
  187. }()
  188. if absValueDir != absDir {
  189. valueDirLockGuard, err = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly)
  190. if err != nil {
  191. return nil, err
  192. }
  193. }
  194. defer func() {
  195. if valueDirLockGuard != nil {
  196. _ = valueDirLockGuard.release()
  197. }
  198. }()
  199. if !(opt.ValueLogFileSize <= 2<<30 && opt.ValueLogFileSize >= 1<<20) {
  200. return nil, ErrValueLogSize
  201. }
  202. if !(opt.ValueLogLoadingMode == options.FileIO ||
  203. opt.ValueLogLoadingMode == options.MemoryMap) {
  204. return nil, ErrInvalidLoadingMode
  205. }
  206. manifestFile, manifest, err := openOrCreateManifestFile(opt.Dir, opt.ReadOnly)
  207. if err != nil {
  208. return nil, err
  209. }
  210. defer func() {
  211. if manifestFile != nil {
  212. _ = manifestFile.close()
  213. }
  214. }()
  215. orc := &oracle{
  216. isManaged: opt.managedTxns,
  217. nextCommit: 1,
  218. commits: make(map[uint64]uint64),
  219. readMark: y.WaterMark{},
  220. }
  221. orc.readMark.Init()
  222. db = &DB{
  223. imm: make([]*skl.Skiplist, 0, opt.NumMemtables),
  224. flushChan: make(chan flushTask, opt.NumMemtables),
  225. writeCh: make(chan *request, kvWriteChCapacity),
  226. opt: opt,
  227. manifest: manifestFile,
  228. elog: trace.NewEventLog("Badger", "DB"),
  229. dirLockGuard: dirLockGuard,
  230. valueDirGuard: valueDirLockGuard,
  231. orc: orc,
  232. }
  233. // Calculate initial size.
  234. db.calculateSize()
  235. db.closers.updateSize = y.NewCloser(1)
  236. go db.updateSize(db.closers.updateSize)
  237. db.mt = skl.NewSkiplist(arenaSize(opt))
  238. // newLevelsController potentially loads files in directory.
  239. if db.lc, err = newLevelsController(db, &manifest); err != nil {
  240. return nil, err
  241. }
  242. if !opt.ReadOnly {
  243. db.closers.compactors = y.NewCloser(1)
  244. db.lc.startCompact(db.closers.compactors)
  245. db.closers.memtable = y.NewCloser(1)
  246. go db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
  247. }
  248. if err = db.vlog.Open(db, opt); err != nil {
  249. return nil, err
  250. }
  251. headKey := y.KeyWithTs(head, math.MaxUint64)
  252. // Need to pass with timestamp, lsm get removes the last 8 bytes and compares key
  253. vs, err := db.get(headKey)
  254. if err != nil {
  255. return nil, errors.Wrap(err, "Retrieving head")
  256. }
  257. db.orc.curRead = vs.Version
  258. var vptr valuePointer
  259. if len(vs.Value) > 0 {
  260. vptr.Decode(vs.Value)
  261. }
  262. // lastUsedCasCounter will either be the value stored in !badger!head, or some subsequently
  263. // written value log entry that we replay. (Subsequent value log entries might be _less_
  264. // than lastUsedCasCounter, if there was value log gc so we have to max() values while
  265. // replaying.)
  266. // out.lastUsedCasCounter = item.casCounter
  267. // TODO: Figure this out. This would update the read timestamp, and set nextCommitTs.
  268. replayCloser := y.NewCloser(1)
  269. go db.doWrites(replayCloser)
  270. if err = db.vlog.Replay(vptr, replayFunction(db)); err != nil {
  271. return db, err
  272. }
  273. replayCloser.SignalAndWait() // Wait for replay to be applied first.
  274. // Now that we have the curRead, we can update the nextCommit.
  275. db.orc.nextCommit = db.orc.curRead + 1
  276. // Mmap writable log
  277. lf := db.vlog.filesMap[db.vlog.maxFid]
  278. if err = lf.mmap(2 * db.vlog.opt.ValueLogFileSize); err != nil {
  279. return db, errors.Wrapf(err, "Unable to mmap RDWR log file")
  280. }
  281. db.writeCh = make(chan *request, kvWriteChCapacity)
  282. db.closers.writes = y.NewCloser(1)
  283. go db.doWrites(db.closers.writes)
  284. db.closers.valueGC = y.NewCloser(1)
  285. go db.vlog.waitOnGC(db.closers.valueGC)
  286. valueDirLockGuard = nil
  287. dirLockGuard = nil
  288. manifestFile = nil
  289. return db, nil
  290. }
  291. // Close closes a DB. It's crucial to call it to ensure all the pending updates
  292. // make their way to disk. Calling DB.Close() multiple times is not safe and would
  293. // cause panic.
  294. func (db *DB) Close() (err error) {
  295. db.elog.Printf("Closing database")
  296. // Stop value GC first.
  297. db.closers.valueGC.SignalAndWait()
  298. // Stop writes next.
  299. db.closers.writes.SignalAndWait()
  300. // Now close the value log.
  301. if vlogErr := db.vlog.Close(); err == nil {
  302. err = errors.Wrap(vlogErr, "DB.Close")
  303. }
  304. // Make sure that block writer is done pushing stuff into memtable!
  305. // Otherwise, you will have a race condition: we are trying to flush memtables
  306. // and remove them completely, while the block / memtable writer is still
  307. // trying to push stuff into the memtable. This will also resolve the value
  308. // offset problem: as we push into memtable, we update value offsets there.
  309. if !db.mt.Empty() {
  310. db.elog.Printf("Flushing memtable")
  311. for {
  312. pushedFlushTask := func() bool {
  313. db.Lock()
  314. defer db.Unlock()
  315. y.AssertTrue(db.mt != nil)
  316. select {
  317. case db.flushChan <- flushTask{db.mt, db.vptr}:
  318. db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm.
  319. db.mt = nil // Will segfault if we try writing!
  320. db.elog.Printf("pushed to flush chan\n")
  321. return true
  322. default:
  323. // If we fail to push, we need to unlock and wait for a short while.
  324. // The flushing operation needs to update s.imm. Otherwise, we have a deadlock.
  325. // TODO: Think about how to do this more cleanly, maybe without any locks.
  326. }
  327. return false
  328. }()
  329. if pushedFlushTask {
  330. break
  331. }
  332. time.Sleep(10 * time.Millisecond)
  333. }
  334. }
  335. db.flushChan <- flushTask{nil, valuePointer{}} // Tell flusher to quit.
  336. if db.closers.memtable != nil {
  337. db.closers.memtable.Wait()
  338. db.elog.Printf("Memtable flushed")
  339. }
  340. if db.closers.compactors != nil {
  341. db.closers.compactors.SignalAndWait()
  342. db.elog.Printf("Compaction finished")
  343. }
  344. // Force Compact L0
  345. // We don't need to care about cstatus since no parallel compaction is running.
  346. cd := compactDef{
  347. elog: trace.New("Badger", "Compact"),
  348. thisLevel: db.lc.levels[0],
  349. nextLevel: db.lc.levels[1],
  350. }
  351. cd.elog.SetMaxEvents(100)
  352. defer cd.elog.Finish()
  353. if db.lc.fillTablesL0(&cd) {
  354. if err := db.lc.runCompactDef(0, cd); err != nil {
  355. cd.elog.LazyPrintf("\tLOG Compact FAILED with error: %+v: %+v", err, cd)
  356. }
  357. } else {
  358. cd.elog.LazyPrintf("fillTables failed for level zero. No compaction required")
  359. }
  360. if lcErr := db.lc.close(); err == nil {
  361. err = errors.Wrap(lcErr, "DB.Close")
  362. }
  363. db.elog.Printf("Waiting for closer")
  364. db.closers.updateSize.SignalAndWait()
  365. db.elog.Finish()
  366. if db.dirLockGuard != nil {
  367. if guardErr := db.dirLockGuard.release(); err == nil {
  368. err = errors.Wrap(guardErr, "DB.Close")
  369. }
  370. }
  371. if db.valueDirGuard != nil {
  372. if guardErr := db.valueDirGuard.release(); err == nil {
  373. err = errors.Wrap(guardErr, "DB.Close")
  374. }
  375. }
  376. if manifestErr := db.manifest.close(); err == nil {
  377. err = errors.Wrap(manifestErr, "DB.Close")
  378. }
  379. // Fsync directories to ensure that lock file, and any other removed files whose directory
  380. // we haven't specifically fsynced, are guaranteed to have their directory entry removal
  381. // persisted to disk.
  382. if syncErr := syncDir(db.opt.Dir); err == nil {
  383. err = errors.Wrap(syncErr, "DB.Close")
  384. }
  385. if syncErr := syncDir(db.opt.ValueDir); err == nil {
  386. err = errors.Wrap(syncErr, "DB.Close")
  387. }
  388. return err
  389. }
  390. const (
  391. lockFile = "LOCK"
  392. )
  393. // When you create or delete a file, you have to ensure the directory entry for the file is synced
  394. // in order to guarantee the file is visible (if the system crashes). (See the man page for fsync,
  395. // or see https://github.com/coreos/etcd/issues/6368 for an example.)
  396. func syncDir(dir string) error {
  397. f, err := openDir(dir)
  398. if err != nil {
  399. return errors.Wrapf(err, "While opening directory: %s.", dir)
  400. }
  401. err = f.Sync()
  402. closeErr := f.Close()
  403. if err != nil {
  404. return errors.Wrapf(err, "While syncing directory: %s.", dir)
  405. }
  406. return errors.Wrapf(closeErr, "While closing directory: %s.", dir)
  407. }
  408. // getMemtables returns the current memtables and get references.
  409. func (db *DB) getMemTables() ([]*skl.Skiplist, func()) {
  410. db.RLock()
  411. defer db.RUnlock()
  412. tables := make([]*skl.Skiplist, len(db.imm)+1)
  413. // Get mutable memtable.
  414. tables[0] = db.mt
  415. tables[0].IncrRef()
  416. // Get immutable memtables.
  417. last := len(db.imm) - 1
  418. for i := range db.imm {
  419. tables[i+1] = db.imm[last-i]
  420. tables[i+1].IncrRef()
  421. }
  422. return tables, func() {
  423. for _, tbl := range tables {
  424. tbl.DecrRef()
  425. }
  426. }
  427. }
  428. // get returns the value in memtable or disk for given key.
  429. // Note that value will include meta byte.
  430. //
  431. // IMPORTANT: We should never write an entry with an older timestamp for the same key, We need to
  432. // maintain this invariant to search for the latest value of a key, or else we need to search in all
  433. // tables and find the max version among them. To maintain this invariant, we also need to ensure
  434. // that all versions of a key are always present in the same table from level 1, because compaction
  435. // can push any table down.
  436. func (db *DB) get(key []byte) (y.ValueStruct, error) {
  437. tables, decr := db.getMemTables() // Lock should be released.
  438. defer decr()
  439. y.NumGets.Add(1)
  440. for i := 0; i < len(tables); i++ {
  441. vs := tables[i].Get(key)
  442. y.NumMemtableGets.Add(1)
  443. if vs.Meta != 0 || vs.Value != nil {
  444. return vs, nil
  445. }
  446. }
  447. return db.lc.get(key)
  448. }
  449. func (db *DB) updateOffset(ptrs []valuePointer) {
  450. var ptr valuePointer
  451. for i := len(ptrs) - 1; i >= 0; i-- {
  452. p := ptrs[i]
  453. if !p.IsZero() {
  454. ptr = p
  455. break
  456. }
  457. }
  458. if ptr.IsZero() {
  459. return
  460. }
  461. db.Lock()
  462. defer db.Unlock()
  463. y.AssertTrue(!ptr.Less(db.vptr))
  464. db.vptr = ptr
  465. }
  466. var requestPool = sync.Pool{
  467. New: func() interface{} {
  468. return new(request)
  469. },
  470. }
  471. func (db *DB) shouldWriteValueToLSM(e Entry) bool {
  472. return len(e.Value) < db.opt.ValueThreshold
  473. }
  474. func (db *DB) writeToLSM(b *request) error {
  475. if len(b.Ptrs) != len(b.Entries) {
  476. return errors.Errorf("Ptrs and Entries don't match: %+v", b)
  477. }
  478. for i, entry := range b.Entries {
  479. if entry.meta&bitFinTxn != 0 {
  480. continue
  481. }
  482. if db.shouldWriteValueToLSM(*entry) { // Will include deletion / tombstone case.
  483. db.mt.Put(entry.Key,
  484. y.ValueStruct{
  485. Value: entry.Value,
  486. Meta: entry.meta,
  487. UserMeta: entry.UserMeta,
  488. ExpiresAt: entry.ExpiresAt,
  489. })
  490. } else {
  491. var offsetBuf [vptrSize]byte
  492. db.mt.Put(entry.Key,
  493. y.ValueStruct{
  494. Value: b.Ptrs[i].Encode(offsetBuf[:]),
  495. Meta: entry.meta | bitValuePointer,
  496. UserMeta: entry.UserMeta,
  497. ExpiresAt: entry.ExpiresAt,
  498. })
  499. }
  500. }
  501. return nil
  502. }
  503. // writeRequests is called serially by only one goroutine.
  504. func (db *DB) writeRequests(reqs []*request) error {
  505. if len(reqs) == 0 {
  506. return nil
  507. }
  508. done := func(err error) {
  509. for _, r := range reqs {
  510. r.Err = err
  511. r.Wg.Done()
  512. }
  513. }
  514. db.elog.Printf("writeRequests called. Writing to value log")
  515. err := db.vlog.write(reqs)
  516. if err != nil {
  517. done(err)
  518. return err
  519. }
  520. db.elog.Printf("Writing to memtable")
  521. var count int
  522. for _, b := range reqs {
  523. if len(b.Entries) == 0 {
  524. continue
  525. }
  526. count += len(b.Entries)
  527. var i uint64
  528. for err := db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
  529. i++
  530. if i%100 == 0 {
  531. db.elog.Printf("Making room for writes")
  532. }
  533. // We need to poll a bit because both hasRoomForWrite and the flusher need access to s.imm.
  534. // When flushChan is full and you are blocked there, and the flusher is trying to update s.imm,
  535. // you will get a deadlock.
  536. time.Sleep(10 * time.Millisecond)
  537. }
  538. if err != nil {
  539. done(err)
  540. return errors.Wrap(err, "writeRequests")
  541. }
  542. if err := db.writeToLSM(b); err != nil {
  543. done(err)
  544. return errors.Wrap(err, "writeRequests")
  545. }
  546. db.updateOffset(b.Ptrs)
  547. }
  548. done(nil)
  549. db.elog.Printf("%d entries written", count)
  550. return nil
  551. }
  552. func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
  553. if atomic.LoadInt32(&db.blockWrites) == 1 {
  554. return nil, ErrBlockedWrites
  555. }
  556. var count, size int64
  557. for _, e := range entries {
  558. size += int64(e.estimateSize(db.opt.ValueThreshold))
  559. count++
  560. }
  561. if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize {
  562. return nil, ErrTxnTooBig
  563. }
  564. // We can only service one request because we need each txn to be stored in a contigous section.
  565. // Txns should not interleave among other txns or rewrites.
  566. req := requestPool.Get().(*request)
  567. req.Entries = entries
  568. req.Wg = sync.WaitGroup{}
  569. req.Wg.Add(1)
  570. db.writeCh <- req // Handled in doWrites.
  571. y.NumPuts.Add(int64(len(entries)))
  572. return req, nil
  573. }
  574. func (db *DB) doWrites(lc *y.Closer) {
  575. defer lc.Done()
  576. pendingCh := make(chan struct{}, 1)
  577. writeRequests := func(reqs []*request) {
  578. if err := db.writeRequests(reqs); err != nil {
  579. log.Printf("ERROR in Badger::writeRequests: %v", err)
  580. }
  581. <-pendingCh
  582. }
  583. // This variable tracks the number of pending writes.
  584. reqLen := new(expvar.Int)
  585. y.PendingWrites.Set(db.opt.Dir, reqLen)
  586. reqs := make([]*request, 0, 10)
  587. for {
  588. var r *request
  589. select {
  590. case r = <-db.writeCh:
  591. case <-lc.HasBeenClosed():
  592. goto closedCase
  593. }
  594. for {
  595. reqs = append(reqs, r)
  596. reqLen.Set(int64(len(reqs)))
  597. if len(reqs) >= 3*kvWriteChCapacity {
  598. pendingCh <- struct{}{} // blocking.
  599. goto writeCase
  600. }
  601. select {
  602. // Either push to pending, or continue to pick from writeCh.
  603. case r = <-db.writeCh:
  604. case pendingCh <- struct{}{}:
  605. goto writeCase
  606. case <-lc.HasBeenClosed():
  607. goto closedCase
  608. }
  609. }
  610. closedCase:
  611. close(db.writeCh)
  612. for r := range db.writeCh { // Flush the channel.
  613. reqs = append(reqs, r)
  614. }
  615. pendingCh <- struct{}{} // Push to pending before doing a write.
  616. writeRequests(reqs)
  617. return
  618. writeCase:
  619. go writeRequests(reqs)
  620. reqs = make([]*request, 0, 10)
  621. reqLen.Set(0)
  622. }
  623. }
  624. // batchSet applies a list of badger.Entry. If a request level error occurs it
  625. // will be returned.
  626. // Check(kv.BatchSet(entries))
  627. func (db *DB) batchSet(entries []*Entry) error {
  628. req, err := db.sendToWriteCh(entries)
  629. if err != nil {
  630. return err
  631. }
  632. return req.Wait()
  633. }
  634. // batchSetAsync is the asynchronous version of batchSet. It accepts a callback
  635. // function which is called when all the sets are complete. If a request level
  636. // error occurs, it will be passed back via the callback.
  637. // err := kv.BatchSetAsync(entries, func(err error)) {
  638. // Check(err)
  639. // }
  640. func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error {
  641. req, err := db.sendToWriteCh(entries)
  642. if err != nil {
  643. return err
  644. }
  645. go func() {
  646. err := req.Wait()
  647. // Write is complete. Let's call the callback function now.
  648. f(err)
  649. }()
  650. return nil
  651. }
  652. var errNoRoom = errors.New("No room for write")
  653. // ensureRoomForWrite is always called serially.
  654. func (db *DB) ensureRoomForWrite() error {
  655. var err error
  656. db.Lock()
  657. defer db.Unlock()
  658. if db.mt.MemSize() < db.opt.MaxTableSize {
  659. return nil
  660. }
  661. y.AssertTrue(db.mt != nil) // A nil mt indicates that DB is being closed.
  662. select {
  663. case db.flushChan <- flushTask{db.mt, db.vptr}:
  664. db.elog.Printf("Flushing value log to disk if async mode.")
  665. // Ensure value log is synced to disk so this memtable's contents wouldn't be lost.
  666. err = db.vlog.sync()
  667. if err != nil {
  668. return err
  669. }
  670. db.elog.Printf("Flushing memtable, mt.size=%d size of flushChan: %d\n",
  671. db.mt.MemSize(), len(db.flushChan))
  672. // We manage to push this task. Let's modify imm.
  673. db.imm = append(db.imm, db.mt)
  674. db.mt = skl.NewSkiplist(arenaSize(db.opt))
  675. // New memtable is empty. We certainly have room.
  676. return nil
  677. default:
  678. // We need to do this to unlock and allow the flusher to modify imm.
  679. return errNoRoom
  680. }
  681. }
  682. func arenaSize(opt Options) int64 {
  683. return opt.MaxTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize)
  684. }
  685. // WriteLevel0Table flushes memtable.
  686. func writeLevel0Table(s *skl.Skiplist, f *os.File) error {
  687. iter := s.NewIterator()
  688. defer iter.Close()
  689. b := table.NewTableBuilder()
  690. defer b.Close()
  691. for iter.SeekToFirst(); iter.Valid(); iter.Next() {
  692. if err := b.Add(iter.Key(), iter.Value()); err != nil {
  693. return err
  694. }
  695. }
  696. _, err := f.Write(b.Finish())
  697. return err
  698. }
  699. type flushTask struct {
  700. mt *skl.Skiplist
  701. vptr valuePointer
  702. }
  703. // TODO: Ensure that this function doesn't return, or is handled by another wrapper function.
  704. // Otherwise, we would have no goroutine which can flush memtables.
  705. func (db *DB) flushMemtable(lc *y.Closer) error {
  706. defer lc.Done()
  707. for ft := range db.flushChan {
  708. if ft.mt == nil {
  709. return nil
  710. }
  711. if !ft.mt.Empty() {
  712. // Store badger head even if vptr is zero, need it for readTs
  713. db.elog.Printf("Storing offset: %+v\n", ft.vptr)
  714. offset := make([]byte, vptrSize)
  715. ft.vptr.Encode(offset)
  716. // Pick the max commit ts, so in case of crash, our read ts would be higher than all the
  717. // commits.
  718. headTs := y.KeyWithTs(head, db.orc.commitTs())
  719. ft.mt.Put(headTs, y.ValueStruct{Value: offset})
  720. }
  721. fileID := db.lc.reserveFileID()
  722. fd, err := y.CreateSyncedFile(table.NewFilename(fileID, db.opt.Dir), true)
  723. if err != nil {
  724. return y.Wrap(err)
  725. }
  726. // Don't block just to sync the directory entry.
  727. dirSyncCh := make(chan error)
  728. go func() { dirSyncCh <- syncDir(db.opt.Dir) }()
  729. err = writeLevel0Table(ft.mt, fd)
  730. dirSyncErr := <-dirSyncCh
  731. if err != nil {
  732. db.elog.Errorf("ERROR while writing to level 0: %v", err)
  733. return err
  734. }
  735. if dirSyncErr != nil {
  736. db.elog.Errorf("ERROR while syncing level directory: %v", dirSyncErr)
  737. return err
  738. }
  739. tbl, err := table.OpenTable(fd, db.opt.TableLoadingMode)
  740. if err != nil {
  741. db.elog.Printf("ERROR while opening table: %v", err)
  742. return err
  743. }
  744. // We own a ref on tbl.
  745. err = db.lc.addLevel0Table(tbl) // This will incrRef (if we don't error, sure)
  746. tbl.DecrRef() // Releases our ref.
  747. if err != nil {
  748. return err
  749. }
  750. // Update s.imm. Need a lock.
  751. db.Lock()
  752. // This is a single-threaded operation. ft.mt corresponds to the head of
  753. // db.imm list. Once we flush it, we advance db.imm. The next ft.mt
  754. // which would arrive here would match db.imm[0], because we acquire a
  755. // lock over DB when pushing to flushChan.
  756. // TODO: This logic is dirty AF. Any change and this could easily break.
  757. y.AssertTrue(ft.mt == db.imm[0])
  758. db.imm = db.imm[1:]
  759. ft.mt.DecrRef() // Return memory.
  760. db.Unlock()
  761. }
  762. return nil
  763. }
  764. func exists(path string) (bool, error) {
  765. _, err := os.Stat(path)
  766. if err == nil {
  767. return true, nil
  768. }
  769. if os.IsNotExist(err) {
  770. return false, nil
  771. }
  772. return true, err
  773. }
  774. // This function does a filewalk, calculates the size of vlog and sst files and stores it in
  775. // y.LSMSize and y.VlogSize.
  776. func (db *DB) calculateSize() {
  777. newInt := func(val int64) *expvar.Int {
  778. v := new(expvar.Int)
  779. v.Add(val)
  780. return v
  781. }
  782. totalSize := func(dir string) (int64, int64) {
  783. var lsmSize, vlogSize int64
  784. err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
  785. if err != nil {
  786. return err
  787. }
  788. ext := filepath.Ext(path)
  789. if ext == ".sst" {
  790. lsmSize += info.Size()
  791. } else if ext == ".vlog" {
  792. vlogSize += info.Size()
  793. }
  794. return nil
  795. })
  796. if err != nil {
  797. db.elog.Printf("Got error while calculating total size of directory: %s", dir)
  798. }
  799. return lsmSize, vlogSize
  800. }
  801. lsmSize, vlogSize := totalSize(db.opt.Dir)
  802. y.LSMSize.Set(db.opt.Dir, newInt(lsmSize))
  803. // If valueDir is different from dir, we'd have to do another walk.
  804. if db.opt.ValueDir != db.opt.Dir {
  805. _, vlogSize = totalSize(db.opt.ValueDir)
  806. }
  807. y.VlogSize.Set(db.opt.Dir, newInt(vlogSize))
  808. }
  809. func (db *DB) updateSize(lc *y.Closer) {
  810. defer lc.Done()
  811. metricsTicker := time.NewTicker(time.Minute)
  812. defer metricsTicker.Stop()
  813. for {
  814. select {
  815. case <-metricsTicker.C:
  816. db.calculateSize()
  817. case <-lc.HasBeenClosed():
  818. return
  819. }
  820. }
  821. }
  822. // RunValueLogGC triggers a value log garbage collection.
  823. //
  824. // It picks value log files to perform GC based on statistics that are collected
  825. // duing compactions. If no such statistics are available, then log files are
  826. // picked in random order. The process stops as soon as the first log file is
  827. // encountered which does not result in garbage collection.
  828. //
  829. // When a log file is picked, it is first sampled. If the sample shows that we
  830. // can discard at least discardRatio space of that file, it would be rewritten.
  831. //
  832. // If a call to RunValueLogGC results in no rewrites, then an ErrNoRewrite is
  833. // thrown indicating that the call resulted in no file rewrites.
  834. //
  835. // We recommend setting discardRatio to 0.5, thus indicating that a file be
  836. // rewritten if half the space can be discarded. This results in a lifetime
  837. // value log write amplification of 2 (1 from original write + 0.5 rewrite +
  838. // 0.25 + 0.125 + ... = 2). Setting it to higher value would result in fewer
  839. // space reclaims, while setting it to a lower value would result in more space
  840. // reclaims at the cost of increased activity on the LSM tree. discardRatio
  841. // must be in the range (0.0, 1.0), both endpoints excluded, otherwise an
  842. // ErrInvalidRequest is returned.
  843. //
  844. // Only one GC is allowed at a time. If another value log GC is running, or DB
  845. // has been closed, this would return an ErrRejected.
  846. //
  847. // Note: Every time GC is run, it would produce a spike of activity on the LSM
  848. // tree.
  849. func (db *DB) RunValueLogGC(discardRatio float64) error {
  850. if discardRatio >= 1.0 || discardRatio <= 0.0 {
  851. return ErrInvalidRequest
  852. }
  853. // Find head on disk
  854. headKey := y.KeyWithTs(head, math.MaxUint64)
  855. // Need to pass with timestamp, lsm get removes the last 8 bytes and compares key
  856. val, err := db.lc.get(headKey)
  857. if err != nil {
  858. return errors.Wrap(err, "Retrieving head from on-disk LSM")
  859. }
  860. var head valuePointer
  861. if len(val.Value) > 0 {
  862. head.Decode(val.Value)
  863. }
  864. // Pick a log file and run GC
  865. return db.vlog.runGC(discardRatio, head)
  866. }
  867. // Size returns the size of lsm and value log files in bytes. It can be used to decide how often to
  868. // call RunValueLogGC.
  869. func (db *DB) Size() (lsm int64, vlog int64) {
  870. if y.LSMSize.Get(db.opt.Dir) == nil {
  871. lsm, vlog = 0, 0
  872. return
  873. }
  874. lsm = y.LSMSize.Get(db.opt.Dir).(*expvar.Int).Value()
  875. vlog = y.VlogSize.Get(db.opt.Dir).(*expvar.Int).Value()
  876. return
  877. }
  878. // Sequence represents a Badger sequence.
  879. type Sequence struct {
  880. sync.Mutex
  881. db *DB
  882. key []byte
  883. next uint64
  884. leased uint64
  885. bandwidth uint64
  886. }
  887. // Next would return the next integer in the sequence, updating the lease by running a transaction
  888. // if needed.
  889. func (seq *Sequence) Next() (uint64, error) {
  890. seq.Lock()
  891. defer seq.Unlock()
  892. if seq.next >= seq.leased {
  893. if err := seq.updateLease(); err != nil {
  894. return 0, err
  895. }
  896. }
  897. val := seq.next
  898. seq.next++
  899. return val, nil
  900. }
  901. // Release the leased sequence to avoid wasted integers. This should be done right
  902. // before closing the associated DB. However it is valid to use the sequence after
  903. // it was released, causing a new lease with full bandwidth.
  904. func (seq *Sequence) Release() error {
  905. seq.Lock()
  906. defer seq.Unlock()
  907. err := seq.db.Update(func(txn *Txn) error {
  908. var buf [8]byte
  909. binary.BigEndian.PutUint64(buf[:], seq.next)
  910. return txn.Set(seq.key, buf[:])
  911. })
  912. if err != nil {
  913. return err
  914. }
  915. seq.leased = seq.next
  916. return nil
  917. }
  918. func (seq *Sequence) updateLease() error {
  919. return seq.db.Update(func(txn *Txn) error {
  920. item, err := txn.Get(seq.key)
  921. if err == ErrKeyNotFound {
  922. seq.next = 0
  923. } else if err != nil {
  924. return err
  925. } else {
  926. val, err := item.Value()
  927. if err != nil {
  928. return err
  929. }
  930. num := binary.BigEndian.Uint64(val)
  931. seq.next = num
  932. }
  933. lease := seq.next + seq.bandwidth
  934. var buf [8]byte
  935. binary.BigEndian.PutUint64(buf[:], lease)
  936. if err = txn.Set(seq.key, buf[:]); err != nil {
  937. return err
  938. }
  939. seq.leased = lease
  940. return nil
  941. })
  942. }
  943. // GetSequence would initiate a new sequence object, generating it from the stored lease, if
  944. // available, in the database. Sequence can be used to get a list of monotonically increasing
  945. // integers. Multiple sequences can be created by providing different keys. Bandwidth sets the
  946. // size of the lease, determining how many Next() requests can be served from memory.
  947. func (db *DB) GetSequence(key []byte, bandwidth uint64) (*Sequence, error) {
  948. switch {
  949. case len(key) == 0:
  950. return nil, ErrEmptyKey
  951. case bandwidth == 0:
  952. return nil, ErrZeroBandwidth
  953. }
  954. seq := &Sequence{
  955. db: db,
  956. key: key,
  957. next: 0,
  958. leased: 0,
  959. bandwidth: bandwidth,
  960. }
  961. err := seq.updateLease()
  962. return seq, err
  963. }
  964. func (db *DB) Tables() []TableInfo {
  965. return db.lc.getTableInfo()
  966. }
  967. // MergeOperator represents a Badger merge operator.
  968. type MergeOperator struct {
  969. sync.RWMutex
  970. f MergeFunc
  971. db *DB
  972. key []byte
  973. closer *y.Closer
  974. }
  // MergeFunc combines two byte slices and returns the merged result: existing
  // is the value accumulated so far, and val is another value to fold into it.
  // Integer addition and list appends are typical examples.
  // The order of the operands is unspecified. An implementation must therefore
  // either be insensitive to ordering or handle ordering itself.
  type MergeFunc func(existing []byte, val []byte) []byte
  983. // GetMergeOperator creates a new MergeOperator for a given key and returns a
  984. // pointer to it. It also fires off a goroutine that performs a compaction using
  985. // the merge function that runs periodically, as specified by dur.
  986. func (db *DB) GetMergeOperator(key []byte,
  987. f MergeFunc, dur time.Duration) *MergeOperator {
  988. op := &MergeOperator{
  989. f: f,
  990. db: db,
  991. key: key,
  992. closer: y.NewCloser(1),
  993. }
  994. go op.runCompactions(dur)
  995. return op
  996. }
  997. var errNoMerge = errors.New("No need for merge")
  998. func (op *MergeOperator) iterateAndMerge(txn *Txn) (val []byte, err error) {
  999. opt := DefaultIteratorOptions
  1000. opt.AllVersions = true
  1001. it := txn.NewIterator(opt)
  1002. defer it.Close()
  1003. var numVersions int
  1004. for it.Rewind(); it.ValidForPrefix(op.key); it.Next() {
  1005. item := it.Item()
  1006. numVersions++
  1007. if numVersions == 1 {
  1008. val, err = item.ValueCopy(val)
  1009. if err != nil {
  1010. return nil, err
  1011. }
  1012. } else {
  1013. newVal, err := item.Value()
  1014. if err != nil {
  1015. return nil, err
  1016. }
  1017. val = op.f(val, newVal)
  1018. }
  1019. if item.DiscardEarlierVersions() {
  1020. break
  1021. }
  1022. }
  1023. if numVersions == 0 {
  1024. return nil, ErrKeyNotFound
  1025. } else if numVersions == 1 {
  1026. return val, errNoMerge
  1027. }
  1028. return val, nil
  1029. }
  1030. func (op *MergeOperator) compact() error {
  1031. op.Lock()
  1032. defer op.Unlock()
  1033. err := op.db.Update(func(txn *Txn) error {
  1034. var (
  1035. val []byte
  1036. err error
  1037. )
  1038. val, err = op.iterateAndMerge(txn)
  1039. if err != nil {
  1040. return err
  1041. }
  1042. // Write value back to db
  1043. if err := txn.SetWithDiscard(op.key, val, 0); err != nil {
  1044. return err
  1045. }
  1046. return nil
  1047. })
  1048. if err == ErrKeyNotFound || err == errNoMerge {
  1049. // pass.
  1050. } else if err != nil {
  1051. return err
  1052. }
  1053. return nil
  1054. }
  1055. func (op *MergeOperator) runCompactions(dur time.Duration) {
  1056. ticker := time.NewTicker(dur)
  1057. defer op.closer.Done()
  1058. var stop bool
  1059. for {
  1060. select {
  1061. case <-op.closer.HasBeenClosed():
  1062. stop = true
  1063. case <-ticker.C: // wait for tick
  1064. }
  1065. if err := op.compact(); err != nil {
  1066. log.Printf("Error while running merge operation: %s", err)
  1067. }
  1068. if stop {
  1069. ticker.Stop()
  1070. break
  1071. }
  1072. }
  1073. }
  1074. // Add records a value in Badger which will eventually be merged by a background
  1075. // routine into the values that were recorded by previous invocations to Add().
  1076. func (op *MergeOperator) Add(val []byte) error {
  1077. return op.db.Update(func(txn *Txn) error {
  1078. return txn.Set(op.key, val)
  1079. })
  1080. }
  1081. // Get returns the latest value for the merge operator, which is derived by
  1082. // applying the merge function to all the values added so far.
  1083. //
  1084. // If Add has not been called even once, Get will return ErrKeyNotFound.
  1085. func (op *MergeOperator) Get() ([]byte, error) {
  1086. op.RLock()
  1087. defer op.RUnlock()
  1088. var existing []byte
  1089. err := op.db.View(func(txn *Txn) (err error) {
  1090. existing, err = op.iterateAndMerge(txn)
  1091. return err
  1092. })
  1093. if err == errNoMerge {
  1094. return existing, nil
  1095. }
  1096. return existing, err
  1097. }
  1098. // Stop waits for any pending merge to complete and then stops the background
  1099. // goroutine.
  1100. func (op *MergeOperator) Stop() {
  1101. op.closer.SignalAndWait()
  1102. }