sre_breaker.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package breaker
import (
	"math"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"go-common/library/ecode"
	"go-common/library/log"
	"go-common/library/stat/summary"
)
  11. // sreBreaker is a sre CircuitBreaker pattern.
  12. type sreBreaker struct {
  13. stat summary.Summary
  14. k float64
  15. request int64
  16. state int32
  17. r *rand.Rand
  18. }
  19. func newSRE(c *Config) Breaker {
  20. return &sreBreaker{
  21. stat: summary.New(time.Duration(c.Window), c.Bucket),
  22. r: rand.New(rand.NewSource(time.Now().UnixNano())),
  23. request: c.Request,
  24. k: c.K,
  25. state: StateClosed,
  26. }
  27. }
  28. func (b *sreBreaker) Allow() error {
  29. success, total := b.stat.Value()
  30. k := b.k * float64(success)
  31. if log.V(5) {
  32. log.Info("breaker: request: %d, succee: %d, fail: %d", total, success, total-success)
  33. }
  34. // check overflow requests = K * success
  35. if total < b.request || float64(total) < k {
  36. if atomic.LoadInt32(&b.state) == StateOpen {
  37. atomic.CompareAndSwapInt32(&b.state, StateOpen, StateClosed)
  38. }
  39. return nil
  40. }
  41. if atomic.LoadInt32(&b.state) == StateClosed {
  42. atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen)
  43. }
  44. dr := math.Max(0, (float64(total)-k)/float64(total+1))
  45. rr := b.r.Float64()
  46. if log.V(5) {
  47. log.Info("breaker: drop ratio: %f, real rand: %f, drop: %v", dr, rr, dr > rr)
  48. }
  49. if dr <= rr {
  50. return nil
  51. }
  52. return ecode.ServiceUnavailable
  53. }
  54. func (b *sreBreaker) MarkSuccess() {
  55. b.stat.Add(1)
  56. }
  57. func (b *sreBreaker) MarkFailed() {
  58. // NOTE: when client reject requets locally, continue add counter let the
  59. // drop ratio higher.
  60. b.stat.Add(0)
  61. }