watermark.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. /*
  2. * Copyright 2018 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package y
  17. import (
  18. "container/heap"
  19. "sync/atomic"
  20. "golang.org/x/net/trace"
  21. )
  // uint64Heap is a slice of uint64 values that implements heap.Interface.
  // Less orders elements ascending, so with container/heap the smallest value
  // sits at index 0.
  type uint64Heap []uint64

  // Len reports the number of elements currently stored.
  func (h uint64Heap) Len() int { return len(h) }

  // Less reports whether the element at index i is smaller than the one at j.
  func (h uint64Heap) Less(i int, j int) bool { return h[i] < h[j] }

  // Swap exchanges the elements at indexes i and j.
  func (h uint64Heap) Swap(i int, j int) { h[j], h[i] = h[i], h[j] }

  // Push appends x to the underlying slice. x must hold a uint64; any other
  // dynamic type makes the type assertion panic.
  func (h *uint64Heap) Push(x interface{}) {
  	v := x.(uint64)
  	*h = append(*h, v)
  }

  // Pop removes the last element of the underlying slice and returns it.
  // It panics when the slice is empty.
  func (h *uint64Heap) Pop() interface{} {
  	last := len(*h) - 1
  	tail := (*h)[last]
  	*h = (*h)[:last]
  	return tail
  }
  // mark is the message sent over WaterMark.markCh for a single Begin or Done
  // call.
  type mark struct {
  	// readTs is the read timestamp the call was made with.
  	readTs uint64
  	// done is false for a mark sent by Begin and true for one sent by Done.
  	done bool
  }
  38. type WaterMark struct {
  39. markCh chan mark
  40. minReadTs uint64
  41. elog trace.EventLog
  42. }
  43. // Init initializes a WaterMark struct. MUST be called before using it.
  44. func (w *WaterMark) Init() {
  45. w.markCh = make(chan mark, 1000)
  46. w.elog = trace.NewEventLog("Badger", "MinReadTs")
  47. go w.process()
  48. }
  49. func (w *WaterMark) Begin(readTs uint64) {
  50. w.markCh <- mark{readTs: readTs, done: false}
  51. }
  52. func (w *WaterMark) Done(readTs uint64) {
  53. w.markCh <- mark{readTs: readTs, done: true}
  54. }
  55. // DoneUntil returns the maximum index until which all tasks are done.
  56. func (w *WaterMark) MinReadTs() uint64 {
  57. return atomic.LoadUint64(&w.minReadTs)
  58. }
  59. // process is used to process the Mark channel. This is not thread-safe,
  60. // so only run one goroutine for process. One is sufficient, because
  61. // all ops in the goroutine use only memory and cpu.
  62. func (w *WaterMark) process() {
  63. var reads uint64Heap
  64. // pending maps raft proposal index to the number of pending mutations for this proposal.
  65. pending := make(map[uint64]int)
  66. heap.Init(&reads)
  67. var loop uint64
  68. processOne := func(readTs uint64, done bool) {
  69. // If not already done, then set. Otherwise, don't undo a done entry.
  70. prev, present := pending[readTs]
  71. if !present {
  72. heap.Push(&reads, readTs)
  73. }
  74. delta := 1
  75. if done {
  76. delta = -1
  77. }
  78. pending[readTs] = prev + delta
  79. loop++
  80. if len(reads) > 0 && loop%1000 == 0 {
  81. min := reads[0]
  82. w.elog.Printf("ReadTs: %4d. Size: %4d MinReadTs: %-4d Looking for: %-4d. Value: %d\n",
  83. readTs, len(reads), w.MinReadTs(), min, pending[min])
  84. }
  85. // Update mark by going through all reads in order; and checking if they have
  86. // been done. Stop at the first readTs, which isn't done.
  87. minReadTs := w.MinReadTs()
  88. // Don't assert that minReadTs < readTs, to avoid any inconsistencies caused by managed
  89. // transactions, or testing where we explicitly set the readTs for transactions like in
  90. // TestTxnVersions.
  91. until := minReadTs
  92. loops := 0
  93. for len(reads) > 0 {
  94. min := reads[0]
  95. if done := pending[min]; done != 0 {
  96. break // len(reads) will be > 0.
  97. }
  98. heap.Pop(&reads)
  99. delete(pending, min)
  100. until = min
  101. loops++
  102. }
  103. if until != minReadTs {
  104. AssertTrue(atomic.CompareAndSwapUint64(&w.minReadTs, minReadTs, until))
  105. w.elog.Printf("MinReadTs: %d. Loops: %d\n", until, loops)
  106. }
  107. }
  108. for mark := range w.markCh {
  109. if mark.readTs > 0 {
  110. processOne(mark.readTs, mark.done)
  111. }
  112. }
  113. }