limit.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. package limit
  2. import (
  3. "context"
  4. "time"
  5. "go-common/library/container/queue/aqm"
  6. "go-common/library/log"
  7. "go-common/library/rate"
  8. "go-common/library/rate/vegas"
  9. )
  10. var _ rate.Limiter = &Limiter{}
  11. // New returns a new Limiter that allows events up to adaptive rtt.
  12. func New(c *aqm.Config) *Limiter {
  13. l := &Limiter{
  14. rate: vegas.New(),
  15. queue: aqm.New(c),
  16. }
  17. go func() {
  18. ticker := time.NewTicker(time.Second * 1)
  19. defer ticker.Stop()
  20. for {
  21. <-ticker.C
  22. v := l.rate.Stat()
  23. q := l.queue.Stat()
  24. log.Info("rate/limit: limit(%d) inFlight(%d) minRtt(%v) rtt(%v) codel packets(%d)", v.Limit, v.InFlight, v.MinRTT, v.LastRTT, q.Packets)
  25. }
  26. }()
  27. return l
  28. }
  29. // Limiter use tcp vegas + codel for adaptive limit.
  30. type Limiter struct {
  31. rate *vegas.Vegas
  32. queue *aqm.Queue
  33. }
  34. // Allow immplemnet rate.Limiter.
  35. // if error is returned,no need to call done()
  36. func (l *Limiter) Allow(ctx context.Context) (func(rate.Op), error) {
  37. var (
  38. done func(time.Time, rate.Op)
  39. err error
  40. ok bool
  41. )
  42. if done, ok = l.rate.Acquire(); !ok {
  43. // NOTE exceed max inflight, use queue
  44. if err = l.queue.Push(ctx); err != nil {
  45. done(time.Time{}, rate.Ignore)
  46. return func(rate.Op) {}, err
  47. }
  48. }
  49. start := time.Now()
  50. return func(op rate.Op) {
  51. done(start, op)
  52. l.queue.Pop()
  53. }, nil
  54. }