timer.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. package time
  2. import (
  3. "sync"
  4. itime "time"
  5. "go-common/library/log"
  6. )
  // timerFormat is the layout ExpireString uses to render a deadline.
  const timerFormat = "2006-01-02 15:04:05"

  // infiniteDuration is the largest value an itime.Duration can hold. The
  // signal timer is armed with it whenever no entry is pending.
  const infiniteDuration = itime.Duration(1<<63 - 1)
  // timerLazyDelay is a 300ms delay. Nothing in this part of the file reads
  // it; NOTE(review): presumably consumed elsewhere in the package - confirm.
  var timerLazyDelay = 300 * itime.Millisecond
  // TimerData is a single scheduled entry owned by a Timer. Instances are
  // handed out by Timer.Add and recycled through the Timer's free list.
  type TimerData struct {
  	// Key is a label the caller may set; in this file it is only read when
  	// formatting debug log lines.
  	Key string

  	expire itime.Time // absolute deadline at which fn becomes due
  	fn     func()     // callback run on expiry; cleared when the entry is recycled
  	index  int        // position inside Timer.timers; set to -1 on removal
  	next   *TimerData // next entry while this one sits on the free list
  }
  22. // Delay delay duration.
  23. func (td *TimerData) Delay() itime.Duration {
  24. return td.expire.Sub(itime.Now())
  25. }
  26. // ExpireString expire string.
  27. func (td *TimerData) ExpireString() string {
  28. return td.expire.Format(timerFormat)
  29. }
  30. // Timer timer.
  31. type Timer struct {
  32. lock sync.Mutex
  33. free *TimerData
  34. timers []*TimerData
  35. signal *itime.Timer
  36. num int
  37. }
  38. // NewTimer new a timer.
  39. // A heap must be initialized before any of the heap operations
  40. // can be used. Init is idempotent with respect to the heap invariants
  41. // and may be called whenever the heap invariants may have been invalidated.
  42. // Its complexity is O(n) where n = h.Len().
  43. //
  44. func NewTimer(num int) (t *Timer) {
  45. t = new(Timer)
  46. t.init(num)
  47. return t
  48. }
  49. // Init init the timer.
  50. func (t *Timer) Init(num int) {
  51. t.init(num)
  52. }
  53. func (t *Timer) init(num int) {
  54. t.signal = itime.NewTimer(infiniteDuration)
  55. t.timers = make([]*TimerData, 0, num)
  56. t.num = num
  57. t.grow()
  58. go t.start()
  59. }
  60. func (t *Timer) grow() {
  61. var (
  62. i int
  63. td *TimerData
  64. tds = make([]TimerData, t.num)
  65. )
  66. t.free = &(tds[0])
  67. td = t.free
  68. for i = 1; i < t.num; i++ {
  69. td.next = &(tds[i])
  70. td = td.next
  71. }
  72. td.next = nil
  73. }
  74. // get get a free timer data.
  75. func (t *Timer) get() (td *TimerData) {
  76. if td = t.free; td == nil {
  77. t.grow()
  78. td = t.free
  79. }
  80. t.free = td.next
  81. return
  82. }
  83. // put put back a timer data.
  84. func (t *Timer) put(td *TimerData) {
  85. td.fn = nil
  86. td.next = t.free
  87. t.free = td
  88. }
  89. // Add add the element x onto the heap. The complexity is
  90. // O(log(n)) where n = h.Len().
  91. func (t *Timer) Add(expire itime.Duration, fn func()) (td *TimerData) {
  92. t.lock.Lock()
  93. td = t.get()
  94. td.expire = itime.Now().Add(expire)
  95. td.fn = fn
  96. t.add(td)
  97. t.lock.Unlock()
  98. return
  99. }
  100. // Del removes the element at index i from the heap.
  101. // The complexity is O(log(n)) where n = h.Len().
  102. func (t *Timer) Del(td *TimerData) {
  103. t.lock.Lock()
  104. t.del(td)
  105. t.put(td)
  106. t.lock.Unlock()
  107. }
  108. // Push pushes the element x onto the heap. The complexity is
  109. // O(log(n)) where n = h.Len().
  110. func (t *Timer) add(td *TimerData) {
  111. var d itime.Duration
  112. td.index = len(t.timers)
  113. // add to the minheap last node
  114. t.timers = append(t.timers, td)
  115. t.up(td.index)
  116. if td.index == 0 {
  117. // if first node, signal start goroutine
  118. d = td.Delay()
  119. t.signal.Reset(d)
  120. if Debug {
  121. log.Info("timer: add reset delay %d ms", int64(d)/int64(itime.Millisecond))
  122. }
  123. }
  124. if Debug {
  125. log.Info("timer: push item key: %s, expire: %s, index: %d", td.Key, td.ExpireString(), td.index)
  126. }
  127. }
  128. func (t *Timer) del(td *TimerData) {
  129. var (
  130. i = td.index
  131. last = len(t.timers) - 1
  132. )
  133. if i < 0 || i > last || t.timers[i] != td {
  134. // already remove, usually by expire
  135. if Debug {
  136. log.Info("timer del i: %d, last: %d, %p", i, last, td)
  137. }
  138. return
  139. }
  140. if i != last {
  141. t.swap(i, last)
  142. t.down(i, last)
  143. t.up(i)
  144. }
  145. // remove item is the last node
  146. t.timers[last].index = -1 // for safety
  147. t.timers = t.timers[:last]
  148. if Debug {
  149. log.Info("timer: remove item key: %s, expire: %s, index: %d", td.Key, td.ExpireString(), td.index)
  150. }
  151. }
  152. // Set update timer data.
  153. func (t *Timer) Set(td *TimerData, expire itime.Duration) {
  154. t.lock.Lock()
  155. t.del(td)
  156. td.expire = itime.Now().Add(expire)
  157. t.add(td)
  158. t.lock.Unlock()
  159. }
  160. // start start the timer.
  161. func (t *Timer) start() {
  162. for {
  163. t.expire()
  164. <-t.signal.C
  165. }
  166. }
  167. // expire removes the minimum element (according to Less) from the heap.
  168. // The complexity is O(log(n)) where n = max.
  169. // It is equivalent to Del(0).
  170. func (t *Timer) expire() {
  171. var (
  172. fn func()
  173. td *TimerData
  174. d itime.Duration
  175. )
  176. t.lock.Lock()
  177. for {
  178. if len(t.timers) == 0 {
  179. d = infiniteDuration
  180. if Debug {
  181. log.Info("timer: no other instance")
  182. }
  183. break
  184. }
  185. td = t.timers[0]
  186. if d = td.Delay(); d > 0 {
  187. break
  188. }
  189. fn = td.fn
  190. // let caller put back
  191. t.del(td)
  192. t.lock.Unlock()
  193. if fn == nil {
  194. log.Warn("expire timer no fn")
  195. } else {
  196. if Debug {
  197. log.Info("timer key: %s, expire: %s, index: %d expired, call fn", td.Key, td.ExpireString(), td.index)
  198. }
  199. fn()
  200. }
  201. t.lock.Lock()
  202. }
  203. t.signal.Reset(d)
  204. if Debug {
  205. log.Info("timer: expier reset delay %d ms", int64(d)/int64(itime.Millisecond))
  206. }
  207. t.lock.Unlock()
  208. }
  209. func (t *Timer) up(j int) {
  210. for {
  211. i := (j - 1) / 2 // parent
  212. if i <= j || !t.less(j, i) {
  213. break
  214. }
  215. t.swap(i, j)
  216. j = i
  217. }
  218. }
  219. func (t *Timer) down(i, n int) {
  220. for {
  221. j1 := 2*i + 1
  222. if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
  223. break
  224. }
  225. j := j1 // left child
  226. if j2 := j1 + 1; j2 < n && !t.less(j1, j2) {
  227. j = j2 // = 2*i + 2 // right child
  228. }
  229. if !t.less(j, i) {
  230. break
  231. }
  232. t.swap(i, j)
  233. i = j
  234. }
  235. }
  236. func (t *Timer) less(i, j int) bool {
  237. return t.timers[i].expire.Before(t.timers[j].expire)
  238. }
  239. func (t *Timer) swap(i, j int) {
  240. t.timers[i], t.timers[j] = t.timers[j], t.timers[i]
  241. t.timers[i].index = i
  242. t.timers[j].index = j
  243. }