pool.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package worker
  2. import (
  3. "fmt"
  4. "runtime"
  5. "sync"
  6. "time"
  7. "go-common/library/log"
  8. )
  // _ratio is the queue fill level (queued tasks divided by queue capacity) at
  // or above which the monitor tries to start additional workers.
  const _ratio float32 = 0.8
  12. var (
  13. _default = &Conf{
  14. QueueSize: 1024,
  15. WorkerProcMax: 32,
  16. WorkerNumber: runtime.NumCPU() - 1,
  17. }
  18. )
  // Conf holds the tunables of a Pool.
  type Conf struct {
  	// QueueSize is the capacity of the buffered task queue.
  	QueueSize int
  	// WorkerProcMax is the upper bound the monitor respects when it adds
  	// workers.
  	WorkerProcMax int
  	// WorkerNumber is the initial number of workers requested for the pool.
  	WorkerNumber int
  }
  25. // Pool .
  26. type Pool struct {
  27. c *Conf
  28. queue chan func()
  29. workerNumber int
  30. close chan struct{}
  31. wg sync.WaitGroup
  32. }
  33. // New .
  34. func New(conf *Conf) (w *Pool) {
  35. if conf == nil {
  36. conf = _default
  37. }
  38. w = &Pool{
  39. c: conf,
  40. queue: make(chan func(), conf.QueueSize),
  41. workerNumber: conf.WorkerNumber,
  42. close: make(chan struct{}),
  43. }
  44. w.start()
  45. go w.moni()
  46. return
  47. }
  48. func (w *Pool) start() {
  49. for i := 0; i < w.workerNumber; i++ {
  50. w.wg.Add(1)
  51. go w.workerRoutine()
  52. }
  53. }
  54. func (w *Pool) moni() {
  55. var conf = w.c
  56. for {
  57. time.Sleep(time.Second * 5)
  58. var ratio = float32(len(w.queue)) / float32(conf.QueueSize)
  59. if ratio >= _ratio {
  60. if w.workerNumber >= conf.WorkerProcMax {
  61. log.Warn("work thread more than max(%d)", conf.WorkerProcMax)
  62. return
  63. }
  64. var next = minInt(w.workerNumber<<1, w.c.WorkerProcMax)
  65. var diff = next - w.workerNumber
  66. log.Info("current thread count=%d, queue ratio=%f, create new thread number=(%d)", w.workerNumber, ratio, diff)
  67. for i := 0; i < diff; i++ {
  68. w.wg.Add(1)
  69. go w.workerRoutine()
  70. }
  71. w.workerNumber = next
  72. }
  73. }
  74. }
  75. // Close .
  76. func (w *Pool) Close() {
  77. close(w.close)
  78. }
  79. // Wait .
  80. func (w *Pool) Wait() {
  81. w.wg.Wait()
  82. }
  83. func (w *Pool) workerRoutine() {
  84. defer func() {
  85. w.wg.Done()
  86. if x := recover(); x != nil {
  87. const size = 64 << 10
  88. buf := make([]byte, size)
  89. buf = buf[:runtime.Stack(buf, false)]
  90. log.Error("w.workerRoutine panic(%+v) :\n %s", x, buf)
  91. w.wg.Add(1)
  92. go w.workerRoutine()
  93. }
  94. }()
  95. loop:
  96. for {
  97. select {
  98. case f := <-w.queue:
  99. f()
  100. case <-w.close:
  101. log.Info("workerRoutine close()")
  102. break loop
  103. }
  104. }
  105. for f := range w.queue {
  106. f()
  107. }
  108. }
  109. // Add .
  110. func (w *Pool) Add(f func()) error {
  111. select {
  112. case w.queue <- f:
  113. default:
  114. return fmt.Errorf("task channel is full")
  115. }
  116. return nil
  117. }
  // minInt returns the smaller of a and b.
  func minInt(a, b int) int {
  	if b <= a {
  		return b
  	}
  	return a
  }