vegas.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package vegas
  2. import (
  3. "math"
  4. "math/rand"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "go-common/library/rate"
  9. )
  // Tuning bounds for the limiter.
  const (
  	// _minWindowTime is the shortest interval, in nanoseconds, between two
  	// limit recalculations (the window is 5x the last RTT, clamped).
  	_minWindowTime = int64(500 * time.Millisecond)
  	// _maxWindowTime is the longest interval, in nanoseconds, between two
  	// limit recalculations.
  	_maxWindowTime = int64(2000 * time.Millisecond)
  	// _minLimit is the lower bound of the concurrency limit and its
  	// initial value.
  	_minLimit = 8
  	// _maxLimit is the upper bound of the concurrency limit.
  	_maxLimit = 2048
  )
  // Stat is a point-in-time snapshot of a Vegas limiter.
  type Stat struct {
  	// Limit is the current concurrency limit; InFlight is the number of
  	// acquisitions whose done callback has not been invoked yet.
  	Limit, InFlight int64
  	// MinRTT is the limiter's recorded minimum round-trip time; LastRTT is
  	// the RTT reported by the sample currently being collected.
  	MinRTT, LastRTT time.Duration
  }
  23. // New new a rate vegas.
  24. func New() *Vegas {
  25. v := &Vegas{
  26. probes: 100,
  27. limit: _minLimit,
  28. }
  29. v.sample.Store(&sample{})
  30. return v
  31. }
  // Vegas is a concurrency limiter modelled on TCP Vegas: it adjusts its
  // limit from the round-trip times observed by Acquire's done callback.
  // Use New to construct one; a Vegas must not be copied after first use
  // because it embeds a sync.Mutex.
  type Vegas struct {
  	// limit is the current concurrency limit (accessed atomically).
  	limit int64
  	// inFlight counts acquisitions that have not been released yet
  	// (accessed atomically).
  	inFlight int64
  	// updateTime is the UnixNano timestamp after which the limit may be
  	// recalculated again (accessed atomically).
  	updateTime int64
  	// minRTT is the recorded minimum round-trip time in nanoseconds.
  	minRTT int64
  	// sample holds the *sample currently collecting measurements.
  	sample atomic.Value
  	// mu serializes limit recalculation.
  	mu sync.Mutex
  	// probes counts recalculations left before minRTT may be re-probed.
  	probes int64
  }
  42. // Stat return the statistics of vegas.
  43. func (v *Vegas) Stat() Stat {
  44. return Stat{
  45. Limit: atomic.LoadInt64(&v.limit),
  46. InFlight: atomic.LoadInt64(&v.inFlight),
  47. MinRTT: time.Duration(atomic.LoadInt64(&v.minRTT)),
  48. LastRTT: time.Duration(v.sample.Load().(*sample).RTT()),
  49. }
  50. }
  51. // Acquire No matter success or not,done() must be called at last.
  52. func (v *Vegas) Acquire() (done func(time.Time, rate.Op), success bool) {
  53. inFlight := atomic.AddInt64(&v.inFlight, 1)
  54. if inFlight <= atomic.LoadInt64(&v.limit) {
  55. success = true
  56. }
  57. return func(start time.Time, op rate.Op) {
  58. atomic.AddInt64(&v.inFlight, -1)
  59. if op == rate.Ignore {
  60. return
  61. }
  62. end := time.Now().UnixNano()
  63. rtt := end - start.UnixNano()
  64. s := v.sample.Load().(*sample)
  65. if op == rate.Drop {
  66. s.Add(rtt, inFlight, true)
  67. } else if op == rate.Success {
  68. s.Add(rtt, inFlight, false)
  69. }
  70. if end > atomic.LoadInt64(&v.updateTime) && s.Count() >= 16 {
  71. v.mu.Lock()
  72. defer v.mu.Unlock()
  73. if v.sample.Load().(*sample) != s {
  74. return
  75. }
  76. v.sample.Store(&sample{})
  77. lastRTT := s.RTT()
  78. if lastRTT <= 0 {
  79. return
  80. }
  81. updateTime := end + lastRTT*5
  82. if lastRTT*5 < _minWindowTime {
  83. updateTime = end + _minWindowTime
  84. } else if lastRTT*5 > _maxWindowTime {
  85. updateTime = end + _maxWindowTime
  86. }
  87. atomic.StoreInt64(&v.updateTime, updateTime)
  88. limit := atomic.LoadInt64(&v.limit)
  89. queue := float64(limit) * (1 - float64(v.minRTT)/float64(lastRTT))
  90. v.probes--
  91. if v.probes <= 0 {
  92. maxFlight := s.MaxInFlight()
  93. if maxFlight*2 < v.limit || maxFlight <= _minLimit {
  94. v.probes = 3*limit + rand.Int63n(3*limit)
  95. v.minRTT = lastRTT
  96. }
  97. }
  98. if v.minRTT == 0 || lastRTT < v.minRTT {
  99. v.minRTT = lastRTT
  100. }
  101. var newLimit float64
  102. threshold := math.Sqrt(float64(limit)) / 2
  103. if s.Drop() {
  104. newLimit = float64(limit) - threshold
  105. } else if s.MaxInFlight()*2 < v.limit {
  106. return
  107. } else {
  108. if queue < threshold {
  109. newLimit = float64(limit) + 6*threshold
  110. } else if queue < 2*threshold {
  111. newLimit = float64(limit) + 3*threshold
  112. } else if queue < 3*threshold {
  113. newLimit = float64(limit) + threshold
  114. } else if queue > 6*threshold {
  115. newLimit = float64(limit) - threshold
  116. } else {
  117. return
  118. }
  119. }
  120. newLimit = math.Max(_minLimit, math.Min(_maxLimit, newLimit))
  121. atomic.StoreInt64(&v.limit, int64(newLimit))
  122. }
  123. }, success
  124. }