queue.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. // Package timerqueue implements a priority queue for objects scheduled at a
  2. // particular time.
  3. package timerqueue
  4. import (
  5. "container/heap"
  6. "errors"
  7. "time"
  8. )
  9. // Timer is an interface that types implement to schedule and receive OnTimer
  10. // callbacks.
  11. type Timer interface {
  12. OnTimer(t time.Time)
  13. }
  14. //NewTimerWrapper util struct
  15. func NewTimerWrapper(fun TimerFunc) (result *TimerWrapper) {
  16. result = &TimerWrapper{fun}
  17. return
  18. }
  19. //TimerFunc timer function
  20. type TimerFunc func(t time.Time)
  21. //TimerWrapper just a time wrapper
  22. type TimerWrapper struct {
  23. fun TimerFunc
  24. }
  25. //OnTimer ontimer
  26. func (t *TimerWrapper) OnTimer(tm time.Time) {
  27. t.fun(tm)
  28. }
  29. // Queue is a time-sorted collection of Timer objects.
  30. type Queue struct {
  31. heap timerHeap
  32. table map[Timer]*timerData
  33. }
  34. type timerData struct {
  35. timer Timer
  36. time time.Time
  37. index int
  38. period time.Duration // if > 0, this will be a periodically event
  39. }
  40. // New creates a new timer priority queue.
  41. func New() *Queue {
  42. return &Queue{
  43. table: make(map[Timer]*timerData),
  44. }
  45. }
  46. // Len returns the current number of timer objects in the queue.
  47. func (q *Queue) Len() int {
  48. return len(q.heap)
  49. }
  50. // Schedule schedules a timer for exectuion at time tm. If the
  51. // timer was already scheduled, it is rescheduled.
  52. func (q *Queue) Schedule(t Timer, tm time.Time) {
  53. q.ScheduleRepeat(t, tm, 0)
  54. }
  55. // ScheduleRepeat give 0 duration, will not be repeatedly event
  56. func (q *Queue) ScheduleRepeat(t Timer, tm time.Time, period time.Duration) {
  57. if data, ok := q.table[t]; !ok {
  58. data = &timerData{t, tm, 0, period}
  59. heap.Push(&q.heap, data)
  60. q.table[t] = data
  61. } else {
  62. data.time = tm
  63. heap.Fix(&q.heap, data.index)
  64. }
  65. }
  66. // Unschedule unschedules a timer's execution.
  67. func (q *Queue) Unschedule(t Timer) {
  68. if data, ok := q.table[t]; ok {
  69. heap.Remove(&q.heap, data.index)
  70. delete(q.table, t)
  71. }
  72. }
  73. // GetTime returns the time at which the timer is scheduled.
  74. // If the timer isn't currently scheduled, an error is returned.
  75. func (q *Queue) GetTime(t Timer) (tm time.Time, err error) {
  76. if data, ok := q.table[t]; ok {
  77. return data.time, nil
  78. }
  79. return time.Time{}, errors.New("timerqueue: timer not scheduled")
  80. }
  81. // IsScheduled returns true if the timer is currently scheduled.
  82. func (q *Queue) IsScheduled(t Timer) bool {
  83. _, ok := q.table[t]
  84. return ok
  85. }
  86. // Clear unschedules all currently scheduled timers.
  87. func (q *Queue) Clear() {
  88. q.heap, q.table = nil, make(map[Timer]*timerData)
  89. }
  90. // PopFirst removes and returns the next timer to be scheduled and
  91. // the time at which it is scheduled to run.
  92. func (q *Queue) PopFirst() (t Timer, tm time.Time) {
  93. if len(q.heap) > 0 {
  94. data := heap.Pop(&q.heap).(*timerData)
  95. delete(q.table, data.timer)
  96. return data.timer, data.time
  97. }
  98. return nil, time.Time{}
  99. }
  100. // PeekFirst returns the next timer to be scheduled and the time
  101. // at which it is scheduled to run. It does not modify the contents
  102. // of the timer queue.
  103. func (q *Queue) PeekFirst() (t Timer, tm time.Time) {
  104. if len(q.heap) > 0 {
  105. return q.heap[0].timer, q.heap[0].time
  106. }
  107. return nil, time.Time{}
  108. }
  109. // Advance executes OnTimer callbacks for all timers scheduled to be
  110. // run before the time 'tm'. Executed timers are removed from the
  111. // timer queue.
  112. func (q *Queue) Advance(tm time.Time) {
  113. for len(q.heap) > 0 && !tm.Before(q.heap[0].time) {
  114. data := q.heap[0]
  115. heap.Remove(&q.heap, data.index)
  116. if data.period > 0 {
  117. data.time = data.time.Add(data.period)
  118. heap.Push(&q.heap, data)
  119. } else {
  120. delete(q.table, data.timer)
  121. }
  122. data.timer.OnTimer(data.time)
  123. }
  124. }
  125. /*
  126. * timerHeap
  127. */
  128. type timerHeap []*timerData
  129. //Len len interface
  130. func (h timerHeap) Len() int {
  131. return len(h)
  132. }
  133. //Less less interface
  134. func (h timerHeap) Less(i, j int) bool {
  135. return h[i].time.Before(h[j].time)
  136. }
  137. //Swap swap interface
  138. func (h timerHeap) Swap(i, j int) {
  139. h[i], h[j] = h[j], h[i]
  140. h[i].index, h[j].index = i, j
  141. }
  142. //Push push interface
  143. func (h *timerHeap) Push(x interface{}) {
  144. data := x.(*timerData)
  145. *h = append(*h, data)
  146. data.index = len(*h) - 1
  147. }
  148. //Pop pop interface
  149. func (h *timerHeap) Pop() interface{} {
  150. n := len(*h)
  151. data := (*h)[n-1]
  152. *h = (*h)[:n-1]
  153. data.index = -1
  154. return data
  155. }