123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- // Package timerqueue implements a priority queue for objects scheduled at a
- // particular time.
- package timerqueue
- import (
- "container/heap"
- "errors"
- "time"
- )
- // Timer is an interface that types implement to schedule and receive OnTimer
- // callbacks.
- type Timer interface {
- OnTimer(t time.Time)
- }
- //NewTimerWrapper util struct
- func NewTimerWrapper(fun TimerFunc) (result *TimerWrapper) {
- result = &TimerWrapper{fun}
- return
- }
- //TimerFunc timer function
- type TimerFunc func(t time.Time)
- //TimerWrapper just a time wrapper
- type TimerWrapper struct {
- fun TimerFunc
- }
- //OnTimer ontimer
- func (t *TimerWrapper) OnTimer(tm time.Time) {
- t.fun(tm)
- }
- // Queue is a time-sorted collection of Timer objects.
- type Queue struct {
- heap timerHeap
- table map[Timer]*timerData
- }
- type timerData struct {
- timer Timer
- time time.Time
- index int
- period time.Duration // if > 0, this will be a periodically event
- }
- // New creates a new timer priority queue.
- func New() *Queue {
- return &Queue{
- table: make(map[Timer]*timerData),
- }
- }
- // Len returns the current number of timer objects in the queue.
- func (q *Queue) Len() int {
- return len(q.heap)
- }
- // Schedule schedules a timer for exectuion at time tm. If the
- // timer was already scheduled, it is rescheduled.
- func (q *Queue) Schedule(t Timer, tm time.Time) {
- q.ScheduleRepeat(t, tm, 0)
- }
- // ScheduleRepeat give 0 duration, will not be repeatedly event
- func (q *Queue) ScheduleRepeat(t Timer, tm time.Time, period time.Duration) {
- if data, ok := q.table[t]; !ok {
- data = &timerData{t, tm, 0, period}
- heap.Push(&q.heap, data)
- q.table[t] = data
- } else {
- data.time = tm
- heap.Fix(&q.heap, data.index)
- }
- }
- // Unschedule unschedules a timer's execution.
- func (q *Queue) Unschedule(t Timer) {
- if data, ok := q.table[t]; ok {
- heap.Remove(&q.heap, data.index)
- delete(q.table, t)
- }
- }
- // GetTime returns the time at which the timer is scheduled.
- // If the timer isn't currently scheduled, an error is returned.
- func (q *Queue) GetTime(t Timer) (tm time.Time, err error) {
- if data, ok := q.table[t]; ok {
- return data.time, nil
- }
- return time.Time{}, errors.New("timerqueue: timer not scheduled")
- }
- // IsScheduled returns true if the timer is currently scheduled.
- func (q *Queue) IsScheduled(t Timer) bool {
- _, ok := q.table[t]
- return ok
- }
- // Clear unschedules all currently scheduled timers.
- func (q *Queue) Clear() {
- q.heap, q.table = nil, make(map[Timer]*timerData)
- }
- // PopFirst removes and returns the next timer to be scheduled and
- // the time at which it is scheduled to run.
- func (q *Queue) PopFirst() (t Timer, tm time.Time) {
- if len(q.heap) > 0 {
- data := heap.Pop(&q.heap).(*timerData)
- delete(q.table, data.timer)
- return data.timer, data.time
- }
- return nil, time.Time{}
- }
- // PeekFirst returns the next timer to be scheduled and the time
- // at which it is scheduled to run. It does not modify the contents
- // of the timer queue.
- func (q *Queue) PeekFirst() (t Timer, tm time.Time) {
- if len(q.heap) > 0 {
- return q.heap[0].timer, q.heap[0].time
- }
- return nil, time.Time{}
- }
- // Advance executes OnTimer callbacks for all timers scheduled to be
- // run before the time 'tm'. Executed timers are removed from the
- // timer queue.
- func (q *Queue) Advance(tm time.Time) {
- for len(q.heap) > 0 && !tm.Before(q.heap[0].time) {
- data := q.heap[0]
- heap.Remove(&q.heap, data.index)
- if data.period > 0 {
- data.time = data.time.Add(data.period)
- heap.Push(&q.heap, data)
- } else {
- delete(q.table, data.timer)
- }
- data.timer.OnTimer(data.time)
- }
- }
- /*
- * timerHeap
- */
- type timerHeap []*timerData
- //Len len interface
- func (h timerHeap) Len() int {
- return len(h)
- }
- //Less less interface
- func (h timerHeap) Less(i, j int) bool {
- return h[i].time.Before(h[j].time)
- }
- //Swap swap interface
- func (h timerHeap) Swap(i, j int) {
- h[i], h[j] = h[j], h[i]
- h[i].index, h[j].index = i, j
- }
- //Push push interface
- func (h *timerHeap) Push(x interface{}) {
- data := x.(*timerData)
- *h = append(*h, data)
- data.index = len(*h) - 1
- }
- //Pop pop interface
- func (h *timerHeap) Pop() interface{} {
- n := len(*h)
- data := (*h)[n-1]
- *h = (*h)[:n-1]
- data.index = -1
- return data
- }
|