y.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package y
  17. import (
  18. "bytes"
  19. "encoding/binary"
  20. "hash/crc32"
  21. "math"
  22. "os"
  23. "sync"
  24. "github.com/pkg/errors"
  25. )
// ErrEOF is the sentinel error used to indicate an end of file: a read from a
// memory mapped file ran into the end of the mapped slice.
var ErrEOF = errors.New("End of mapped region")
  29. const (
  30. // Sync indicates that O_DSYNC should be set on the underlying file,
  31. // ensuring that data writes do not return until the data is flushed
  32. // to disk.
  33. Sync = 1 << iota
  34. // ReadOnly opens the underlying file on a read-only basis.
  35. ReadOnly
  36. )
  37. var (
  38. // This is O_DSYNC (datasync) on platforms that support it -- see file_unix.go
  39. datasyncFileFlag = 0x0
  40. // CastagnoliCrcTable is a CRC32 polynomial table
  41. CastagnoliCrcTable = crc32.MakeTable(crc32.Castagnoli)
  42. )
  43. // OpenExistingFile opens an existing file, errors if it doesn't exist.
  44. func OpenExistingFile(filename string, flags uint32) (*os.File, error) {
  45. openFlags := os.O_RDWR
  46. if flags&ReadOnly != 0 {
  47. openFlags = os.O_RDONLY
  48. }
  49. if flags&Sync != 0 {
  50. openFlags |= datasyncFileFlag
  51. }
  52. return os.OpenFile(filename, openFlags, 0)
  53. }
  54. // CreateSyncedFile creates a new file (using O_EXCL), errors if it already existed.
  55. func CreateSyncedFile(filename string, sync bool) (*os.File, error) {
  56. flags := os.O_RDWR | os.O_CREATE | os.O_EXCL
  57. if sync {
  58. flags |= datasyncFileFlag
  59. }
  60. return os.OpenFile(filename, flags, 0666)
  61. }
  62. // OpenSyncedFile creates the file if one doesn't exist.
  63. func OpenSyncedFile(filename string, sync bool) (*os.File, error) {
  64. flags := os.O_RDWR | os.O_CREATE
  65. if sync {
  66. flags |= datasyncFileFlag
  67. }
  68. return os.OpenFile(filename, flags, 0666)
  69. }
  70. // OpenTruncFile opens the file with O_RDWR | O_CREATE | O_TRUNC
  71. func OpenTruncFile(filename string, sync bool) (*os.File, error) {
  72. flags := os.O_RDWR | os.O_CREATE | os.O_TRUNC
  73. if sync {
  74. flags |= datasyncFileFlag
  75. }
  76. return os.OpenFile(filename, flags, 0666)
  77. }
  78. // SafeCopy does append(a[:0], src...).
  79. func SafeCopy(a []byte, src []byte) []byte {
  80. return append(a[:0], src...)
  81. }
  82. // Copy copies a byte slice and returns the copied slice.
  83. func Copy(a []byte) []byte {
  84. b := make([]byte, len(a))
  85. copy(b, a)
  86. return b
  87. }
  88. // KeyWithTs generates a new key by appending ts to key.
  89. func KeyWithTs(key []byte, ts uint64) []byte {
  90. out := make([]byte, len(key)+8)
  91. copy(out, key)
  92. binary.BigEndian.PutUint64(out[len(key):], math.MaxUint64-ts)
  93. return out
  94. }
  95. // ParseTs parses the timestamp from the key bytes.
  96. func ParseTs(key []byte) uint64 {
  97. if len(key) <= 8 {
  98. return 0
  99. }
  100. return math.MaxUint64 - binary.BigEndian.Uint64(key[len(key)-8:])
  101. }
  102. // CompareKeys checks the key without timestamp and checks the timestamp if keyNoTs
  103. // is same.
  104. // a<timestamp> would be sorted higher than aa<timestamp> if we use bytes.compare
  105. // All keys should have timestamp.
  106. func CompareKeys(key1 []byte, key2 []byte) int {
  107. AssertTrue(len(key1) > 8 && len(key2) > 8)
  108. if cmp := bytes.Compare(key1[:len(key1)-8], key2[:len(key2)-8]); cmp != 0 {
  109. return cmp
  110. }
  111. return bytes.Compare(key1[len(key1)-8:], key2[len(key2)-8:])
  112. }
  113. // ParseKey parses the actual key from the key bytes.
  114. func ParseKey(key []byte) []byte {
  115. if key == nil {
  116. return nil
  117. }
  118. AssertTruef(len(key) > 8, "key=%q", key)
  119. return key[:len(key)-8]
  120. }
  121. // SameKey checks for key equality ignoring the version timestamp suffix.
  122. func SameKey(src, dst []byte) bool {
  123. if len(src) != len(dst) {
  124. return false
  125. }
  126. return bytes.Equal(ParseKey(src), ParseKey(dst))
  127. }
  128. // Slice holds a reusable buf, will reallocate if you request a larger size than ever before.
  129. // One problem is with n distinct sizes in random order it'll reallocate log(n) times.
  130. type Slice struct {
  131. buf []byte
  132. }
  133. // Resize reuses the Slice's buffer (or makes a new one) and returns a slice in that buffer of
  134. // length sz.
  135. func (s *Slice) Resize(sz int) []byte {
  136. if cap(s.buf) < sz {
  137. s.buf = make([]byte, sz)
  138. }
  139. return s.buf[0:sz]
  140. }
  141. // Closer holds the two things we need to close a goroutine and wait for it to finish: a chan
  142. // to tell the goroutine to shut down, and a WaitGroup with which to wait for it to finish shutting
  143. // down.
  144. type Closer struct {
  145. closed chan struct{}
  146. waiting sync.WaitGroup
  147. }
  148. // NewCloser constructs a new Closer, with an initial count on the WaitGroup.
  149. func NewCloser(initial int) *Closer {
  150. ret := &Closer{closed: make(chan struct{})}
  151. ret.waiting.Add(initial)
  152. return ret
  153. }
  154. // AddRunning Add()'s delta to the WaitGroup.
  155. func (lc *Closer) AddRunning(delta int) {
  156. lc.waiting.Add(delta)
  157. }
  158. // Signal signals the HasBeenClosed signal.
  159. func (lc *Closer) Signal() {
  160. close(lc.closed)
  161. }
  162. // HasBeenClosed gets signaled when Signal() is called.
  163. func (lc *Closer) HasBeenClosed() <-chan struct{} {
  164. return lc.closed
  165. }
  166. // Done calls Done() on the WaitGroup.
  167. func (lc *Closer) Done() {
  168. lc.waiting.Done()
  169. }
  170. // Wait waits on the WaitGroup. (It waits for NewCloser's initial value, AddRunning, and Done
  171. // calls to balance out.)
  172. func (lc *Closer) Wait() {
  173. lc.waiting.Wait()
  174. }
  175. // SignalAndWait calls Signal(), then Wait().
  176. func (lc *Closer) SignalAndWait() {
  177. lc.Signal()
  178. lc.Wait()
  179. }