codel.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package aqm
  2. import (
  3. "context"
  4. "math"
  5. "sync"
  6. "time"
  7. "go-common/library/ecode"
  8. )
  9. // Config codel config.
  10. type Config struct {
  11. Target int64 // target queue delay (default 20 ms).
  12. Internal int64 // sliding minimum time window width (default 500 ms)
  13. }
  14. // Stat is the Statistics of codel.
  15. type Stat struct {
  16. Dropping bool
  17. FaTime int64
  18. DropNext int64
  19. Packets int
  20. }
  21. type packet struct {
  22. ch chan bool
  23. ts int64
  24. }
  25. var defaultConf = &Config{
  26. Target: 50,
  27. Internal: 500,
  28. }
  29. // Queue queue is CoDel req buffer queue.
  30. type Queue struct {
  31. pool sync.Pool
  32. packets chan packet
  33. mux sync.RWMutex
  34. conf *Config
  35. count int64
  36. dropping bool // Equal to 1 if in drop state
  37. faTime int64 // Time when we'll declare we're above target (0 if below)
  38. dropNext int64 // Packets dropped since going into drop state
  39. }
  40. // Default new a default codel queue.
  41. func Default() *Queue {
  42. return New(defaultConf)
  43. }
  44. // New new codel queue.
  45. func New(conf *Config) *Queue {
  46. if conf == nil {
  47. conf = defaultConf
  48. }
  49. q := &Queue{
  50. packets: make(chan packet, 2048),
  51. conf: conf,
  52. }
  53. q.pool.New = func() interface{} {
  54. return make(chan bool)
  55. }
  56. return q
  57. }
  58. // Reload set queue config.
  59. func (q *Queue) Reload(c *Config) {
  60. if c == nil || c.Internal <= 0 || c.Target <= 0 {
  61. return
  62. }
  63. // TODO codel queue size
  64. q.mux.Lock()
  65. q.conf = c
  66. q.mux.Unlock()
  67. }
  68. // Stat return the statistics of codel
  69. func (q *Queue) Stat() Stat {
  70. q.mux.Lock()
  71. defer q.mux.Unlock()
  72. return Stat{
  73. Dropping: q.dropping,
  74. FaTime: q.faTime,
  75. DropNext: q.dropNext,
  76. Packets: len(q.packets),
  77. }
  78. }
  79. // Push req into CoDel request buffer queue.
  80. // if return error is nil,the caller must call q.Done() after finish request handling
  81. func (q *Queue) Push(ctx context.Context) (err error) {
  82. r := packet{
  83. ch: q.pool.Get().(chan bool),
  84. ts: time.Now().UnixNano() / int64(time.Millisecond),
  85. }
  86. select {
  87. case q.packets <- r:
  88. default:
  89. err = ecode.LimitExceed
  90. q.pool.Put(r.ch)
  91. }
  92. if err == nil {
  93. select {
  94. case drop := <-r.ch:
  95. if drop {
  96. err = ecode.LimitExceed
  97. }
  98. q.pool.Put(r.ch)
  99. case <-ctx.Done():
  100. err = ecode.Deadline
  101. }
  102. }
  103. return
  104. }
  105. // Pop req from CoDel request buffer queue.
  106. func (q *Queue) Pop() {
  107. for {
  108. select {
  109. case p := <-q.packets:
  110. drop := q.judge(p)
  111. select {
  112. case p.ch <- drop:
  113. if !drop {
  114. return
  115. }
  116. default:
  117. q.pool.Put(p.ch)
  118. }
  119. default:
  120. return
  121. }
  122. }
  123. }
  124. func (q *Queue) controlLaw(now int64) int64 {
  125. q.dropNext = now + int64(float64(q.conf.Internal)/math.Sqrt(float64(q.count)))
  126. return q.dropNext
  127. }
  128. // judge decide if the packet should drop or not.
  129. func (q *Queue) judge(p packet) (drop bool) {
  130. now := time.Now().UnixNano() / int64(time.Millisecond)
  131. sojurn := now - p.ts
  132. q.mux.Lock()
  133. defer q.mux.Unlock()
  134. if sojurn < q.conf.Target {
  135. q.faTime = 0
  136. } else if q.faTime == 0 {
  137. q.faTime = now + q.conf.Internal
  138. } else if now >= q.faTime {
  139. drop = true
  140. }
  141. if q.dropping {
  142. if !drop {
  143. // sojourn time below target - leave dropping state
  144. q.dropping = false
  145. } else if now > q.dropNext {
  146. q.count++
  147. q.dropNext = q.controlLaw(q.dropNext)
  148. drop = true
  149. return
  150. }
  151. } else if drop && (now-q.dropNext < q.conf.Internal || now-q.faTime >= q.conf.Internal) {
  152. q.dropping = true
  153. // If we're in a drop cycle, the drop rate that controlled the queue
  154. // on the last cycle is a good starting point to control it now.
  155. if now-q.dropNext < q.conf.Internal {
  156. if q.count > 2 {
  157. q.count = q.count - 2
  158. } else {
  159. q.count = 1
  160. }
  161. } else {
  162. q.count = 1
  163. }
  164. q.dropNext = q.controlLaw(now)
  165. drop = true
  166. return
  167. }
  168. return
  169. }