123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- package grpool
- import "sync"
- // Gorouting instance which can accept client jobs
- type worker struct {
- workerPool chan *worker
- jobChannel chan Job
- stop chan struct{}
- }
- func (w *worker) start() {
- go func() {
- var job Job
- for {
- // worker free, add it to pool
- w.workerPool <- w
- select {
- case job = <-w.jobChannel:
- job()
- case <-w.stop:
- w.stop <- struct{}{}
- return
- }
- }
- }()
- }
- func newWorker(pool chan *worker) *worker {
- return &worker{
- workerPool: pool,
- jobChannel: make(chan Job),
- stop: make(chan struct{}),
- }
- }
- // Accepts jobs from clients, and waits for first free worker to deliver job
- type dispatcher struct {
- workerPool chan *worker
- jobQueue chan Job
- stop chan struct{}
- }
- func (d *dispatcher) dispatch() {
- for {
- select {
- case job := <-d.jobQueue:
- worker := <-d.workerPool
- worker.jobChannel <- job
- case <-d.stop:
- for i := 0; i < cap(d.workerPool); i++ {
- worker := <-d.workerPool
- worker.stop <- struct{}{}
- <-worker.stop
- }
- d.stop <- struct{}{}
- return
- }
- }
- }
- func newDispatcher(workerPool chan *worker, jobQueue chan Job) *dispatcher {
- d := &dispatcher{
- workerPool: workerPool,
- jobQueue: jobQueue,
- stop: make(chan struct{}),
- }
- for i := 0; i < cap(d.workerPool); i++ {
- worker := newWorker(d.workerPool)
- worker.start()
- }
- go d.dispatch()
- return d
- }
// Job is a user request: a function that should be executed by one of the
// pool's workers.
type Job func()
- type Pool struct {
- JobQueue chan Job
- dispatcher *dispatcher
- wg sync.WaitGroup
- }
- // Will make pool of gorouting workers.
- // numWorkers - how many workers will be created for this pool
- // queueLen - how many jobs can we accept until we block
- //
- // Returned object contains JobQueue reference, which you can use to send job to pool.
- func NewPool(numWorkers int, jobQueueLen int) *Pool {
- jobQueue := make(chan Job, jobQueueLen)
- workerPool := make(chan *worker, numWorkers)
- pool := &Pool{
- JobQueue: jobQueue,
- dispatcher: newDispatcher(workerPool, jobQueue),
- }
- return pool
- }
- // In case you are using WaitAll fn, you should call this method
- // every time your job is done.
- //
- // If you are not using WaitAll then we assume you have your own way of synchronizing.
- func (p *Pool) JobDone() {
- p.wg.Done()
- }
- // How many jobs we should wait when calling WaitAll.
- // It is using WaitGroup Add/Done/Wait
- func (p *Pool) WaitCount(count int) {
- p.wg.Add(count)
- }
- // Will wait for all jobs to finish.
- func (p *Pool) WaitAll() {
- p.wg.Wait()
- }
- // Will release resources used by pool
- func (p *Pool) Release() {
- p.dispatcher.stop <- struct{}{}
- <-p.dispatcher.stop
- }
|