breaker.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. // Package breaker implements the circuit-breaker resiliency pattern for Go.
  2. package breaker
  3. import (
  4. "errors"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. )
  9. // ErrBreakerOpen is the error returned from Run() when the function is not executed
  10. // because the breaker is currently open.
  11. var ErrBreakerOpen = errors.New("circuit breaker is open")
  12. const (
  13. closed uint32 = iota
  14. open
  15. halfOpen
  16. )
  17. // Breaker implements the circuit-breaker resiliency pattern
  18. type Breaker struct {
  19. errorThreshold, successThreshold int
  20. timeout time.Duration
  21. lock sync.Mutex
  22. state uint32
  23. errors, successes int
  24. lastError time.Time
  25. }
  26. // New constructs a new circuit-breaker that starts closed.
  27. // From closed, the breaker opens if "errorThreshold" errors are seen
  28. // without an error-free period of at least "timeout". From open, the
  29. // breaker half-closes after "timeout". From half-open, the breaker closes
  30. // after "successThreshold" consecutive successes, or opens on a single error.
  31. func New(errorThreshold, successThreshold int, timeout time.Duration) *Breaker {
  32. return &Breaker{
  33. errorThreshold: errorThreshold,
  34. successThreshold: successThreshold,
  35. timeout: timeout,
  36. }
  37. }
  38. // Run will either return ErrBreakerOpen immediately if the circuit-breaker is
  39. // already open, or it will run the given function and pass along its return
  40. // value. It is safe to call Run concurrently on the same Breaker.
  41. func (b *Breaker) Run(work func() error) error {
  42. state := atomic.LoadUint32(&b.state)
  43. if state == open {
  44. return ErrBreakerOpen
  45. }
  46. return b.doWork(state, work)
  47. }
  48. // Go will either return ErrBreakerOpen immediately if the circuit-breaker is
  49. // already open, or it will run the given function in a separate goroutine.
  50. // If the function is run, Go will return nil immediately, and will *not* return
  51. // the return value of the function. It is safe to call Go concurrently on the
  52. // same Breaker.
  53. func (b *Breaker) Go(work func() error) error {
  54. state := atomic.LoadUint32(&b.state)
  55. if state == open {
  56. return ErrBreakerOpen
  57. }
  58. // errcheck complains about ignoring the error return value, but
  59. // that's on purpose; if you want an error from a goroutine you have to
  60. // get it over a channel or something
  61. go b.doWork(state, work)
  62. return nil
  63. }
  64. func (b *Breaker) doWork(state uint32, work func() error) error {
  65. var panicValue interface{}
  66. result := func() error {
  67. defer func() {
  68. panicValue = recover()
  69. }()
  70. return work()
  71. }()
  72. if result == nil && panicValue == nil && state == closed {
  73. // short-circuit the normal, success path without contending
  74. // on the lock
  75. return nil
  76. }
  77. // oh well, I guess we have to contend on the lock
  78. b.processResult(result, panicValue)
  79. if panicValue != nil {
  80. // as close as Go lets us come to a "rethrow" although unfortunately
  81. // we lose the original panicing location
  82. panic(panicValue)
  83. }
  84. return result
  85. }
  86. func (b *Breaker) processResult(result error, panicValue interface{}) {
  87. b.lock.Lock()
  88. defer b.lock.Unlock()
  89. if result == nil && panicValue == nil {
  90. if b.state == halfOpen {
  91. b.successes++
  92. if b.successes == b.successThreshold {
  93. b.closeBreaker()
  94. }
  95. }
  96. } else {
  97. if b.errors > 0 {
  98. expiry := b.lastError.Add(b.timeout)
  99. if time.Now().After(expiry) {
  100. b.errors = 0
  101. }
  102. }
  103. switch b.state {
  104. case closed:
  105. b.errors++
  106. if b.errors == b.errorThreshold {
  107. b.openBreaker()
  108. } else {
  109. b.lastError = time.Now()
  110. }
  111. case halfOpen:
  112. b.openBreaker()
  113. }
  114. }
  115. }
  116. func (b *Breaker) openBreaker() {
  117. b.changeState(open)
  118. go b.timer()
  119. }
  120. func (b *Breaker) closeBreaker() {
  121. b.changeState(closed)
  122. }
  123. func (b *Breaker) timer() {
  124. time.Sleep(b.timeout)
  125. b.lock.Lock()
  126. defer b.lock.Unlock()
  127. b.changeState(halfOpen)
  128. }
  129. func (b *Breaker) changeState(newState uint32) {
  130. b.errors = 0
  131. b.successes = 0
  132. atomic.StoreUint32(&b.state, newState)
  133. }