buffer.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package workpool
  2. import (
  3. "errors"
  4. "runtime"
  5. "sync/atomic"
  6. )
  7. // ringBuffer .
  8. type ringBuffer struct {
  9. capacity uint64
  10. mask uint64
  11. padding1 [7]uint64
  12. lastCommintIdx uint64
  13. padding2 [7]uint64
  14. nextFreeIdx uint64
  15. padding3 [7]uint64
  16. readerIdx uint64
  17. padding4 [7]uint64
  18. slots []*worker
  19. }
  20. // newRingBuffer .
  21. func newRingBuffer(c uint64) (*ringBuffer, error) {
  22. if c == 0 || c&3 != 0 {
  23. return nil, errors.New("capacity must be N power of 2")
  24. }
  25. return &ringBuffer{
  26. lastCommintIdx: 0,
  27. nextFreeIdx: 1,
  28. readerIdx: 0,
  29. capacity: c,
  30. mask: c - 1,
  31. slots: make([]*worker, c),
  32. }, nil
  33. }
  34. // push .
  35. func (r *ringBuffer) push(w *worker) error {
  36. var head, tail, next uint64
  37. for {
  38. head = r.nextFreeIdx
  39. tail = r.readerIdx
  40. if (head > tail+r.capacity-2) || (head < tail-1) {
  41. return errors.New("buffer is full")
  42. }
  43. next = (head + 1) & r.mask
  44. if atomic.CompareAndSwapUint64(&r.nextFreeIdx, head, next) {
  45. break
  46. }
  47. runtime.Gosched()
  48. }
  49. r.slots[head] = w
  50. for !atomic.CompareAndSwapUint64(&r.lastCommintIdx, head-1, head) {
  51. runtime.Gosched()
  52. }
  53. return nil
  54. }
  55. // pop .
  56. func (r *ringBuffer) pop() *worker {
  57. var head, next uint64
  58. for {
  59. head = r.readerIdx
  60. if head == r.lastCommintIdx {
  61. return r.slots[head]
  62. }
  63. next = (head + 1) & r.mask
  64. if atomic.CompareAndSwapUint64(&r.readerIdx, head, next) {
  65. break
  66. }
  67. runtime.Gosched()
  68. }
  69. return r.slots[head]
  70. }
  71. // size .
  72. func (r *ringBuffer) size() uint64 {
  73. return r.lastCommintIdx - r.readerIdx
  74. }