grpool.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package grpool
  2. import "sync"
  3. // Gorouting instance which can accept client jobs
  4. type worker struct {
  5. workerPool chan *worker
  6. jobChannel chan Job
  7. stop chan struct{}
  8. }
  9. func (w *worker) start() {
  10. go func() {
  11. var job Job
  12. for {
  13. // worker free, add it to pool
  14. w.workerPool <- w
  15. select {
  16. case job = <-w.jobChannel:
  17. job()
  18. case <-w.stop:
  19. w.stop <- struct{}{}
  20. return
  21. }
  22. }
  23. }()
  24. }
  25. func newWorker(pool chan *worker) *worker {
  26. return &worker{
  27. workerPool: pool,
  28. jobChannel: make(chan Job),
  29. stop: make(chan struct{}),
  30. }
  31. }
  32. // Accepts jobs from clients, and waits for first free worker to deliver job
  33. type dispatcher struct {
  34. workerPool chan *worker
  35. jobQueue chan Job
  36. stop chan struct{}
  37. }
  38. func (d *dispatcher) dispatch() {
  39. for {
  40. select {
  41. case job := <-d.jobQueue:
  42. worker := <-d.workerPool
  43. worker.jobChannel <- job
  44. case <-d.stop:
  45. for i := 0; i < cap(d.workerPool); i++ {
  46. worker := <-d.workerPool
  47. worker.stop <- struct{}{}
  48. <-worker.stop
  49. }
  50. d.stop <- struct{}{}
  51. return
  52. }
  53. }
  54. }
  55. func newDispatcher(workerPool chan *worker, jobQueue chan Job) *dispatcher {
  56. d := &dispatcher{
  57. workerPool: workerPool,
  58. jobQueue: jobQueue,
  59. stop: make(chan struct{}),
  60. }
  61. for i := 0; i < cap(d.workerPool); i++ {
  62. worker := newWorker(d.workerPool)
  63. worker.start()
  64. }
  65. go d.dispatch()
  66. return d
  67. }
  // Job is a unit of work submitted by a client: a function that will be
  // executed by one of the pool's workers.
  type Job func()
  70. type Pool struct {
  71. JobQueue chan Job
  72. dispatcher *dispatcher
  73. wg sync.WaitGroup
  74. }
  75. // Will make pool of gorouting workers.
  76. // numWorkers - how many workers will be created for this pool
  77. // queueLen - how many jobs can we accept until we block
  78. //
  79. // Returned object contains JobQueue reference, which you can use to send job to pool.
  80. func NewPool(numWorkers int, jobQueueLen int) *Pool {
  81. jobQueue := make(chan Job, jobQueueLen)
  82. workerPool := make(chan *worker, numWorkers)
  83. pool := &Pool{
  84. JobQueue: jobQueue,
  85. dispatcher: newDispatcher(workerPool, jobQueue),
  86. }
  87. return pool
  88. }
  89. // In case you are using WaitAll fn, you should call this method
  90. // every time your job is done.
  91. //
  92. // If you are not using WaitAll then we assume you have your own way of synchronizing.
  93. func (p *Pool) JobDone() {
  94. p.wg.Done()
  95. }
  96. // How many jobs we should wait when calling WaitAll.
  97. // It is using WaitGroup Add/Done/Wait
  98. func (p *Pool) WaitCount(count int) {
  99. p.wg.Add(count)
  100. }
  101. // Will wait for all jobs to finish.
  102. func (p *Pool) WaitAll() {
  103. p.wg.Wait()
  104. }
  105. // Will release resources used by pool
  106. func (p *Pool) Release() {
  107. p.dispatcher.stop <- struct{}{}
  108. <-p.dispatcher.stop
  109. }