slice.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. package pool
  2. import (
  3. "context"
  4. "io"
  5. "sync"
  6. "time"
  7. )
  8. var _ Pool = &Slice{}
  9. // Slice .
  10. type Slice struct {
  11. // New is an application supplied function for creating and configuring a
  12. // item.
  13. //
  14. // The item returned from new must not be in a special state
  15. // (subscribed to pubsub channel, transaction started, ...).
  16. New func(ctx context.Context) (io.Closer, error)
  17. stop func() // stop cancels the item opener.
  18. // mu protects fields defined below.
  19. mu sync.Mutex
  20. freeItem []*item
  21. itemRequests map[uint64]chan item
  22. nextRequest uint64 // Next key to use in itemRequests.
  23. active int // number of opened and pending open items
  24. // Used to signal the need for new items
  25. // a goroutine running itemOpener() reads on this chan and
  26. // maybeOpenNewItems sends on the chan (one send per needed item)
  27. // It is closed during db.Close(). The close tells the itemOpener
  28. // goroutine to exit.
  29. openerCh chan struct{}
  30. closed bool
  31. cleanerCh chan struct{}
  32. // Config pool configuration
  33. conf *Config
  34. }
  35. // NewSlice creates a new pool.
  36. func NewSlice(c *Config) *Slice {
  37. // check Config
  38. if c == nil || c.Active < c.Idle {
  39. panic("config nil or Idle Must <= Active")
  40. }
  41. ctx, cancel := context.WithCancel(context.Background())
  42. // new pool
  43. p := &Slice{
  44. conf: c,
  45. stop: cancel,
  46. itemRequests: make(map[uint64]chan item),
  47. openerCh: make(chan struct{}, 1000000),
  48. }
  49. p.startCleanerLocked(time.Duration(c.IdleTimeout))
  50. go p.itemOpener(ctx)
  51. return p
  52. }
  53. // Reload reload config.
  54. func (p *Slice) Reload(c *Config) error {
  55. p.mu.Lock()
  56. p.startCleanerLocked(time.Duration(c.IdleTimeout))
  57. p.setActive(c.Active)
  58. p.setIdle(c.Idle)
  59. p.conf = c
  60. p.mu.Unlock()
  61. return nil
  62. }
  63. // Get returns a newly-opened or cached *item.
  64. func (p *Slice) Get(ctx context.Context) (io.Closer, error) {
  65. p.mu.Lock()
  66. if p.closed {
  67. p.mu.Unlock()
  68. return nil, ErrPoolClosed
  69. }
  70. idleTimeout := time.Duration(p.conf.IdleTimeout)
  71. // Prefer a free item, if possible.
  72. numFree := len(p.freeItem)
  73. for numFree > 0 {
  74. i := p.freeItem[0]
  75. copy(p.freeItem, p.freeItem[1:])
  76. p.freeItem = p.freeItem[:numFree-1]
  77. p.mu.Unlock()
  78. if i.expired(idleTimeout) {
  79. i.close()
  80. p.mu.Lock()
  81. p.release()
  82. } else {
  83. return i.c, nil
  84. }
  85. numFree = len(p.freeItem)
  86. }
  87. // Out of free items or we were asked not to use one. If we're not
  88. // allowed to open any more items, make a request and wait.
  89. if p.conf.Active > 0 && p.active >= p.conf.Active {
  90. // check WaitTimeout and return directly
  91. if p.conf.WaitTimeout == 0 && !p.conf.Wait {
  92. p.mu.Unlock()
  93. return nil, ErrPoolExhausted
  94. }
  95. // Make the item channel. It's buffered so that the
  96. // itemOpener doesn't block while waiting for the req to be read.
  97. req := make(chan item, 1)
  98. reqKey := p.nextRequestKeyLocked()
  99. p.itemRequests[reqKey] = req
  100. wt := p.conf.WaitTimeout
  101. p.mu.Unlock()
  102. // reset context timeout
  103. if wt > 0 {
  104. var cancel func()
  105. _, ctx, cancel = wt.Shrink(ctx)
  106. defer cancel()
  107. }
  108. // Timeout the item request with the context.
  109. select {
  110. case <-ctx.Done():
  111. // Remove the item request and ensure no value has been sent
  112. // on it after removing.
  113. p.mu.Lock()
  114. delete(p.itemRequests, reqKey)
  115. p.mu.Unlock()
  116. return nil, ctx.Err()
  117. case ret, ok := <-req:
  118. if !ok {
  119. return nil, ErrPoolClosed
  120. }
  121. if ret.expired(idleTimeout) {
  122. ret.close()
  123. p.mu.Lock()
  124. p.release()
  125. } else {
  126. return ret.c, nil
  127. }
  128. }
  129. }
  130. p.active++ // optimistically
  131. p.mu.Unlock()
  132. c, err := p.New(ctx)
  133. if err != nil {
  134. p.mu.Lock()
  135. p.release()
  136. p.mu.Unlock()
  137. return nil, err
  138. }
  139. return c, nil
  140. }
  141. // Put adds a item to the p's free pool.
  142. // err is optionally the last error that occurred on this item.
  143. func (p *Slice) Put(ctx context.Context, c io.Closer, forceClose bool) error {
  144. p.mu.Lock()
  145. defer p.mu.Unlock()
  146. if forceClose {
  147. p.release()
  148. return c.Close()
  149. }
  150. added := p.putItemLocked(c)
  151. if !added {
  152. p.active--
  153. return c.Close()
  154. }
  155. return nil
  156. }
  157. // Satisfy a item or put the item in the idle pool and return true
  158. // or return false.
  159. // putItemLocked will satisfy a item if there is one, or it will
  160. // return the *item to the freeItem list if err == nil and the idle
  161. // item limit will not be exceeded.
  162. // If err != nil, the value of i is ignored.
  163. // If err == nil, then i must not equal nil.
  164. // If a item was fulfilled or the *item was placed in the
  165. // freeItem list, then true is returned, otherwise false is returned.
  166. func (p *Slice) putItemLocked(c io.Closer) bool {
  167. if p.closed {
  168. return false
  169. }
  170. if p.conf.Active > 0 && p.active > p.conf.Active {
  171. return false
  172. }
  173. i := item{
  174. c: c,
  175. createdAt: nowFunc(),
  176. }
  177. if l := len(p.itemRequests); l > 0 {
  178. var req chan item
  179. var reqKey uint64
  180. for reqKey, req = range p.itemRequests {
  181. break
  182. }
  183. delete(p.itemRequests, reqKey) // Remove from pending requests.
  184. req <- i
  185. return true
  186. } else if !p.closed && p.maxIdleItemsLocked() > len(p.freeItem) {
  187. p.freeItem = append(p.freeItem, &i)
  188. return true
  189. }
  190. return false
  191. }
  192. // Runs in a separate goroutine, opens new item when requested.
  193. func (p *Slice) itemOpener(ctx context.Context) {
  194. for {
  195. select {
  196. case <-ctx.Done():
  197. return
  198. case <-p.openerCh:
  199. p.openNewItem(ctx)
  200. }
  201. }
  202. }
  203. func (p *Slice) maybeOpenNewItems() {
  204. numRequests := len(p.itemRequests)
  205. if p.conf.Active > 0 {
  206. numCanOpen := p.conf.Active - p.active
  207. if numRequests > numCanOpen {
  208. numRequests = numCanOpen
  209. }
  210. }
  211. for numRequests > 0 {
  212. p.active++ // optimistically
  213. numRequests--
  214. if p.closed {
  215. return
  216. }
  217. p.openerCh <- struct{}{}
  218. }
  219. }
  220. // openNewItem one new item
  221. func (p *Slice) openNewItem(ctx context.Context) {
  222. // maybeOpenNewConnctions has already executed p.active++ before it sent
  223. // on p.openerCh. This function must execute p.active-- if the
  224. // item fails or is closed before returning.
  225. c, err := p.New(ctx)
  226. p.mu.Lock()
  227. defer p.mu.Unlock()
  228. if err != nil {
  229. p.release()
  230. return
  231. }
  232. if !p.putItemLocked(c) {
  233. p.active--
  234. c.Close()
  235. }
  236. }
  237. // setIdle sets the maximum number of items in the idle
  238. // item pool.
  239. //
  240. // If MaxOpenConns is greater than 0 but less than the new IdleConns
  241. // then the new IdleConns will be reduced to match the MaxOpenConns limit
  242. //
  243. // If n <= 0, no idle items are retained.
  244. func (p *Slice) setIdle(n int) {
  245. p.mu.Lock()
  246. if n > 0 {
  247. p.conf.Idle = n
  248. } else {
  249. // No idle items.
  250. p.conf.Idle = -1
  251. }
  252. // Make sure maxIdle doesn't exceed maxOpen
  253. if p.conf.Active > 0 && p.maxIdleItemsLocked() > p.conf.Active {
  254. p.conf.Idle = p.conf.Active
  255. }
  256. var closing []*item
  257. idleCount := len(p.freeItem)
  258. maxIdle := p.maxIdleItemsLocked()
  259. if idleCount > maxIdle {
  260. closing = p.freeItem[maxIdle:]
  261. p.freeItem = p.freeItem[:maxIdle]
  262. }
  263. p.mu.Unlock()
  264. for _, c := range closing {
  265. c.close()
  266. }
  267. }
  268. // setActive sets the maximum number of open items to the database.
  269. //
  270. // If IdleConns is greater than 0 and the new MaxOpenConns is less than
  271. // IdleConns, then IdleConns will be reduced to match the new
  272. // MaxOpenConns limit
  273. //
  274. // If n <= 0, then there is no limit on the number of open items.
  275. // The default is 0 (unlimited).
  276. func (p *Slice) setActive(n int) {
  277. p.mu.Lock()
  278. p.conf.Active = n
  279. if n < 0 {
  280. p.conf.Active = 0
  281. }
  282. syncIdle := p.conf.Active > 0 && p.maxIdleItemsLocked() > p.conf.Active
  283. p.mu.Unlock()
  284. if syncIdle {
  285. p.setIdle(n)
  286. }
  287. }
  288. // startCleanerLocked starts itemCleaner if needed.
  289. func (p *Slice) startCleanerLocked(d time.Duration) {
  290. if d <= 0 {
  291. // if set 0, staleCleaner() will return directly
  292. return
  293. }
  294. if d < time.Duration(p.conf.IdleTimeout) && p.cleanerCh != nil {
  295. select {
  296. case p.cleanerCh <- struct{}{}:
  297. default:
  298. }
  299. }
  300. // run only one, clean stale items.
  301. if p.cleanerCh == nil {
  302. p.cleanerCh = make(chan struct{}, 1)
  303. go p.staleCleaner(time.Duration(p.conf.IdleTimeout))
  304. }
  305. }
  306. func (p *Slice) staleCleaner(d time.Duration) {
  307. const minInterval = 100 * time.Millisecond
  308. if d < minInterval {
  309. d = minInterval
  310. }
  311. t := time.NewTimer(d)
  312. for {
  313. select {
  314. case <-t.C:
  315. case <-p.cleanerCh: // maxLifetime was changed or db was closed.
  316. }
  317. p.mu.Lock()
  318. d = time.Duration(p.conf.IdleTimeout)
  319. if p.closed || d <= 0 {
  320. p.mu.Unlock()
  321. return
  322. }
  323. expiredSince := nowFunc().Add(-d)
  324. var closing []*item
  325. for i := 0; i < len(p.freeItem); i++ {
  326. c := p.freeItem[i]
  327. if c.createdAt.Before(expiredSince) {
  328. closing = append(closing, c)
  329. p.active--
  330. last := len(p.freeItem) - 1
  331. p.freeItem[i] = p.freeItem[last]
  332. p.freeItem[last] = nil
  333. p.freeItem = p.freeItem[:last]
  334. i--
  335. }
  336. }
  337. p.mu.Unlock()
  338. for _, c := range closing {
  339. c.close()
  340. }
  341. if d < minInterval {
  342. d = minInterval
  343. }
  344. t.Reset(d)
  345. }
  346. }
  347. // nextRequestKeyLocked returns the next item request key.
  348. // It is assumed that nextRequest will not overflow.
  349. func (p *Slice) nextRequestKeyLocked() uint64 {
  350. next := p.nextRequest
  351. p.nextRequest++
  352. return next
  353. }
  354. const defaultIdleItems = 2
  355. func (p *Slice) maxIdleItemsLocked() int {
  356. n := p.conf.Idle
  357. switch {
  358. case n == 0:
  359. return defaultIdleItems
  360. case n < 0:
  361. return 0
  362. default:
  363. return n
  364. }
  365. }
  366. func (p *Slice) release() {
  367. p.active--
  368. p.maybeOpenNewItems()
  369. }
  370. // Close close pool.
  371. func (p *Slice) Close() error {
  372. p.mu.Lock()
  373. if p.closed {
  374. p.mu.Unlock()
  375. return nil
  376. }
  377. if p.cleanerCh != nil {
  378. close(p.cleanerCh)
  379. }
  380. var err error
  381. for _, i := range p.freeItem {
  382. i.close()
  383. }
  384. p.freeItem = nil
  385. p.closed = true
  386. for _, req := range p.itemRequests {
  387. close(req)
  388. }
  389. p.mu.Unlock()
  390. p.stop()
  391. return err
  392. }