// bucket.go — in-memory and file-backed fixed-size-block buckets for package diskqueue.
  1. package diskqueue
  2. import (
  3. "bufio"
  4. "bytes"
  5. "encoding/binary"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "os"
  10. "sync"
  11. )
  12. const (
  13. _blockByte int32 = 512
  14. _lenByte int32 = 2
  15. _dataByte = _blockByte - _lenByte
  16. )
  17. var errBucketFull = errors.New("bucket is full or not enough")
  18. var fullHeader = []byte{1, 254}
  19. var nextHeader = []byte{1, 255}
  20. var magicHeader = []byte{'D', 'Q'}
// memBucketPool hands out memBuckets whose backing byte slices are
// recycled through a sync.Pool, avoiding a fresh bucket-sized
// allocation per bucket.
type memBucketPool struct {
	cap  int32     // number of _blockByte blocks per bucket
	pool sync.Pool // pool of []byte, each bucketByte long
}
  25. func newMemBucketPool(bucketByte int32) *memBucketPool {
  26. return &memBucketPool{
  27. pool: sync.Pool{New: func() interface{} {
  28. return make([]byte, bucketByte)
  29. }},
  30. cap: bucketByte / _blockByte,
  31. }
  32. }
  33. func (m *memBucketPool) new() *memBucket {
  34. data := m.pool.Get().([]byte)
  35. return &memBucket{data: data, cap: m.cap}
  36. }
  37. func (m *memBucketPool) free(bucket *memBucket) {
  38. m.pool.Put(bucket.data)
  39. }
