queue.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. /*
  2. Copyright 2014 Workiva, LLC
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. /*
  14. Package queue includes a regular queue and a priority queue.
  15. These queues rely on waitgroups to pause listening threads
  16. on empty queues until a message is received. If any thread
  17. calls Dispose on the queue, any listeners are immediately returned
  18. with an error. Any subsequent put to the queue will return an error
  19. as opposed to panicking as with channels. Queues will grow with unbounded
  20. behavior as opposed to channels which can be buffered but will pause
  21. while a thread attempts to put to a full channel.
  22. Recently added is a lockless ring buffer using the same basic C design as
  23. found here:
  24. http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
  25. Modified for use with Go with the addition of some dispose semantics providing
  26. the capability to release blocked threads. This works for both puts
  27. and gets, either will return an error if they are blocked and the buffer
  28. is disposed. This could serve as a signal to kill a goroutine. All threadsafety
  29. is achieved using CAS operations, making this buffer pretty quick.
  30. Benchmarks:
  31. BenchmarkPriorityQueue-8 2000000 782 ns/op
  32. BenchmarkQueue-8 2000000 671 ns/op
  33. BenchmarkChannel-8 1000000 2083 ns/op
  34. BenchmarkQueuePut-8 20000 84299 ns/op
  35. BenchmarkQueueGet-8 20000 80753 ns/op
  36. BenchmarkExecuteInParallel-8 20000 68891 ns/op
  37. BenchmarkRBLifeCycle-8 10000000 177 ns/op
  38. BenchmarkRBPut-8 30000000 58.1 ns/op
  39. BenchmarkRBGet-8 50000000 26.8 ns/op
  40. TODO: We really need a Fibonacci heap for the priority queue.
  41. TODO: Unify the types of queue to the same interface.
  42. */
  43. package queue
  44. import (
  45. "runtime"
  46. "sync"
  47. "sync/atomic"
  48. "time"
  49. )
  50. type waiters []*sema
  51. func (w *waiters) get() *sema {
  52. if len(*w) == 0 {
  53. return nil
  54. }
  55. sema := (*w)[0]
  56. copy((*w)[0:], (*w)[1:])
  57. (*w)[len(*w)-1] = nil // or the zero value of T
  58. *w = (*w)[:len(*w)-1]
  59. return sema
  60. }
  61. func (w *waiters) put(sema *sema) {
  62. *w = append(*w, sema)
  63. }
  64. func (w *waiters) remove(sema *sema) {
  65. if len(*w) == 0 {
  66. return
  67. }
  68. // build new slice, copy all except sema
  69. ws := *w
  70. newWs := make(waiters, 0, len(*w))
  71. for i := range ws {
  72. if ws[i] != sema {
  73. newWs = append(newWs, ws[i])
  74. }
  75. }
  76. *w = newWs
  77. }
  78. type items []interface{}
  79. func (items *items) get(number int64) []interface{} {
  80. returnItems := make([]interface{}, 0, number)
  81. index := int64(0)
  82. for i := int64(0); i < number; i++ {
  83. if i >= int64(len(*items)) {
  84. break
  85. }
  86. returnItems = append(returnItems, (*items)[i])
  87. (*items)[i] = nil
  88. index++
  89. }
  90. *items = (*items)[index:]
  91. return returnItems
  92. }
  93. func (items *items) peek() (interface{}, bool) {
  94. length := len(*items)
  95. if length == 0 {
  96. return nil, false
  97. }
  98. return (*items)[0], true
  99. }
  100. func (items *items) getUntil(checker func(item interface{}) bool) []interface{} {
  101. length := len(*items)
  102. if len(*items) == 0 {
  103. // returning nil here actually wraps that nil in a list
  104. // of interfaces... thanks go
  105. return []interface{}{}
  106. }
  107. returnItems := make([]interface{}, 0, length)
  108. index := -1
  109. for i, item := range *items {
  110. if !checker(item) {
  111. break
  112. }
  113. returnItems = append(returnItems, item)
  114. index = i
  115. (*items)[i] = nil // prevent memory leak
  116. }
  117. *items = (*items)[index+1:]
  118. return returnItems
  119. }
  120. type sema struct {
  121. ready chan bool
  122. response *sync.WaitGroup
  123. }
  124. func newSema() *sema {
  125. return &sema{
  126. ready: make(chan bool, 1),
  127. response: &sync.WaitGroup{},
  128. }
  129. }
