priority_queue.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. /*
  2. Copyright 2014 Workiva, LLC
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. /*
  14. The priority queue is almost a spitting image of the logic
  15. used for a regular queue. In order to keep the logic fast,
  16. this code is repeated instead of using casts to cast to interface{}
  17. back and forth. If Go had inheritance and generics, this problem
  18. would be easier to solve.
  19. */
  20. package queue
  21. import "sync"
  22. // Item is an item that can be added to the priority queue.
  23. type Item interface {
  24. // Compare returns a bool that can be used to determine
  25. // ordering in the priority queue. Assuming the queue
  26. // is in ascending order, this should return > logic.
  27. // Return 1 to indicate this object is greater than the
  28. // the other logic, 0 to indicate equality, and -1 to indicate
  29. // less than other.
  30. Compare(other Item) int
  31. HashCode() int64
  32. }
  33. type priorityItems []Item
  34. func (items *priorityItems) swap(i, j int) {
  35. (*items)[i], (*items)[j] = (*items)[j], (*items)[i]
  36. }
  37. func (items *priorityItems) pop() Item {
  38. size := len(*items)
  39. // Move last leaf to root, and 'pop' the last item.
  40. items.swap(size-1, 0)
  41. item := (*items)[size-1] // Item to return.
  42. (*items)[size-1], *items = nil, (*items)[:size-1]
  43. // 'Bubble down' to restore heap property.
  44. index := 0
  45. childL, childR := 2*index+1, 2*index+2
  46. for len(*items) > childL {
  47. child := childL
  48. if len(*items) > childR && (*items)[childR].Compare((*items)[childL]) < 0 {
  49. child = childR
  50. }
  51. if (*items)[child].Compare((*items)[index]) < 0 {
  52. items.swap(index, child)
  53. index = child
  54. childL, childR = 2*index+1, 2*index+2
  55. } else {
  56. break
  57. }
  58. }
  59. return item
  60. }
  61. func (items *priorityItems) get(number int) []Item {
  62. returnItems := make([]Item, 0, number)
  63. for i := 0; i < number; i++ {
  64. if len(*items) == 0 {
  65. break
  66. }
  67. returnItems = append(returnItems, items.pop())
  68. }
  69. return returnItems
  70. }
  71. func (items *priorityItems) push(item Item) {
  72. // Stick the item as the end of the last level.
  73. *items = append(*items, item)
  74. // 'Bubble up' to restore heap property.
  75. index := len(*items) - 1
  76. parent := int((index - 1) / 2)
  77. for parent >= 0 && (*items)[parent].Compare(item) > 0 {
  78. items.swap(index, parent)
  79. index = parent
  80. parent = int((index - 1) / 2)
  81. }
  82. }
  83. // PriorityQueue is similar to queue except that it takes
  84. // items that implement the Item interface and adds them
  85. // to the queue in priority order.
  86. type PriorityQueue struct {
  87. waiters waiters
  88. items priorityItems
  89. itemMap map[int64]struct{}
  90. lock sync.Mutex
  91. disposeLock sync.Mutex
  92. disposed bool
  93. allowDuplicates bool
  94. }
  95. // Put adds items to the queue.
  96. func (pq *PriorityQueue) Put(items ...Item) error {
  97. if len(items) == 0 {
  98. return nil
  99. }
  100. pq.lock.Lock()
  101. defer pq.lock.Unlock()
  102. if pq.disposed {
  103. return ErrDisposed
  104. }
  105. for _, item := range items {
  106. if pq.allowDuplicates {
  107. pq.items.push(item)
  108. } else if _, ok := pq.itemMap[item.HashCode()]; !ok {
  109. pq.itemMap[item.HashCode()] = struct{}{}
  110. pq.items.push(item)
  111. }
  112. }
  113. for {
  114. sema := pq.waiters.get()
  115. if sema == nil {
  116. break
  117. }
  118. sema.response.Add(1)
  119. sema.ready <- true
  120. sema.response.Wait()
  121. if len(pq.items) == 0 {
  122. break
  123. }
  124. }
  125. return nil
  126. }
  127. // Get retrieves items from the queue. If the queue is empty,
  128. // this call blocks until the next item is added to the queue. This
  129. // will attempt to retrieve number of items.
  130. func (pq *PriorityQueue) Get(number int) ([]Item, error) {
  131. if number < 1 {
  132. return nil, nil
  133. }
  134. pq.lock.Lock()
  135. if pq.disposed {
  136. pq.lock.Unlock()
  137. return nil, ErrDisposed
  138. }
  139. var items []Item
  140. // Remove references to popped items.
  141. deleteItems := func(items []Item) {
  142. for _, item := range items {
  143. delete(pq.itemMap, item.HashCode())
  144. }
  145. }
  146. if len(pq.items) == 0 {
  147. sema := newSema()
  148. pq.waiters.put(sema)
  149. pq.lock.Unlock()
  150. <-sema.ready
  151. if pq.Disposed() {
  152. return nil, ErrDisposed
  153. }
  154. items = pq.items.get(number)
  155. if !pq.allowDuplicates {
  156. deleteItems(items)
  157. }
  158. sema.response.Done()
  159. return items, nil
  160. }
  161. items = pq.items.get(number)
  162. deleteItems(items)
  163. pq.lock.Unlock()
  164. return items, nil
  165. }
  166. // Peek will look at the next item without removing it from the queue.
  167. func (pq *PriorityQueue) Peek() Item {
  168. pq.lock.Lock()
  169. defer pq.lock.Unlock()
  170. if len(pq.items) > 0 {
  171. return pq.items[0]
  172. }
  173. return nil
  174. }
  175. // Empty returns a bool indicating if there are any items left
  176. // in the queue.
  177. func (pq *PriorityQueue) Empty() bool {
  178. pq.lock.Lock()
  179. defer pq.lock.Unlock()
  180. return len(pq.items) == 0
  181. }
  182. // Len returns a number indicating how many items are in the queue.
  183. func (pq *PriorityQueue) Len() int {
  184. pq.lock.Lock()
  185. defer pq.lock.Unlock()
  186. return len(pq.items)
  187. }
  188. // Disposed returns a bool indicating if this queue has been disposed.
  189. func (pq *PriorityQueue) Disposed() bool {
  190. pq.disposeLock.Lock()
  191. defer pq.disposeLock.Unlock()
  192. return pq.disposed
  193. }
  194. // Dispose will prevent any further reads/writes to this queue
  195. // and frees available resources.
  196. func (pq *PriorityQueue) Dispose() {
  197. pq.lock.Lock()
  198. defer pq.lock.Unlock()
  199. pq.disposeLock.Lock()
  200. defer pq.disposeLock.Unlock()
  201. pq.disposed = true
  202. for _, waiter := range pq.waiters {
  203. waiter.response.Add(1)
  204. waiter.ready <- true
  205. }
  206. pq.items = nil
  207. pq.waiters = nil
  208. }
  209. // NewPriorityQueue is the constructor for a priority queue.
  210. func NewPriorityQueue(hint int, allowDuplicates bool) *PriorityQueue {
  211. return &PriorityQueue{
  212. items: make(priorityItems, 0, hint),
  213. itemMap: make(map[int64]struct{}, hint),
  214. allowDuplicates: allowDuplicates,
  215. }
  216. }