// memBucket is an in-memory FIFO of fixed 512-byte blocks. Records are
// appended by push and consumed by pop; readAt/writeAt are block
// indexes guarded by the embedded mutex.
type memBucket struct {
	sync.Mutex
	cap     int32  // total number of blocks data can hold
	readAt  int32  // index of the next block pop will consume
	writeAt int32  // index of the next block push will fill
	data    []byte // cap * _blockByte bytes of block storage
}
// push appends p to the bucket, splitting it across 512-byte blocks.
// Each block is a 2-byte big-endian header followed by up to 510
// payload bytes. Header values: <510 — final block holding that many
// bytes; 511 (nextHeader) — full continuation block, record continues;
// 510 (fullHeader) — final, completely filled block. Returns
// errBucketFull when the remaining free blocks cannot hold p.
func (m *memBucket) push(p []byte) error {
	m.Lock()
	defer m.Unlock()
	length := int32(len(p))
	// Not enough free blocks left for the whole payload (the check also
	// covers the extra partial block when length%_dataByte != 0).
	if length > _dataByte*(m.cap-m.writeAt) {
		return errBucketFull
	}
	// if p length < blockbyte write it direct
	if length < _dataByte {
		ds := m.writeAt * _blockByte
		binary.BigEndian.PutUint16(m.data[ds:], uint16(length))
		copy(m.data[ds+_lenByte:], p)
		m.writeAt++
		return nil
	}
	// loop write block
	blocks := length / _dataByte
	re := length % _dataByte
	var i int32
	// All full blocks except the last carry nextHeader (continuation).
	for i = 0; i < blocks-1; i++ {
		ds := m.writeAt * _blockByte
		copy(m.data[ds:], nextHeader)
		ps := i * _dataByte
		copy(m.data[ds+_lenByte:], p[ps:ps+_dataByte])
		m.writeAt++
	}
	// Last full block: fullHeader if the payload ends exactly on a
	// block boundary, otherwise nextHeader because a partial block
	// still follows.
	var nh []byte
	if re == 0 {
		nh = fullHeader
	} else {
		nh = nextHeader
	}
	ds := m.writeAt * _blockByte
	copy(m.data[ds:], nh)
	ps := (blocks - 1) * _dataByte
	copy(m.data[ds+_lenByte:], p[ps:ps+_dataByte])
	m.writeAt++
	// Trailing partial block with its true byte count in the header.
	if re != 0 {
		ds := m.writeAt * _blockByte
		binary.BigEndian.PutUint16(m.data[ds:], uint16(re))
		copy(m.data[ds+_lenByte:], p[blocks*_dataByte:])
		m.writeAt++
	}
	return nil
}
  92. func (m *memBucket) pop() ([]byte, error) {
  93. m.Lock()
  94. defer m.Unlock()
  95. if m.readAt >= m.writeAt {
  96. return nil, io.EOF
  97. }
  98. ret := make([]byte, 0, _blockByte)
  99. for m.readAt < m.writeAt {
  100. ds := m.readAt * _blockByte
  101. m.readAt++
  102. l := int32(binary.BigEndian.Uint16(m.data[ds : ds+_lenByte]))
  103. if l <= _dataByte {
  104. ret = append(ret, m.data[ds+_lenByte:ds+_lenByte+l]...)
  105. break
  106. }
  107. ret = append(ret, m.data[ds+_lenByte:ds+_blockByte]...)
  108. }
  109. return ret, nil
  110. }
  111. func (m *memBucket) dump(w io.Writer) (int, error) {
  112. header := make([]byte, 10)
  113. copy(header, magicHeader)
  114. binary.BigEndian.PutUint32(header[2:6], uint32(m.readAt))
  115. binary.BigEndian.PutUint32(header[6:10], uint32(m.writeAt))
  116. n1, err := w.Write(header)
  117. if err != nil {
  118. return n1, err
  119. }
  120. n2, err := w.Write(m.data[:m.writeAt*_blockByte])
  121. return n1 + n2, err
  122. }
  123. func newFileBucket(fpath string) (*fileBucket, error) {
  124. fp, err := os.Open(fpath)
  125. if err != nil {
  126. return nil, err
  127. }
  128. header := make([]byte, 10)
  129. n, err := fp.Read(header)
  130. if err != nil {
  131. return nil, err
  132. }
  133. if n != 10 {
  134. return nil, fmt.Errorf("expect read 10 byte header get: %d", n)
  135. }
  136. if !bytes.Equal(header[:2], magicHeader) {
  137. return nil, fmt.Errorf("invalid magic %s", header[:2])
  138. }
  139. readAt := int32(binary.BigEndian.Uint32(header[2:6]))
  140. writeAt := int32(binary.BigEndian.Uint32(header[6:10]))
  141. if _, err = fp.Seek(int64(readAt*_blockByte), os.SEEK_CUR); err != nil {
  142. return nil, err
  143. }
  144. return &fileBucket{
  145. fp: fp,
  146. readAt: readAt,
  147. writeAt: writeAt,
  148. bufRd: bufio.NewReader(fp),
  149. }, nil
  150. }
// fileBucket is the read-only, file-backed counterpart of memBucket,
// reopened from a dump file. Blocks are consumed sequentially through a
// buffered reader; the embedded mutex guards the cursors.
type fileBucket struct {
	sync.Mutex
	fp      *os.File
	readAt  int32         // index of the next block pop will consume
	writeAt int32         // total number of blocks recorded in the file header
	bufRd   *bufio.Reader // buffered reader positioned at block readAt
}
  158. func (f *fileBucket) pop() ([]byte, error) {
  159. f.Lock()
  160. defer f.Unlock()
  161. if f.readAt >= f.writeAt {
  162. return nil, io.EOF
  163. }
  164. ret := make([]byte, 0, _blockByte)
  165. block := make([]byte, _blockByte)
  166. for f.readAt < f.writeAt {
  167. n, err := f.bufRd.Read(block)
  168. if err != nil {
  169. return nil, err
  170. }
  171. if int32(n) != _blockByte {
  172. return nil, fmt.Errorf("expect read %d byte data get %d", _blockByte, n)
  173. }
  174. l := int32(binary.BigEndian.Uint16(block[:2]))
  175. if l <= _dataByte {
  176. ret = append(ret, block[2:2+l]...)
  177. break
  178. }
  179. ret = append(ret, block[2:_blockByte]...)
  180. }
  181. return ret, nil
  182. }
// close releases the underlying file handle; the bucket must not be
// used afterwards.
func (f *fileBucket) close() error {
	return f.fp.Close()
}