summary.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package summary
  2. import (
  3. "sync"
  4. "time"
  5. )
  // bucket is one slot of the summary: it keeps a running sum, the number of
  // samples that went into that sum, and a link to the slot that follows it.
  type bucket struct {
  	val   int64   // sum of the values passed to Add since the last Reset
  	count int64   // number of Add calls since the last Reset
  	next  *bucket // following slot; not modified by any method of bucket
  }

  // Add folds one sample into the bucket: val is added to the sum and the
  // sample count grows by one.
  func (b *bucket) Add(val int64) {
  	b.val, b.count = b.val+val, b.count+1
  }

  // Value reports the accumulated sum and the number of samples, in that order.
  func (b *bucket) Value() (sum int64, n int64) {
  	sum, n = b.val, b.count
  	return sum, n
  }

  // Reset clears the sum and the sample count. The next link is left as it is.
  func (b *bucket) Reset() {
  	b.val, b.count = 0, 0
  }
  // Summary accumulates int64 samples and reports their total together with
  // the number of samples recorded.
  type Summary interface {
  	// Add records one sample.
  	Add(int64)
  	// Value returns the accumulated total (val) and the sample count (cnt).
  	Value() (val, cnt int64)
  	// Reset discards everything recorded so far.
  	Reset()
  }
  28. type summary struct {
  29. mu sync.RWMutex
  30. buckets []bucket
  31. bucketTime int64
  32. lastAccess int64
  33. cur *bucket
  34. }
  35. // New new a summary.
  36. //
  37. // use RollingCounter creates a new window. windowTime is the time covering the entire
  38. // window. windowBuckets is the number of buckets the window is divided into.
  39. // An example: a 10 second window with 10 buckets will have 10 buckets covering
  40. // 1 second each.
  41. func New(window time.Duration, winBucket int) Summary {
  42. buckets := make([]bucket, winBucket)
  43. bucket := &buckets[0]
  44. for i := 1; i < winBucket; i++ {
  45. bucket.next = &buckets[i]
  46. bucket = bucket.next
  47. }
  48. bucket.next = &buckets[0]
  49. bucketTime := time.Duration(window.Nanoseconds() / int64(winBucket))
  50. return &summary{
  51. cur: &buckets[0],
  52. buckets: buckets,
  53. bucketTime: int64(bucketTime),
  54. lastAccess: time.Now().UnixNano(),
  55. }
  56. }
  57. // Add increments the summary by value.
  58. func (s *summary) Add(val int64) {
  59. s.mu.Lock()
  60. s.lastBucket().Add(val)
  61. s.mu.Unlock()
  62. }
  63. // Value get the summary value and count.
  64. func (s *summary) Value() (val int64, cnt int64) {
  65. now := time.Now().UnixNano()
  66. s.mu.RLock()
  67. b := s.cur
  68. i := s.elapsed(now)
  69. for j := 0; j < len(s.buckets); j++ {
  70. // skip all future reset bucket.
  71. if i > 0 {
  72. i--
  73. } else {
  74. v, c := b.Value()
  75. val += v
  76. cnt += c
  77. }
  78. b = b.next
  79. }
  80. s.mu.RUnlock()
  81. return
  82. }
  83. // Reset reset the counter.
  84. func (s *summary) Reset() {
  85. s.mu.Lock()
  86. for i := range s.buckets {
  87. s.buckets[i].Reset()
  88. }
  89. s.mu.Unlock()
  90. }
  91. func (s *summary) elapsed(now int64) (i int) {
  92. var e int64
  93. if e = now - s.lastAccess; e <= s.bucketTime {
  94. return
  95. }
  96. if i = int(e / s.bucketTime); i > len(s.buckets) {
  97. i = len(s.buckets)
  98. }
  99. return
  100. }
  101. func (s *summary) lastBucket() (b *bucket) {
  102. now := time.Now().UnixNano()
  103. b = s.cur
  104. // reset the buckets between now and number of buckets ago. If
  105. // that is more that the existing buckets, reset all.
  106. if i := s.elapsed(now); i > 0 {
  107. s.lastAccess = now
  108. for ; i > 0; i-- {
  109. // replace the next used bucket.
  110. b = b.next
  111. b.Reset()
  112. }
  113. }
  114. s.cur = b
  115. return
  116. }