list.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. package pool
  2. import (
  3. "container/list"
  4. "context"
  5. "io"
  6. "sync"
  7. "time"
  8. )
  9. var _ Pool = &List{}
  10. // List .
  11. type List struct {
  12. // New is an application supplied function for creating and configuring a
  13. // item.
  14. //
  15. // The item returned from new must not be in a special state
  16. // (subscribed to pubsub channel, transaction started, ...).
  17. New func(ctx context.Context) (io.Closer, error)
  18. // mu protects fields defined below.
  19. mu sync.Mutex
  20. cond chan struct{}
  21. closed bool
  22. active int
  23. // clean stale items
  24. cleanerCh chan struct{}
  25. // Stack of item with most recently used at the front.
  26. idles list.List
  27. // Config pool configuration
  28. conf *Config
  29. }
  30. // NewList creates a new pool.
  31. func NewList(c *Config) *List {
  32. // check Config
  33. if c == nil || c.Active < c.Idle {
  34. panic("config nil or Idle Must <= Active")
  35. }
  36. // new pool
  37. p := &List{conf: c}
  38. p.cond = make(chan struct{})
  39. p.startCleanerLocked(time.Duration(c.IdleTimeout))
  40. return p
  41. }
  42. // Reload reload config.
  43. func (p *List) Reload(c *Config) error {
  44. p.mu.Lock()
  45. p.startCleanerLocked(time.Duration(c.IdleTimeout))
  46. p.conf = c
  47. p.mu.Unlock()
  48. return nil
  49. }
  50. // startCleanerLocked
  51. func (p *List) startCleanerLocked(d time.Duration) {
  52. if d <= 0 {
  53. // if set 0, staleCleaner() will return directly
  54. return
  55. }
  56. if d < time.Duration(p.conf.IdleTimeout) && p.cleanerCh != nil {
  57. select {
  58. case p.cleanerCh <- struct{}{}:
  59. default:
  60. }
  61. }
  62. // run only one, clean stale items.
  63. if p.cleanerCh == nil {
  64. p.cleanerCh = make(chan struct{}, 1)
  65. go p.staleCleaner()
  66. }
  67. }
  68. // staleCleaner clean stale items proc.
  69. func (p *List) staleCleaner() {
  70. ticker := time.NewTicker(100 * time.Millisecond)
  71. for {
  72. select {
  73. case <-ticker.C:
  74. case <-p.cleanerCh: // maxLifetime was changed or db was closed.
  75. }
  76. p.mu.Lock()
  77. if p.closed || p.conf.IdleTimeout <= 0 {
  78. p.mu.Unlock()
  79. return
  80. }
  81. for i, n := 0, p.idles.Len(); i < n; i++ {
  82. e := p.idles.Back()
  83. if e == nil {
  84. // no possible
  85. break
  86. }
  87. ic := e.Value.(item)
  88. if !ic.expired(time.Duration(p.conf.IdleTimeout)) {
  89. // not need continue.
  90. break
  91. }
  92. p.idles.Remove(e)
  93. p.release()
  94. p.mu.Unlock()
  95. ic.c.Close()
  96. p.mu.Lock()
  97. }
  98. p.mu.Unlock()
  99. }
  100. }
  101. // Get returns a item from the idles List or
  102. // get a new item.
  103. func (p *List) Get(ctx context.Context) (io.Closer, error) {
  104. p.mu.Lock()
  105. if p.closed {
  106. p.mu.Unlock()
  107. return nil, ErrPoolClosed
  108. }
  109. for {
  110. // get idles item.
  111. for i, n := 0, p.idles.Len(); i < n; i++ {
  112. e := p.idles.Front()
  113. if e == nil {
  114. break
  115. }
  116. ic := e.Value.(item)
  117. p.idles.Remove(e)
  118. p.mu.Unlock()
  119. if !ic.expired(time.Duration(p.conf.IdleTimeout)) {
  120. return ic.c, nil
  121. }
  122. ic.c.Close()
  123. p.mu.Lock()
  124. p.release()
  125. }
  126. // Check for pool closed before dialing a new item.
  127. if p.closed {
  128. p.mu.Unlock()
  129. return nil, ErrPoolClosed
  130. }
  131. // new item if under limit.
  132. if p.conf.Active == 0 || p.active < p.conf.Active {
  133. newItem := p.New
  134. p.active++
  135. p.mu.Unlock()
  136. c, err := newItem(ctx)
  137. if err != nil {
  138. p.mu.Lock()
  139. p.release()
  140. p.mu.Unlock()
  141. c = nil
  142. }
  143. return c, err
  144. }
  145. if p.conf.WaitTimeout == 0 && !p.conf.Wait {
  146. p.mu.Unlock()
  147. return nil, ErrPoolExhausted
  148. }
  149. wt := p.conf.WaitTimeout
  150. p.mu.Unlock()
  151. // slowpath: reset context timeout
  152. nctx := ctx
  153. cancel := func() {}
  154. if wt > 0 {
  155. _, nctx, cancel = wt.Shrink(ctx)
  156. }
  157. select {
  158. case <-nctx.Done():
  159. cancel()
  160. return nil, nctx.Err()
  161. case <-p.cond:
  162. }
  163. cancel()
  164. p.mu.Lock()
  165. }
  166. }
  167. // Put put item into pool.
  168. func (p *List) Put(ctx context.Context, c io.Closer, forceClose bool) error {
  169. p.mu.Lock()
  170. if !p.closed && !forceClose {
  171. p.idles.PushFront(item{createdAt: nowFunc(), c: c})
  172. if p.idles.Len() > p.conf.Idle {
  173. c = p.idles.Remove(p.idles.Back()).(item).c
  174. } else {
  175. c = nil
  176. }
  177. }
  178. if c == nil {
  179. p.signal()
  180. p.mu.Unlock()
  181. return nil
  182. }
  183. p.release()
  184. p.mu.Unlock()
  185. return c.Close()
  186. }
  187. // Close releases the resources used by the pool.
  188. func (p *List) Close() error {
  189. p.mu.Lock()
  190. idles := p.idles
  191. p.idles.Init()
  192. p.closed = true
  193. p.active -= idles.Len()
  194. p.mu.Unlock()
  195. for e := idles.Front(); e != nil; e = e.Next() {
  196. e.Value.(item).c.Close()
  197. }
  198. return nil
  199. }
  200. // release decrements the active count and signals waiters. The caller must
  201. // hold p.mu during the call.
  202. func (p *List) release() {
  203. p.active--
  204. p.signal()
  205. }
  206. func (p *List) signal() {
  207. select {
  208. default:
  209. case p.cond <- struct{}{}:
  210. }
  211. }