transaction.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "bytes"
  19. "math"
  20. "sort"
  21. "strconv"
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. "github.com/dgraph-io/badger/y"
  26. farm "github.com/dgryski/go-farm"
  27. "github.com/pkg/errors"
  28. )
// oracle hands out read and commit timestamps to transactions and remembers
// the commit timestamp of recently written keys so that conflicting
// transactions can be detected at commit time.
type oracle struct {
	// curRead must be at the top for memory alignment. See issue #311.
	// It is read and advanced with atomic operations (see readTs and
	// doneCommit).
	curRead  uint64
	refCount int64 // Count of live update transactions; changed atomically by addRef/decrRef.

	isManaged bool // Does not change value, so no locking required.

	sync.Mutex            // Guards nextCommit, discardTs and commits.
	writeLock  sync.Mutex // Held by Txn.Commit from timestamp assignment until entries are queued for writing.
	nextCommit uint64     // Next commit timestamp handed out by newCommitTs when not managed.

	// Either of these is used to determine which versions can be permanently
	// discarded during compaction.
	discardTs uint64      // Used by ManagedDB.
	readMark  y.WaterMark // Used by DB.

	// commits stores a key fingerprint and latest commit counter for it.
	// refCount is used to clear out commits map to avoid a memory blowup.
	commits map[uint64]uint64
}
  45. func (o *oracle) addRef() {
  46. atomic.AddInt64(&o.refCount, 1)
  47. }
  48. func (o *oracle) decrRef() {
  49. if count := atomic.AddInt64(&o.refCount, -1); count == 0 {
  50. // Clear out commits maps to release memory.
  51. o.Lock()
  52. // Avoids the race where something new is added to commitsMap
  53. // after we check refCount and before we take Lock.
  54. if atomic.LoadInt64(&o.refCount) != 0 {
  55. o.Unlock()
  56. return
  57. }
  58. if len(o.commits) >= 1000 { // If the map is still small, let it slide.
  59. o.commits = make(map[uint64]uint64)
  60. }
  61. o.Unlock()
  62. }
  63. }
  64. func (o *oracle) readTs() uint64 {
  65. if o.isManaged {
  66. return math.MaxUint64
  67. }
  68. return atomic.LoadUint64(&o.curRead)
  69. }
  70. func (o *oracle) commitTs() uint64 {
  71. o.Lock()
  72. defer o.Unlock()
  73. return o.nextCommit
  74. }
  75. // Any deleted or invalid versions at or below ts would be discarded during
  76. // compaction to reclaim disk space in LSM tree and thence value log.
  77. func (o *oracle) setDiscardTs(ts uint64) {
  78. o.Lock()
  79. defer o.Unlock()
  80. o.discardTs = ts
  81. }
  82. func (o *oracle) discardAtOrBelow() uint64 {
  83. if o.isManaged {
  84. o.Lock()
  85. defer o.Unlock()
  86. return o.discardTs
  87. }
  88. return o.readMark.MinReadTs()
  89. }
  90. // hasConflict must be called while having a lock.
  91. func (o *oracle) hasConflict(txn *Txn) bool {
  92. if len(txn.reads) == 0 {
  93. return false
  94. }
  95. for _, ro := range txn.reads {
  96. if ts, has := o.commits[ro]; has && ts > txn.readTs {
  97. return true
  98. }
  99. }
  100. return false
  101. }
  102. func (o *oracle) newCommitTs(txn *Txn) uint64 {
  103. o.Lock()
  104. defer o.Unlock()
  105. if o.hasConflict(txn) {
  106. return 0
  107. }
  108. var ts uint64
  109. if !o.isManaged {
  110. // This is the general case, when user doesn't specify the read and commit ts.
  111. ts = o.nextCommit
  112. o.nextCommit++
  113. } else {
  114. // If commitTs is set, use it instead.
  115. ts = txn.commitTs
  116. }
  117. for _, w := range txn.writes {
  118. o.commits[w] = ts // Update the commitTs.
  119. }
  120. return ts
  121. }
  122. func (o *oracle) doneCommit(cts uint64) {
  123. if o.isManaged {
  124. // No need to update anything.
  125. return
  126. }
  127. for {
  128. curRead := atomic.LoadUint64(&o.curRead)
  129. if cts <= curRead {
  130. return
  131. }
  132. atomic.CompareAndSwapUint64(&o.curRead, curRead, cts)
  133. }
  134. }
