// pool.go — bounded worker pool with idle-worker reaping.
  1. package workpool
  2. import (
  3. "errors"
  4. "runtime"
  5. "sync"
  6. "time"
  7. )
  8. const (
  9. stateCreate = 0
  10. stateRunning = 1
  11. stateStopping = 2
  12. stateShutdown = 3
  13. )
// PoolConfig holds the tunable limits for a Pool.
type PoolConfig struct {
	MaxWorkers     uint64        // hard cap on live workers; Submit fails beyond this
	MaxIdleWorkers uint64        // NOTE(review): not referenced in this file — verify it is used elsewhere
	MinIdleWorkers uint64        // clean() keeps at least this many idle workers
	KeepAlive      time.Duration // idle workers older than this are reaped; also the cleaning interval
}
// Pool is a worker pool: idle workers wait in a ring buffer and a background
// goroutine (started by Start) reaps workers idle longer than KeepAlive.
type Pool struct {
	conf *PoolConfig
	// padding1/padding2 presumably isolate the hot fields onto separate
	// cache lines to avoid false sharing — TODO confirm intent.
	padding1   [8]uint64
	ready      *ringBuffer // idle workers available to take a task
	curWorkers uint64      // count of live workers; guarded by lock
	padding2   [8]uint64
	lock  sync.Mutex
	state uint8      // one of stateCreate..stateShutdown; written under lock
	stop  chan uint8 // signals the cleaner goroutine to shut down; closed on exit
}
// worker is a single pooled goroutine's handle: tasks are delivered over
// ftch, and lastUseTime drives idle-expiry in Pool.clean.
type worker struct {
	id          uint64
	lastUseTime time.Time        // refreshed on release; compared against KeepAlive
	ftch        chan *FutureTask // task channel; closing it retires the worker goroutine
}
  38. var wChanCap = func() int {
  39. // Use blocking worker if GOMAXPROCS=1.
  40. // This immediately switches Serve to WorkerFunc, which results
  41. // in higher performance (under go1.5 at least).
  42. if runtime.GOMAXPROCS(0) == 1 {
  43. return 0
  44. }
  45. // Use non-blocking worker if GOMAXPROCS>1,
  46. // since otherwise the Serve caller (Acceptor) may lag accepting
  47. // new task if WorkerFunc is CPU-bound.
  48. return 1
  49. }()
  50. func newWorker(wid uint64) *worker {
  51. return &worker{
  52. id: wid,
  53. lastUseTime: time.Now(),
  54. ftch: make(chan *FutureTask, wChanCap),
  55. }
  56. }
  57. // NewWorkerPool .
  58. func NewWorkerPool(capacity uint64, conf *PoolConfig) (p *Pool, err error) {
  59. if capacity == 0 || capacity&3 != 0 {
  60. err = errors.New("capacity must bigger than zero and N power of 2")
  61. return
  62. }
  63. rb, err := newRingBuffer(capacity)
  64. if err != nil {
  65. return
  66. }
  67. p = &Pool{
  68. conf: conf,
  69. ready: rb,
  70. curWorkers: 0,
  71. state: stateCreate,
  72. stop: make(chan uint8, 1),
  73. }
  74. return
  75. }
  76. func (p *Pool) changeState(old, new uint8) bool {
  77. p.lock.Lock()
  78. defer p.lock.Unlock()
  79. if p.state != old {
  80. return false
  81. }
  82. p.state = new
  83. return true
  84. }
  85. // Start .
  86. func (p *Pool) Start() error {
  87. if !p.changeState(stateCreate, stateRunning) {
  88. return errors.New("workerpool already started")
  89. }
  90. go func() {
  91. defer close(p.stop)
  92. for {
  93. p.clean()
  94. select {
  95. case <-p.stop:
  96. p.cleanAll()
  97. for !p.changeState(stateStopping, stateShutdown) {
  98. runtime.Gosched()
  99. }
  100. return
  101. default:
  102. time.Sleep(p.conf.KeepAlive)
  103. }
  104. }
  105. }()
  106. return nil
  107. }
  108. // Stop .
  109. func (p *Pool) Stop() error {
  110. if !p.changeState(stateRunning, stateStopping) {
  111. return errors.New("workerpool is stopping")
  112. }
  113. p.stop <- stateStopping
  114. return nil
  115. }
  116. // Submit .
  117. func (p *Pool) Submit(ft *FutureTask) error {
  118. w, err := p.getReadyWorker()
  119. if err != nil {
  120. return err
  121. }
  122. w.ftch <- ft
  123. return nil
  124. }
// getReadyWorker returns an idle worker popped from the ring buffer or, if
// none is available, spawns a new one (up to conf.MaxWorkers). Each spawned
// worker runs a goroutine that executes tasks from its channel until the
// channel is closed by Pool.close.
func (p *Pool) getReadyWorker() (w *worker, err error) {
w = p.ready.pop()
if w == nil {
p.lock.Lock()
// workerID is read before the capacity check but only used after the
// check passes, so the early read is harmless.
workerID := p.curWorkers
if p.curWorkers >= p.conf.MaxWorkers {
err = errors.New("workerpool is full")
p.lock.Unlock()
return
}
p.curWorkers++
p.lock.Unlock()
w = newWorker(workerID)
go func(w *worker) {
for {
ft, ok := <-w.ftch
if !ok {
// channel closed by Pool.close: the worker retires
return
}
// Run the task, publish the result, then hand the worker back
// to the pool for reuse (or retirement if the pool is stopping).
ft.out <- ft.T.Run()
p.release(w)
}
}(w)
}
return
}
// close retires a worker: decrements the live-worker count under the lock
// and closes its task channel, which terminates the worker goroutine.
func (p *Pool) close(w *worker) {
p.lock.Lock()
defer p.lock.Unlock()
if p.curWorkers > 0 {
p.curWorkers--
}
// NOTE(review): if a concurrent Submit holds this same worker and sends
// on w.ftch after this close, that send panics (send on closed channel).
// Verify callers cannot race close against Submit for one worker.
close(w.ftch)
}
  161. // release worker
  162. func (p *Pool) release(w *worker) {
  163. if p.state > stateRunning {
  164. p.close(w)
  165. return
  166. }
  167. w.lastUseTime = time.Now()
  168. if err := p.ready.push(w); err != nil {
  169. p.close(w)
  170. }
  171. }
  172. // clean: clean idle goroutine
  173. func (p *Pool) clean() {
  174. for {
  175. size := p.ready.size()
  176. if size <= p.conf.MinIdleWorkers {
  177. return
  178. }
  179. w := p.ready.pop()
  180. if w == nil {
  181. return
  182. }
  183. currentTime := time.Now()
  184. if currentTime.Sub(w.lastUseTime) < p.conf.KeepAlive {
  185. p.release(w)
  186. return
  187. }
  188. p.close(w)
  189. }
  190. }
  191. // cleanAll
  192. func (p *Pool) cleanAll() {
  193. for {
  194. w := p.ready.pop()
  195. if w == nil {
  196. return
  197. }
  198. p.release(w)
  199. }
  200. }