123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- package vegas
- import (
- "math"
- "math/rand"
- "sync"
- "sync/atomic"
- "time"
- "go-common/library/rate"
- )
const (
	// _minWindowTime and _maxWindowTime bound, in nanoseconds, how far in the
	// future the next limit recalculation is scheduled (Acquire clamps
	// 5 * lastRTT into this range).
	_minWindowTime = int64(500 * time.Millisecond)
	_maxWindowTime = int64(2000 * time.Millisecond)

	// _minLimit and _maxLimit bound the concurrency limit; _minLimit is also
	// the limit a freshly created Vegas starts with.
	_minLimit = 8
	_maxLimit = 2048
)
// Stat is a point-in-time snapshot of a Vegas limiter's counters.
type Stat struct {
	// Limit is the current concurrency limit; InFlight is the number of
	// acquired requests whose done callback has not run yet.
	Limit, InFlight int64
	// MinRTT is the limiter's recorded minimum round-trip time; LastRTT is
	// the round-trip time reported by the current sample window.
	MinRTT, LastRTT time.Duration
}
- // New new a rate vegas.
- func New() *Vegas {
- v := &Vegas{
- probes: 100,
- limit: _minLimit,
- }
- v.sample.Store(&sample{})
- return v
- }
// Vegas is a concurrency limiter in the style of TCP Vegas: it adjusts its
// limit from the round-trip times collected in sample windows.
type Vegas struct {
	// limit is the current concurrency ceiling; inFlight counts requests that
	// were acquired and not yet completed. Both are accessed atomically.
	limit, inFlight int64
	// updateTime is the UnixNano timestamp after which the next limit
	// recalculation may run; minRTT is the recorded minimum window RTT in
	// nanoseconds.
	updateTime, minRTT int64
	// sample holds the *sample window currently collecting measurements.
	sample atomic.Value
	// mu serialises limit recalculation.
	mu sync.Mutex
	// probes counts down recalculations until minRTT may be re-probed.
	probes int64
}
- // Stat return the statistics of vegas.
- func (v *Vegas) Stat() Stat {
- return Stat{
- Limit: atomic.LoadInt64(&v.limit),
- InFlight: atomic.LoadInt64(&v.inFlight),
- MinRTT: time.Duration(atomic.LoadInt64(&v.minRTT)),
- LastRTT: time.Duration(v.sample.Load().(*sample).RTT()),
- }
- }
- // Acquire No matter success or not,done() must be called at last.
- func (v *Vegas) Acquire() (done func(time.Time, rate.Op), success bool) {
- inFlight := atomic.AddInt64(&v.inFlight, 1)
- if inFlight <= atomic.LoadInt64(&v.limit) {
- success = true
- }
- return func(start time.Time, op rate.Op) {
- atomic.AddInt64(&v.inFlight, -1)
- if op == rate.Ignore {
- return
- }
- end := time.Now().UnixNano()
- rtt := end - start.UnixNano()
- s := v.sample.Load().(*sample)
- if op == rate.Drop {
- s.Add(rtt, inFlight, true)
- } else if op == rate.Success {
- s.Add(rtt, inFlight, false)
- }
- if end > atomic.LoadInt64(&v.updateTime) && s.Count() >= 16 {
- v.mu.Lock()
- defer v.mu.Unlock()
- if v.sample.Load().(*sample) != s {
- return
- }
- v.sample.Store(&sample{})
- lastRTT := s.RTT()
- if lastRTT <= 0 {
- return
- }
- updateTime := end + lastRTT*5
- if lastRTT*5 < _minWindowTime {
- updateTime = end + _minWindowTime
- } else if lastRTT*5 > _maxWindowTime {
- updateTime = end + _maxWindowTime
- }
- atomic.StoreInt64(&v.updateTime, updateTime)
- limit := atomic.LoadInt64(&v.limit)
- queue := float64(limit) * (1 - float64(v.minRTT)/float64(lastRTT))
- v.probes--
- if v.probes <= 0 {
- maxFlight := s.MaxInFlight()
- if maxFlight*2 < v.limit || maxFlight <= _minLimit {
- v.probes = 3*limit + rand.Int63n(3*limit)
- v.minRTT = lastRTT
- }
- }
- if v.minRTT == 0 || lastRTT < v.minRTT {
- v.minRTT = lastRTT
- }
- var newLimit float64
- threshold := math.Sqrt(float64(limit)) / 2
- if s.Drop() {
- newLimit = float64(limit) - threshold
- } else if s.MaxInFlight()*2 < v.limit {
- return
- } else {
- if queue < threshold {
- newLimit = float64(limit) + 6*threshold
- } else if queue < 2*threshold {
- newLimit = float64(limit) + 3*threshold
- } else if queue < 3*threshold {
- newLimit = float64(limit) + threshold
- } else if queue > 6*threshold {
- newLimit = float64(limit) - threshold
- } else {
- return
- }
- }
- newLimit = math.Max(_minLimit, math.Min(_maxLimit, newLimit))
- atomic.StoreInt64(&v.limit, int64(newLimit))
- }
- }, success
- }
|