// Txn represents a Badger transaction.
type Txn struct {
	readTs   uint64 // Timestamp reads are performed at; set in NewTransaction from the oracle.
	commitTs uint64 // Used as the commit timestamp by the oracle in managed mode.

	update bool     // update is used to conditionally keep track of reads.
	reads  []uint64 // contains fingerprints of keys read.
	writes []uint64 // contains fingerprints of keys written.

	pendingWrites map[string]*Entry // cache stores any writes done by txn.

	db        *DB
	callbacks []func() // Invoked and then cleared by runCallbacks.
	discarded bool     // Set by Discard; later operations return ErrDiscardedTxn.

	size         int64 // Running size estimate of the batch, maintained by checkSize.
	count        int64 // Running entry count of the batch, maintained by checkSize.
	numIterators int32 // Loaded atomically in Discard, which panics if it is > 0.
}
// pendingWritesIterator iterates over a sorted snapshot of a transaction's
// pending writes. It is built by Txn.newPendingWritesIterator.
type pendingWritesIterator struct {
	entries  []*Entry // Pending entries sorted by key (descending when reversed).
	nextIdx  int      // Index of the current entry; the iterator is valid while it is < len(entries).
	readTs   uint64   // Read timestamp of the owning transaction; reported as key suffix and version.
	reversed bool     // Whether entries are ordered from largest key to smallest.
}
  156. func (pi *pendingWritesIterator) Next() {
  157. pi.nextIdx++
  158. }
  159. func (pi *pendingWritesIterator) Rewind() {
  160. pi.nextIdx = 0
  161. }
  162. func (pi *pendingWritesIterator) Seek(key []byte) {
  163. key = y.ParseKey(key)
  164. pi.nextIdx = sort.Search(len(pi.entries), func(idx int) bool {
  165. cmp := bytes.Compare(pi.entries[idx].Key, key)
  166. if !pi.reversed {
  167. return cmp >= 0
  168. }
  169. return cmp <= 0
  170. })
  171. }
  172. func (pi *pendingWritesIterator) Key() []byte {
  173. y.AssertTrue(pi.Valid())
  174. entry := pi.entries[pi.nextIdx]
  175. return y.KeyWithTs(entry.Key, pi.readTs)
  176. }
  177. func (pi *pendingWritesIterator) Value() y.ValueStruct {
  178. y.AssertTrue(pi.Valid())
  179. entry := pi.entries[pi.nextIdx]
  180. return y.ValueStruct{
  181. Value: entry.Value,
  182. Meta: entry.meta,
  183. UserMeta: entry.UserMeta,
  184. ExpiresAt: entry.ExpiresAt,
  185. Version: pi.readTs,
  186. }
  187. }
  188. func (pi *pendingWritesIterator) Valid() bool {
  189. return pi.nextIdx < len(pi.entries)
  190. }
  191. func (pi *pendingWritesIterator) Close() error {
  192. return nil
  193. }
  194. func (txn *Txn) newPendingWritesIterator(reversed bool) *pendingWritesIterator {
  195. if !txn.update || len(txn.pendingWrites) == 0 {
  196. return nil
  197. }
  198. entries := make([]*Entry, 0, len(txn.pendingWrites))
  199. for _, e := range txn.pendingWrites {
  200. entries = append(entries, e)
  201. }
  202. // Number of pending writes per transaction shouldn't be too big in general.
  203. sort.Slice(entries, func(i, j int) bool {
  204. cmp := bytes.Compare(entries[i].Key, entries[j].Key)
  205. if !reversed {
  206. return cmp < 0
  207. }
  208. return cmp > 0
  209. })
  210. return &pendingWritesIterator{
  211. readTs: txn.readTs,
  212. entries: entries,
  213. reversed: reversed,
  214. }
  215. }
  216. func (txn *Txn) checkSize(e *Entry) error {
  217. count := txn.count + 1
  218. // Extra bytes for version in key.
  219. size := txn.size + int64(e.estimateSize(txn.db.opt.ValueThreshold)) + 10
  220. if count >= txn.db.opt.maxBatchCount || size >= txn.db.opt.maxBatchSize {
  221. return ErrTxnTooBig
  222. }
  223. txn.count, txn.size = count, size
  224. return nil
  225. }
  226. // Set adds a key-value pair to the database.
  227. //
  228. // It will return ErrReadOnlyTxn if update flag was set to false when creating the
  229. // transaction.
  230. //
  231. // The current transaction keeps a reference to the key and val byte slice
  232. // arguments. Users must not modify key and val until the end of the transaction.
  233. func (txn *Txn) Set(key, val []byte) error {
  234. e := &Entry{
  235. Key: key,
  236. Value: val,
  237. }
  238. return txn.SetEntry(e)
  239. }
  240. // SetWithMeta adds a key-value pair to the database, along with a metadata
  241. // byte.
  242. //
  243. // This byte is stored alongside the key, and can be used as an aid to
  244. // interpret the value or store other contextual bits corresponding to the
  245. // key-value pair.
  246. //
  247. // The current transaction keeps a reference to the key and val byte slice
  248. // arguments. Users must not modify key and val until the end of the transaction.
  249. func (txn *Txn) SetWithMeta(key, val []byte, meta byte) error {
  250. e := &Entry{Key: key, Value: val, UserMeta: meta}
  251. return txn.SetEntry(e)
  252. }
  253. // SetWithDiscard acts like SetWithMeta, but adds a marker to discard earlier
  254. // versions of the key.
  255. //
  256. // This method is only useful if you have set a higher limit for
  257. // options.NumVersionsToKeep. The default setting is 1, in which case, this
  258. // function doesn't add any more benefit than just calling the normal
  259. // SetWithMeta (or Set) function. If however, you have a higher setting for
  260. // NumVersionsToKeep (in Dgraph, we set it to infinity), you can use this method
  261. // to indicate that all the older versions can be discarded and removed during
  262. // compactions.
  263. //
  264. // The current transaction keeps a reference to the key and val byte slice
  265. // arguments. Users must not modify key and val until the end of the
  266. // transaction.
  267. func (txn *Txn) SetWithDiscard(key, val []byte, meta byte) error {
  268. e := &Entry{
  269. Key: key,
  270. Value: val,
  271. UserMeta: meta,
  272. meta: bitDiscardEarlierVersions,
  273. }
  274. return txn.SetEntry(e)
  275. }
  276. // SetWithTTL adds a key-value pair to the database, along with a time-to-live
  277. // (TTL) setting. A key stored with a TTL would automatically expire after the
  278. // time has elapsed , and be eligible for garbage collection.
  279. //
  280. // The current transaction keeps a reference to the key and val byte slice
  281. // arguments. Users must not modify key and val until the end of the
  282. // transaction.
  283. func (txn *Txn) SetWithTTL(key, val []byte, dur time.Duration) error {
  284. expire := time.Now().Add(dur).Unix()
  285. e := &Entry{Key: key, Value: val, ExpiresAt: uint64(expire)}
  286. return txn.SetEntry(e)
  287. }
  288. func (txn *Txn) modify(e *Entry) error {
  289. if !txn.update {
  290. return ErrReadOnlyTxn
  291. } else if txn.discarded {
  292. return ErrDiscardedTxn
  293. } else if len(e.Key) == 0 {
  294. return ErrEmptyKey
  295. } else if len(e.Key) > maxKeySize {
  296. return exceedsMaxKeySizeError(e.Key)
  297. } else if int64(len(e.Value)) > txn.db.opt.ValueLogFileSize {
  298. return exceedsMaxValueSizeError(e.Value, txn.db.opt.ValueLogFileSize)
  299. }
  300. if err := txn.checkSize(e); err != nil {
  301. return err
  302. }
  303. fp := farm.Fingerprint64(e.Key) // Avoid dealing with byte arrays.
  304. txn.writes = append(txn.writes, fp)
  305. txn.pendingWrites[string(e.Key)] = e
  306. return nil
  307. }
  308. // SetEntry takes an Entry struct and adds the key-value pair in the struct,
  309. // along with other metadata to the database.
  310. //
  311. // The current transaction keeps a reference to the entry passed in argument.
  312. // Users must not modify the entry until the end of the transaction.
  313. func (txn *Txn) SetEntry(e *Entry) error {
  314. return txn.modify(e)
  315. }
  316. // Delete deletes a key.
  317. //
  318. // This is done by adding a delete marker for the key at commit timestamp. Any
  319. // reads happening before this timestamp would be unaffected. Any reads after
  320. // this commit would see the deletion.
  321. //
  322. // The current transaction keeps a reference to the key byte slice argument.
  323. // Users must not modify the key until the end of the transaction.
  324. func (txn *Txn) Delete(key []byte) error {
  325. e := &Entry{
  326. Key: key,
  327. meta: bitDelete,
  328. }
  329. return txn.modify(e)
  330. }
  331. // Get looks for key and returns corresponding Item.
  332. // If key is not found, ErrKeyNotFound is returned.
  333. func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
  334. if len(key) == 0 {
  335. return nil, ErrEmptyKey
  336. } else if txn.discarded {
  337. return nil, ErrDiscardedTxn
  338. }
  339. item = new(Item)
  340. if txn.update {
  341. if e, has := txn.pendingWrites[string(key)]; has && bytes.Equal(key, e.Key) {
  342. if isDeletedOrExpired(e.meta, e.ExpiresAt) {
  343. return nil, ErrKeyNotFound
  344. }
  345. // Fulfill from cache.
  346. item.meta = e.meta
  347. item.val = e.Value
  348. item.userMeta = e.UserMeta
  349. item.key = key
  350. item.status = prefetched
  351. item.version = txn.readTs
  352. item.expiresAt = e.ExpiresAt
  353. // We probably don't need to set db on item here.
  354. return item, nil
  355. }
  356. // Only track reads if this is update txn. No need to track read if txn serviced it
  357. // internally.
  358. fp := farm.Fingerprint64(key)
  359. txn.reads = append(txn.reads, fp)
  360. }
  361. seek := y.KeyWithTs(key, txn.readTs)
  362. vs, err := txn.db.get(seek)
  363. if err != nil {
  364. return nil, errors.Wrapf(err, "DB::Get key: %q", key)
  365. }
  366. if vs.Value == nil && vs.Meta == 0 {
  367. return nil, ErrKeyNotFound
  368. }
  369. if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
  370. return nil, ErrKeyNotFound
  371. }
  372. item.key = key
  373. item.version = vs.Version
  374. item.meta = vs.Meta
  375. item.userMeta = vs.UserMeta
  376. item.db = txn.db
  377. item.vptr = vs.Value
  378. item.txn = txn
  379. item.expiresAt = vs.ExpiresAt
  380. return item, nil
  381. }
  382. func (txn *Txn) runCallbacks() {
  383. for _, cb := range txn.callbacks {
  384. cb()
  385. }
  386. txn.callbacks = txn.callbacks[:0]
  387. }
  388. // Discard discards a created transaction. This method is very important and must be called. Commit
  389. // method calls this internally, however, calling this multiple times doesn't cause any issues. So,
  390. // this can safely be called via a defer right when transaction is created.
  391. //
  392. // NOTE: If any operations are run on a discarded transaction, ErrDiscardedTxn is returned.
  393. func (txn *Txn) Discard() {
  394. if txn.discarded { // Avoid a re-run.
  395. return
  396. }
  397. if atomic.LoadInt32(&txn.numIterators) > 0 {
  398. panic("Unclosed iterator at time of Txn.Discard.")
  399. }
  400. txn.discarded = true
  401. txn.db.orc.readMark.Done(txn.readTs)
  402. txn.runCallbacks()
  403. if txn.update {
  404. txn.db.orc.decrRef()
  405. }
  406. }
  407. // Commit commits the transaction, following these steps:
  408. //
  409. // 1. If there are no writes, return immediately.
  410. //
  411. // 2. Check if read rows were updated since txn started. If so, return ErrConflict.
  412. //
  413. // 3. If no conflict, generate a commit timestamp and update written rows' commit ts.
  414. //
  415. // 4. Batch up all writes, write them to value log and LSM tree.
  416. //
  417. // 5. If callback is provided, Badger will return immediately after checking
  418. // for conflicts. Writes to the database will happen in the background. If
  419. // there is a conflict, an error will be returned and the callback will not
  420. // run. If there are no conflicts, the callback will be called in the
  421. // background upon successful completion of writes or any error during write.
  422. //
  423. // If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM
  424. // tree won't be updated, so there's no need for any rollback.
  425. func (txn *Txn) Commit(callback func(error)) error {
  426. if txn.commitTs == 0 && txn.db.opt.managedTxns {
  427. return ErrManagedTxn
  428. }
  429. if txn.discarded {
  430. return ErrDiscardedTxn
  431. }
  432. defer txn.Discard()
  433. if len(txn.writes) == 0 {
  434. return nil // Nothing to do.
  435. }
  436. state := txn.db.orc
  437. state.writeLock.Lock()
  438. commitTs := state.newCommitTs(txn)
  439. if commitTs == 0 {
  440. state.writeLock.Unlock()
  441. return ErrConflict
  442. }
  443. entries := make([]*Entry, 0, len(txn.pendingWrites)+1)
  444. for _, e := range txn.pendingWrites {
  445. // Suffix the keys with commit ts, so the key versions are sorted in
  446. // descending order of commit timestamp.
  447. e.Key = y.KeyWithTs(e.Key, commitTs)
  448. e.meta |= bitTxn
  449. entries = append(entries, e)
  450. }
  451. e := &Entry{
  452. Key: y.KeyWithTs(txnKey, commitTs),
  453. Value: []byte(strconv.FormatUint(commitTs, 10)),
  454. meta: bitFinTxn,
  455. }
  456. entries = append(entries, e)
  457. req, err := txn.db.sendToWriteCh(entries)
  458. state.writeLock.Unlock()
  459. if err != nil {
  460. return err
  461. }
  462. // Need to release all locks or writes can get deadlocked.
  463. txn.runCallbacks()
  464. if callback == nil {
  465. // If batchSet failed, LSM would not have been updated. So, no need to rollback anything.
  466. // TODO: What if some of the txns successfully make it to value log, but others fail.
  467. // Nothing gets updated to LSM, until a restart happens.
  468. defer state.doneCommit(commitTs)
  469. return req.Wait()
  470. }
  471. go func() {
  472. err := req.Wait()
  473. // Write is complete. Let's call the callback function now.
  474. state.doneCommit(commitTs)
  475. callback(err)
  476. }()
  477. return nil
  478. }
  479. // NewTransaction creates a new transaction. Badger supports concurrent execution of transactions,
  480. // providing serializable snapshot isolation, avoiding write skews. Badger achieves this by tracking
  481. // the keys read and at Commit time, ensuring that these read keys weren't concurrently modified by
  482. // another transaction.
  483. //
  484. // For read-only transactions, set update to false. In this mode, we don't track the rows read for
  485. // any changes. Thus, any long running iterations done in this mode wouldn't pay this overhead.
  486. //
  487. // Running transactions concurrently is OK. However, a transaction itself isn't thread safe, and
  488. // should only be run serially. It doesn't matter if a transaction is created by one goroutine and
  489. // passed down to other, as long as the Txn APIs are called serially.
  490. //
  491. // When you create a new transaction, it is absolutely essential to call
  492. // Discard(). This should be done irrespective of what the update param is set
  493. // to. Commit API internally runs Discard, but running it twice wouldn't cause
  494. // any issues.
  495. //
  496. // txn := db.NewTransaction(false)
  497. // defer txn.Discard()
  498. // // Call various APIs.
  499. func (db *DB) NewTransaction(update bool) *Txn {
  500. if db.opt.ReadOnly && update {
  501. // DB is read-only, force read-only transaction.
  502. update = false
  503. }
  504. txn := &Txn{
  505. update: update,
  506. db: db,
  507. readTs: db.orc.readTs(),
  508. count: 1, // One extra entry for BitFin.
  509. size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
  510. }
  511. db.orc.readMark.Begin(txn.readTs)
  512. if update {
  513. txn.pendingWrites = make(map[string]*Entry)
  514. txn.db.orc.addRef()
  515. }
  516. return txn
  517. }
  518. // View executes a function creating and managing a read-only transaction for the user. Error
  519. // returned by the function is relayed by the View method.
  520. func (db *DB) View(fn func(txn *Txn) error) error {
  521. if db.opt.managedTxns {
  522. return ErrManagedTxn
  523. }
  524. txn := db.NewTransaction(false)
  525. defer txn.Discard()
  526. return fn(txn)
  527. }
  528. // Update executes a function, creating and managing a read-write transaction
  529. // for the user. Error returned by the function is relayed by the Update method.
  530. func (db *DB) Update(fn func(txn *Txn) error) error {
  531. if db.opt.managedTxns {
  532. return ErrManagedTxn
  533. }
  534. txn := db.NewTransaction(true)
  535. defer txn.Discard()
  536. if err := fn(txn); err != nil {
  537. return err
  538. }
  539. return txn.Commit(nil)
  540. }