123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- package limit
- import (
- "context"
- "time"
- "go-common/library/container/queue/aqm"
- "go-common/library/log"
- "go-common/library/rate"
- "go-common/library/rate/vegas"
- )
- var _ rate.Limiter = &Limiter{}
- // New returns a new Limiter that allows events up to adaptive rtt.
- func New(c *aqm.Config) *Limiter {
- l := &Limiter{
- rate: vegas.New(),
- queue: aqm.New(c),
- }
- go func() {
- ticker := time.NewTicker(time.Second * 1)
- defer ticker.Stop()
- for {
- <-ticker.C
- v := l.rate.Stat()
- q := l.queue.Stat()
- log.Info("rate/limit: limit(%d) inFlight(%d) minRtt(%v) rtt(%v) codel packets(%d)", v.Limit, v.InFlight, v.MinRTT, v.LastRTT, q.Packets)
- }
- }()
- return l
- }
- // Limiter use tcp vegas + codel for adaptive limit.
- type Limiter struct {
- rate *vegas.Vegas
- queue *aqm.Queue
- }
- // Allow immplemnet rate.Limiter.
- // if error is returned,no need to call done()
- func (l *Limiter) Allow(ctx context.Context) (func(rate.Op), error) {
- var (
- done func(time.Time, rate.Op)
- err error
- ok bool
- )
- if done, ok = l.rate.Acquire(); !ok {
- // NOTE exceed max inflight, use queue
- if err = l.queue.Push(ctx); err != nil {
- done(time.Time{}, rate.Ignore)
- return func(rate.Op) {}, err
- }
- }
- start := time.Now()
- return func(op rate.Op) {
- done(start, op)
- l.queue.Pop()
- }, nil
- }
|