// Queue is the struct responsible for tracking the state
// of the queue.
type Queue struct {
	// waiters lists the semaphores of Poll/Get callers that found the queue
	// empty and are parked until items arrive, they time out, or the queue
	// is disposed.
	waiters waiters
	// items holds the queued values, oldest first.
	items items
	// lock is taken by every exported method before it touches the other
	// fields.
	lock sync.Mutex
	// disposed is set by Dispose; once set, Put, Poll, Peek and TakeUntil
	// return ErrDisposed.
	disposed bool
}
  138. // Put will add the specified items to the queue.
  139. func (q *Queue) Put(items ...interface{}) error {
  140. if len(items) == 0 {
  141. return nil
  142. }
  143. q.lock.Lock()
  144. if q.disposed {
  145. q.lock.Unlock()
  146. return ErrDisposed
  147. }
  148. q.items = append(q.items, items...)
  149. for {
  150. sema := q.waiters.get()
  151. if sema == nil {
  152. break
  153. }
  154. sema.response.Add(1)
  155. select {
  156. case sema.ready <- true:
  157. sema.response.Wait()
  158. default:
  159. // This semaphore timed out.
  160. }
  161. if len(q.items) == 0 {
  162. break
  163. }
  164. }
  165. q.lock.Unlock()
  166. return nil
  167. }
  168. // Get retrieves items from the queue. If there are some items in the
  169. // queue, get will return a number UP TO the number passed in as a
  170. // parameter. If no items are in the queue, this method will pause
  171. // until items are added to the queue.
  172. func (q *Queue) Get(number int64) ([]interface{}, error) {
  173. return q.Poll(number, 0)
  174. }
  175. // Poll retrieves items from the queue. If there are some items in the queue,
  176. // Poll will return a number UP TO the number passed in as a parameter. If no
  177. // items are in the queue, this method will pause until items are added to the
  178. // queue or the provided timeout is reached. A non-positive timeout will block
  179. // until items are added. If a timeout occurs, ErrTimeout is returned.
  180. func (q *Queue) Poll(number int64, timeout time.Duration) ([]interface{}, error) {
  181. if number < 1 {
  182. // thanks again go
  183. return []interface{}{}, nil
  184. }
  185. q.lock.Lock()
  186. if q.disposed {
  187. q.lock.Unlock()
  188. return nil, ErrDisposed
  189. }
  190. var items []interface{}
  191. if len(q.items) == 0 {
  192. sema := newSema()
  193. q.waiters.put(sema)
  194. q.lock.Unlock()
  195. var timeoutC <-chan time.Time
  196. if timeout > 0 {
  197. timeoutC = time.After(timeout)
  198. }
  199. select {
  200. case <-sema.ready:
  201. // we are now inside the put's lock
  202. if q.disposed {
  203. return nil, ErrDisposed
  204. }
  205. items = q.items.get(number)
  206. sema.response.Done()
  207. return items, nil
  208. case <-timeoutC:
  209. // cleanup the sema that was added to waiters
  210. select {
  211. case sema.ready <- true:
  212. // we called this before Put() could
  213. // Remove sema from waiters.
  214. q.lock.Lock()
  215. q.waiters.remove(sema)
  216. q.lock.Unlock()
  217. default:
  218. // Put() got it already, we need to call Done() so Put() can move on
  219. sema.response.Done()
  220. }
  221. return nil, ErrTimeout
  222. }
  223. }
  224. items = q.items.get(number)
  225. q.lock.Unlock()
  226. return items, nil
  227. }
  228. // Peek returns a the first item in the queue by value
  229. // without modifying the queue.
  230. func (q *Queue) Peek() (interface{}, error) {
  231. q.lock.Lock()
  232. defer q.lock.Unlock()
  233. if q.disposed {
  234. return nil, ErrDisposed
  235. }
  236. peekItem, ok := q.items.peek()
  237. if !ok {
  238. return nil, ErrEmptyQueue
  239. }
  240. return peekItem, nil
  241. }
  242. // TakeUntil takes a function and returns a list of items that
  243. // match the checker until the checker returns false. This does not
  244. // wait if there are no items in the queue.
  245. func (q *Queue) TakeUntil(checker func(item interface{}) bool) ([]interface{}, error) {
  246. if checker == nil {
  247. return nil, nil
  248. }
  249. q.lock.Lock()
  250. if q.disposed {
  251. q.lock.Unlock()
  252. return nil, ErrDisposed
  253. }
  254. result := q.items.getUntil(checker)
  255. q.lock.Unlock()
  256. return result, nil
  257. }
  258. // Empty returns a bool indicating if this bool is empty.
  259. func (q *Queue) Empty() bool {
  260. q.lock.Lock()
  261. defer q.lock.Unlock()
  262. return len(q.items) == 0
  263. }
  264. // Len returns the number of items in this queue.
  265. func (q *Queue) Len() int64 {
  266. q.lock.Lock()
  267. defer q.lock.Unlock()
  268. return int64(len(q.items))
  269. }
  270. // Disposed returns a bool indicating if this queue
  271. // has had disposed called on it.
  272. func (q *Queue) Disposed() bool {
  273. q.lock.Lock()
  274. defer q.lock.Unlock()
  275. return q.disposed
  276. }
  277. // Dispose will dispose of this queue and returns
  278. // the items disposed. Any subsequent calls to Get
  279. // or Put will return an error.
  280. func (q *Queue) Dispose() []interface{} {
  281. q.lock.Lock()
  282. defer q.lock.Unlock()
  283. q.disposed = true
  284. for _, waiter := range q.waiters {
  285. waiter.response.Add(1)
  286. select {
  287. case waiter.ready <- true:
  288. // release Poll immediately
  289. default:
  290. // ignore if it's a timeout or in the get
  291. }
  292. }
  293. disposedItems := q.items
  294. q.items = nil
  295. q.waiters = nil
  296. return disposedItems
  297. }
  298. // New is a constructor for a new threadsafe queue.
  299. func New(hint int64) *Queue {
  300. return &Queue{
  301. items: make([]interface{}, 0, hint),
  302. }
  303. }
  304. // ExecuteInParallel will (in parallel) call the provided function
  305. // with each item in the queue until the queue is exhausted. When the queue
  306. // is exhausted execution is complete and all goroutines will be killed.
  307. // This means that the queue will be disposed so cannot be used again.
  308. func ExecuteInParallel(q *Queue, fn func(interface{})) {
  309. if q == nil {
  310. return
  311. }
  312. q.lock.Lock() // so no one touches anything in the middle
  313. // of this process
  314. todo, done := uint64(len(q.items)), int64(-1)
  315. // this is important or we might face an infinite loop
  316. if todo == 0 {
  317. return
  318. }
  319. numCPU := 1
  320. if runtime.NumCPU() > 1 {
  321. numCPU = runtime.NumCPU() - 1
  322. }
  323. var wg sync.WaitGroup
  324. wg.Add(numCPU)
  325. items := q.items
  326. for i := 0; i < numCPU; i++ {
  327. go func() {
  328. for {
  329. index := atomic.AddInt64(&done, 1)
  330. if index >= int64(todo) {
  331. wg.Done()
  332. break
  333. }
  334. fn(items[index])
  335. items[index] = 0
  336. }
  337. }()
  338. }
  339. wg.Wait()
  340. q.lock.Unlock()
  341. q.Dispose()
  342. }