rolling.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package counter
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. type bucket struct {
  7. val int64
  8. next *bucket
  9. }
  10. func (b *bucket) Add(val int64) {
  11. b.val += val
  12. }
  13. func (b *bucket) Value() int64 {
  14. return b.val
  15. }
  16. func (b *bucket) Reset() {
  17. b.val = 0
  18. }
  19. var _ Counter = new(rollingCounter)
  20. type rollingCounter struct {
  21. mu sync.RWMutex
  22. buckets []bucket
  23. bucketTime int64
  24. lastAccess int64
  25. cur *bucket
  26. }
  27. // NewRolling creates a new window. windowTime is the time covering the entire
  28. // window. windowBuckets is the number of buckets the window is divided into.
  29. // An example: a 10 second window with 10 buckets will have 10 buckets covering
  30. // 1 second each.
  31. func NewRolling(window time.Duration, winBucket int) Counter {
  32. buckets := make([]bucket, winBucket)
  33. bucket := &buckets[0]
  34. for i := 1; i < winBucket; i++ {
  35. bucket.next = &buckets[i]
  36. bucket = bucket.next
  37. }
  38. bucket.next = &buckets[0]
  39. bucketTime := time.Duration(window.Nanoseconds() / int64(winBucket))
  40. return &rollingCounter{
  41. cur: &buckets[0],
  42. buckets: buckets,
  43. bucketTime: int64(bucketTime),
  44. lastAccess: time.Now().UnixNano(),
  45. }
  46. }
  47. // Add increments the counter by value and return new value.
  48. func (r *rollingCounter) Add(val int64) {
  49. r.mu.Lock()
  50. r.lastBucket().Add(val)
  51. r.mu.Unlock()
  52. }
  53. // Value get the counter value.
  54. func (r *rollingCounter) Value() (sum int64) {
  55. now := time.Now().UnixNano()
  56. r.mu.RLock()
  57. b := r.cur
  58. i := r.elapsed(now)
  59. for j := 0; j < len(r.buckets); j++ {
  60. // skip all future reset bucket.
  61. if i > 0 {
  62. i--
  63. } else {
  64. sum += b.Value()
  65. }
  66. b = b.next
  67. }
  68. r.mu.RUnlock()
  69. return
  70. }
  71. // Reset reset the counter.
  72. func (r *rollingCounter) Reset() {
  73. r.mu.Lock()
  74. for i := range r.buckets {
  75. r.buckets[i].Reset()
  76. }
  77. r.mu.Unlock()
  78. }
  79. func (r *rollingCounter) elapsed(now int64) (i int) {
  80. var e int64
  81. if e = now - r.lastAccess; e <= r.bucketTime {
  82. return
  83. }
  84. if i = int(e / r.bucketTime); i > len(r.buckets) {
  85. i = len(r.buckets)
  86. }
  87. return
  88. }
  89. func (r *rollingCounter) lastBucket() (b *bucket) {
  90. now := time.Now().UnixNano()
  91. b = r.cur
  92. // reset the buckets between now and number of buckets ago. If
  93. // that is more that the existing buckets, reset all.
  94. if i := r.elapsed(now); i > 0 {
  95. r.lastAccess = now
  96. for ; i > 0; i-- {
  97. // replace the next used bucket.
  98. b = b.next
  99. b.Reset()
  100. }
  101. }
  102. r.cur = b
  103. return
  104. }