sample.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616
  1. package metrics
  2. import (
  3. "math"
  4. "math/rand"
  5. "sort"
  6. "sync"
  7. "time"
  8. )
  9. const rescaleThreshold = time.Hour
  10. // Samples maintain a statistically-significant selection of values from
  11. // a stream.
  12. type Sample interface {
  13. Clear()
  14. Count() int64
  15. Max() int64
  16. Mean() float64
  17. Min() int64
  18. Percentile(float64) float64
  19. Percentiles([]float64) []float64
  20. Size() int
  21. Snapshot() Sample
  22. StdDev() float64
  23. Sum() int64
  24. Update(int64)
  25. Values() []int64
  26. Variance() float64
  27. }
  28. // ExpDecaySample is an exponentially-decaying sample using a forward-decaying
  29. // priority reservoir. See Cormode et al's "Forward Decay: A Practical Time
  30. // Decay Model for Streaming Systems".
  31. //
  32. // <http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf>
  33. type ExpDecaySample struct {
  34. alpha float64
  35. count int64
  36. mutex sync.Mutex
  37. reservoirSize int
  38. t0, t1 time.Time
  39. values *expDecaySampleHeap
  40. }
  41. // NewExpDecaySample constructs a new exponentially-decaying sample with the
  42. // given reservoir size and alpha.
  43. func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
  44. if UseNilMetrics {
  45. return NilSample{}
  46. }
  47. s := &ExpDecaySample{
  48. alpha: alpha,
  49. reservoirSize: reservoirSize,
  50. t0: time.Now(),
  51. values: newExpDecaySampleHeap(reservoirSize),
  52. }
  53. s.t1 = s.t0.Add(rescaleThreshold)
  54. return s
  55. }
  56. // Clear clears all samples.
  57. func (s *ExpDecaySample) Clear() {
  58. s.mutex.Lock()
  59. defer s.mutex.Unlock()
  60. s.count = 0
  61. s.t0 = time.Now()
  62. s.t1 = s.t0.Add(rescaleThreshold)
  63. s.values.Clear()
  64. }
  65. // Count returns the number of samples recorded, which may exceed the
  66. // reservoir size.
  67. func (s *ExpDecaySample) Count() int64 {
  68. s.mutex.Lock()
  69. defer s.mutex.Unlock()
  70. return s.count
  71. }
  72. // Max returns the maximum value in the sample, which may not be the maximum
  73. // value ever to be part of the sample.
  74. func (s *ExpDecaySample) Max() int64 {
  75. return SampleMax(s.Values())
  76. }
  77. // Mean returns the mean of the values in the sample.
  78. func (s *ExpDecaySample) Mean() float64 {
  79. return SampleMean(s.Values())
  80. }
  81. // Min returns the minimum value in the sample, which may not be the minimum
  82. // value ever to be part of the sample.
  83. func (s *ExpDecaySample) Min() int64 {
  84. return SampleMin(s.Values())
  85. }
  86. // Percentile returns an arbitrary percentile of values in the sample.
  87. func (s *ExpDecaySample) Percentile(p float64) float64 {
  88. return SamplePercentile(s.Values(), p)
  89. }
  90. // Percentiles returns a slice of arbitrary percentiles of values in the
  91. // sample.
  92. func (s *ExpDecaySample) Percentiles(ps []float64) []float64 {
  93. return SamplePercentiles(s.Values(), ps)
  94. }
  95. // Size returns the size of the sample, which is at most the reservoir size.
  96. func (s *ExpDecaySample) Size() int {
  97. s.mutex.Lock()
  98. defer s.mutex.Unlock()
  99. return s.values.Size()
  100. }
  101. // Snapshot returns a read-only copy of the sample.
  102. func (s *ExpDecaySample) Snapshot() Sample {
  103. s.mutex.Lock()
  104. defer s.mutex.Unlock()
  105. vals := s.values.Values()
  106. values := make([]int64, len(vals))
  107. for i, v := range vals {
  108. values[i] = v.v
  109. }
  110. return &SampleSnapshot{
  111. count: s.count,
  112. values: values,
  113. }
  114. }
  115. // StdDev returns the standard deviation of the values in the sample.
  116. func (s *ExpDecaySample) StdDev() float64 {
  117. return SampleStdDev(s.Values())
  118. }
  119. // Sum returns the sum of the values in the sample.
  120. func (s *ExpDecaySample) Sum() int64 {
  121. return SampleSum(s.Values())
  122. }
  123. // Update samples a new value.
  124. func (s *ExpDecaySample) Update(v int64) {
  125. s.update(time.Now(), v)
  126. }
  127. // Values returns a copy of the values in the sample.
  128. func (s *ExpDecaySample) Values() []int64 {
  129. s.mutex.Lock()
  130. defer s.mutex.Unlock()
  131. vals := s.values.Values()
  132. values := make([]int64, len(vals))
  133. for i, v := range vals {
  134. values[i] = v.v
  135. }
  136. return values
  137. }
  138. // Variance returns the variance of the values in the sample.
  139. func (s *ExpDecaySample) Variance() float64 {
  140. return SampleVariance(s.Values())
  141. }
  142. // update samples a new value at a particular timestamp. This is a method all
  143. // its own to facilitate testing.
  144. func (s *ExpDecaySample) update(t time.Time, v int64) {
  145. s.mutex.Lock()
  146. defer s.mutex.Unlock()
  147. s.count++
  148. if s.values.Size() == s.reservoirSize {
  149. s.values.Pop()
  150. }
  151. s.values.Push(expDecaySample{
  152. k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(),
  153. v: v,
  154. })
  155. if t.After(s.t1) {
  156. values := s.values.Values()
  157. t0 := s.t0
  158. s.values.Clear()
  159. s.t0 = t
  160. s.t1 = s.t0.Add(rescaleThreshold)
  161. for _, v := range values {
  162. v.k = v.k * math.Exp(-s.alpha*s.t0.Sub(t0).Seconds())
  163. s.values.Push(v)
  164. }
  165. }
  166. }
  167. // NilSample is a no-op Sample.
  168. type NilSample struct{}
  169. // Clear is a no-op.
  170. func (NilSample) Clear() {}
  171. // Count is a no-op.
  172. func (NilSample) Count() int64 { return 0 }
  173. // Max is a no-op.
  174. func (NilSample) Max() int64 { return 0 }
  175. // Mean is a no-op.
  176. func (NilSample) Mean() float64 { return 0.0 }
  177. // Min is a no-op.
  178. func (NilSample) Min() int64 { return 0 }
  179. // Percentile is a no-op.
  180. func (NilSample) Percentile(p float64) float64 { return 0.0 }
  181. // Percentiles is a no-op.
  182. func (NilSample) Percentiles(ps []float64) []float64 {
  183. return make([]float64, len(ps))
  184. }
  185. // Size is a no-op.
  186. func (NilSample) Size() int { return 0 }
  187. // Sample is a no-op.
  188. func (NilSample) Snapshot() Sample { return NilSample{} }
  189. // StdDev is a no-op.
  190. func (NilSample) StdDev() float64 { return 0.0 }
  191. // Sum is a no-op.
  192. func (NilSample) Sum() int64 { return 0 }
  193. // Update is a no-op.
  194. func (NilSample) Update(v int64) {}
  195. // Values is a no-op.
  196. func (NilSample) Values() []int64 { return []int64{} }
  197. // Variance is a no-op.
  198. func (NilSample) Variance() float64 { return 0.0 }
  199. // SampleMax returns the maximum value of the slice of int64.
  200. func SampleMax(values []int64) int64 {
  201. if 0 == len(values) {
  202. return 0
  203. }
  204. var max int64 = math.MinInt64
  205. for _, v := range values {
  206. if max < v {
  207. max = v
  208. }
  209. }
  210. return max
  211. }
  212. // SampleMean returns the mean value of the slice of int64.
  213. func SampleMean(values []int64) float64 {
  214. if 0 == len(values) {
  215. return 0.0
  216. }
  217. return float64(SampleSum(values)) / float64(len(values))
  218. }
  219. // SampleMin returns the minimum value of the slice of int64.
  220. func SampleMin(values []int64) int64 {
  221. if 0 == len(values) {
  222. return 0
  223. }
  224. var min int64 = math.MaxInt64
  225. for _, v := range values {
  226. if min > v {
  227. min = v
  228. }
  229. }
  230. return min
  231. }
  232. // SamplePercentiles returns an arbitrary percentile of the slice of int64.
  233. func SamplePercentile(values int64Slice, p float64) float64 {
  234. return SamplePercentiles(values, []float64{p})[0]
  235. }
  236. // SamplePercentiles returns a slice of arbitrary percentiles of the slice of
  237. // int64.
  238. func SamplePercentiles(values int64Slice, ps []float64) []float64 {
  239. scores := make([]float64, len(ps))
  240. size := len(values)
  241. if size > 0 {
  242. sort.Sort(values)
  243. for i, p := range ps {
  244. pos := p * float64(size+1)
  245. if pos < 1.0 {
  246. scores[i] = float64(values[0])
  247. } else if pos >= float64(size) {
  248. scores[i] = float64(values[size-1])
  249. } else {
  250. lower := float64(values[int(pos)-1])
  251. upper := float64(values[int(pos)])
  252. scores[i] = lower + (pos-math.Floor(pos))*(upper-lower)
  253. }
  254. }
  255. }
  256. return scores
  257. }
  258. // SampleSnapshot is a read-only copy of another Sample.
  259. type SampleSnapshot struct {
  260. count int64
  261. values []int64
  262. }
  263. func NewSampleSnapshot(count int64, values []int64) *SampleSnapshot {
  264. return &SampleSnapshot{
  265. count: count,
  266. values: values,
  267. }
  268. }
  269. // Clear panics.
  270. func (*SampleSnapshot) Clear() {
  271. panic("Clear called on a SampleSnapshot")
  272. }
  273. // Count returns the count of inputs at the time the snapshot was taken.
  274. func (s *SampleSnapshot) Count() int64 { return s.count }
  275. // Max returns the maximal value at the time the snapshot was taken.
  276. func (s *SampleSnapshot) Max() int64 { return SampleMax(s.values) }
  277. // Mean returns the mean value at the time the snapshot was taken.
  278. func (s *SampleSnapshot) Mean() float64 { return SampleMean(s.values) }
  279. // Min returns the minimal value at the time the snapshot was taken.
  280. func (s *SampleSnapshot) Min() int64 { return SampleMin(s.values) }
  281. // Percentile returns an arbitrary percentile of values at the time the
  282. // snapshot was taken.
  283. func (s *SampleSnapshot) Percentile(p float64) float64 {
  284. return SamplePercentile(s.values, p)
  285. }
  286. // Percentiles returns a slice of arbitrary percentiles of values at the time
  287. // the snapshot was taken.
  288. func (s *SampleSnapshot) Percentiles(ps []float64) []float64 {
  289. return SamplePercentiles(s.values, ps)
  290. }
  291. // Size returns the size of the sample at the time the snapshot was taken.
  292. func (s *SampleSnapshot) Size() int { return len(s.values) }
  293. // Snapshot returns the snapshot.
  294. func (s *SampleSnapshot) Snapshot() Sample { return s }
  295. // StdDev returns the standard deviation of values at the time the snapshot was
  296. // taken.
  297. func (s *SampleSnapshot) StdDev() float64 { return SampleStdDev(s.values) }
  298. // Sum returns the sum of values at the time the snapshot was taken.
  299. func (s *SampleSnapshot) Sum() int64 { return SampleSum(s.values) }
  300. // Update panics.
  301. func (*SampleSnapshot) Update(int64) {
  302. panic("Update called on a SampleSnapshot")
  303. }
  304. // Values returns a copy of the values in the sample.
  305. func (s *SampleSnapshot) Values() []int64 {
  306. values := make([]int64, len(s.values))
  307. copy(values, s.values)
  308. return values
  309. }
  310. // Variance returns the variance of values at the time the snapshot was taken.
  311. func (s *SampleSnapshot) Variance() float64 { return SampleVariance(s.values) }
  312. // SampleStdDev returns the standard deviation of the slice of int64.
  313. func SampleStdDev(values []int64) float64 {
  314. return math.Sqrt(SampleVariance(values))
  315. }
  316. // SampleSum returns the sum of the slice of int64.
  317. func SampleSum(values []int64) int64 {
  318. var sum int64
  319. for _, v := range values {
  320. sum += v
  321. }
  322. return sum
  323. }
  324. // SampleVariance returns the variance of the slice of int64.
  325. func SampleVariance(values []int64) float64 {
  326. if 0 == len(values) {
  327. return 0.0
  328. }
  329. m := SampleMean(values)
  330. var sum float64
  331. for _, v := range values {
  332. d := float64(v) - m
  333. sum += d * d
  334. }
  335. return sum / float64(len(values))
  336. }
  337. // A uniform sample using Vitter's Algorithm R.
  338. //
  339. // <http://www.cs.umd.edu/~samir/498/vitter.pdf>
  340. type UniformSample struct {
  341. count int64
  342. mutex sync.Mutex
  343. reservoirSize int
  344. values []int64
  345. }
  346. // NewUniformSample constructs a new uniform sample with the given reservoir
  347. // size.
  348. func NewUniformSample(reservoirSize int) Sample {
  349. if UseNilMetrics {
  350. return NilSample{}
  351. }
  352. return &UniformSample{
  353. reservoirSize: reservoirSize,
  354. values: make([]int64, 0, reservoirSize),
  355. }
  356. }
  357. // Clear clears all samples.
  358. func (s *UniformSample) Clear() {
  359. s.mutex.Lock()
  360. defer s.mutex.Unlock()
  361. s.count = 0
  362. s.values = make([]int64, 0, s.reservoirSize)
  363. }
  364. // Count returns the number of samples recorded, which may exceed the
  365. // reservoir size.
  366. func (s *UniformSample) Count() int64 {
  367. s.mutex.Lock()
  368. defer s.mutex.Unlock()
  369. return s.count
  370. }
  371. // Max returns the maximum value in the sample, which may not be the maximum
  372. // value ever to be part of the sample.
  373. func (s *UniformSample) Max() int64 {
  374. s.mutex.Lock()
  375. defer s.mutex.Unlock()
  376. return SampleMax(s.values)
  377. }
  378. // Mean returns the mean of the values in the sample.
  379. func (s *UniformSample) Mean() float64 {
  380. s.mutex.Lock()
  381. defer s.mutex.Unlock()
  382. return SampleMean(s.values)
  383. }
  384. // Min returns the minimum value in the sample, which may not be the minimum
  385. // value ever to be part of the sample.
  386. func (s *UniformSample) Min() int64 {
  387. s.mutex.Lock()
  388. defer s.mutex.Unlock()
  389. return SampleMin(s.values)
  390. }
  391. // Percentile returns an arbitrary percentile of values in the sample.
  392. func (s *UniformSample) Percentile(p float64) float64 {
  393. s.mutex.Lock()
  394. defer s.mutex.Unlock()
  395. return SamplePercentile(s.values, p)
  396. }
  397. // Percentiles returns a slice of arbitrary percentiles of values in the
  398. // sample.
  399. func (s *UniformSample) Percentiles(ps []float64) []float64 {
  400. s.mutex.Lock()
  401. defer s.mutex.Unlock()
  402. return SamplePercentiles(s.values, ps)
  403. }
  404. // Size returns the size of the sample, which is at most the reservoir size.
  405. func (s *UniformSample) Size() int {
  406. s.mutex.Lock()
  407. defer s.mutex.Unlock()
  408. return len(s.values)
  409. }
  410. // Snapshot returns a read-only copy of the sample.
  411. func (s *UniformSample) Snapshot() Sample {
  412. s.mutex.Lock()
  413. defer s.mutex.Unlock()
  414. values := make([]int64, len(s.values))
  415. copy(values, s.values)
  416. return &SampleSnapshot{
  417. count: s.count,
  418. values: values,
  419. }
  420. }
  421. // StdDev returns the standard deviation of the values in the sample.
  422. func (s *UniformSample) StdDev() float64 {
  423. s.mutex.Lock()
  424. defer s.mutex.Unlock()
  425. return SampleStdDev(s.values)
  426. }
  427. // Sum returns the sum of the values in the sample.
  428. func (s *UniformSample) Sum() int64 {
  429. s.mutex.Lock()
  430. defer s.mutex.Unlock()
  431. return SampleSum(s.values)
  432. }
  433. // Update samples a new value.
  434. func (s *UniformSample) Update(v int64) {
  435. s.mutex.Lock()
  436. defer s.mutex.Unlock()
  437. s.count++
  438. if len(s.values) < s.reservoirSize {
  439. s.values = append(s.values, v)
  440. } else {
  441. r := rand.Int63n(s.count)
  442. if r < int64(len(s.values)) {
  443. s.values[int(r)] = v
  444. }
  445. }
  446. }
  447. // Values returns a copy of the values in the sample.
  448. func (s *UniformSample) Values() []int64 {
  449. s.mutex.Lock()
  450. defer s.mutex.Unlock()
  451. values := make([]int64, len(s.values))
  452. copy(values, s.values)
  453. return values
  454. }
  455. // Variance returns the variance of the values in the sample.
  456. func (s *UniformSample) Variance() float64 {
  457. s.mutex.Lock()
  458. defer s.mutex.Unlock()
  459. return SampleVariance(s.values)
  460. }
  461. // expDecaySample represents an individual sample in a heap.
  462. type expDecaySample struct {
  463. k float64
  464. v int64
  465. }
  466. func newExpDecaySampleHeap(reservoirSize int) *expDecaySampleHeap {
  467. return &expDecaySampleHeap{make([]expDecaySample, 0, reservoirSize)}
  468. }
  469. // expDecaySampleHeap is a min-heap of expDecaySamples.
  470. // The internal implementation is copied from the standard library's container/heap
  471. type expDecaySampleHeap struct {
  472. s []expDecaySample
  473. }
  474. func (h *expDecaySampleHeap) Clear() {
  475. h.s = h.s[:0]
  476. }
  477. func (h *expDecaySampleHeap) Push(s expDecaySample) {
  478. n := len(h.s)
  479. h.s = h.s[0 : n+1]
  480. h.s[n] = s
  481. h.up(n)
  482. }
  483. func (h *expDecaySampleHeap) Pop() expDecaySample {
  484. n := len(h.s) - 1
  485. h.s[0], h.s[n] = h.s[n], h.s[0]
  486. h.down(0, n)
  487. n = len(h.s)
  488. s := h.s[n-1]
  489. h.s = h.s[0 : n-1]
  490. return s
  491. }
  492. func (h *expDecaySampleHeap) Size() int {
  493. return len(h.s)
  494. }
  495. func (h *expDecaySampleHeap) Values() []expDecaySample {
  496. return h.s
  497. }
  498. func (h *expDecaySampleHeap) up(j int) {
  499. for {
  500. i := (j - 1) / 2 // parent
  501. if i == j || !(h.s[j].k < h.s[i].k) {
  502. break
  503. }
  504. h.s[i], h.s[j] = h.s[j], h.s[i]
  505. j = i
  506. }
  507. }
  508. func (h *expDecaySampleHeap) down(i, n int) {
  509. for {
  510. j1 := 2*i + 1
  511. if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
  512. break
  513. }
  514. j := j1 // left child
  515. if j2 := j1 + 1; j2 < n && !(h.s[j1].k < h.s[j2].k) {
  516. j = j2 // = 2*i + 2 // right child
  517. }
  518. if !(h.s[j].k < h.s[i].k) {
  519. break
  520. }
  521. h.s[i], h.s[j] = h.s[j], h.s[i]
  522. i = j
  523. }
  524. }
  525. type int64Slice []int64
  526. func (p int64Slice) Len() int { return len(p) }
  527. func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
  528. func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }