12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- package workpool
- import (
- "errors"
- "runtime"
- "sync/atomic"
- )
- // ringBuffer .
- type ringBuffer struct {
- capacity uint64
- mask uint64
- padding1 [7]uint64
- lastCommintIdx uint64
- padding2 [7]uint64
- nextFreeIdx uint64
- padding3 [7]uint64
- readerIdx uint64
- padding4 [7]uint64
- slots []*worker
- }
- // newRingBuffer .
- func newRingBuffer(c uint64) (*ringBuffer, error) {
- if c == 0 || c&3 != 0 {
- return nil, errors.New("capacity must be N power of 2")
- }
- return &ringBuffer{
- lastCommintIdx: 0,
- nextFreeIdx: 1,
- readerIdx: 0,
- capacity: c,
- mask: c - 1,
- slots: make([]*worker, c),
- }, nil
- }
- // push .
- func (r *ringBuffer) push(w *worker) error {
- var head, tail, next uint64
- for {
- head = r.nextFreeIdx
- tail = r.readerIdx
- if (head > tail+r.capacity-2) || (head < tail-1) {
- return errors.New("buffer is full")
- }
- next = (head + 1) & r.mask
- if atomic.CompareAndSwapUint64(&r.nextFreeIdx, head, next) {
- break
- }
- runtime.Gosched()
- }
- r.slots[head] = w
- for !atomic.CompareAndSwapUint64(&r.lastCommintIdx, head-1, head) {
- runtime.Gosched()
- }
- return nil
- }
- // pop .
- func (r *ringBuffer) pop() *worker {
- var head, next uint64
- for {
- head = r.readerIdx
- if head == r.lastCommintIdx {
- return r.slots[head]
- }
- next = (head + 1) & r.mask
- if atomic.CompareAndSwapUint64(&r.readerIdx, head, next) {
- break
- }
- runtime.Gosched()
- }
- return r.slots[head]
- }
- // size .
- func (r *ringBuffer) size() uint64 {
- return r.lastCommintIdx - r.readerIdx
- }
|