123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- // Package breaker implements the circuit-breaker resiliency pattern for Go.
- package breaker
- import (
- "errors"
- "sync"
- "sync/atomic"
- "time"
- )
- // ErrBreakerOpen is the error returned from Run() when the function is not executed
- // because the breaker is currently open.
- var ErrBreakerOpen = errors.New("circuit breaker is open")
- const (
- closed uint32 = iota
- open
- halfOpen
- )
- // Breaker implements the circuit-breaker resiliency pattern
- type Breaker struct {
- errorThreshold, successThreshold int
- timeout time.Duration
- lock sync.Mutex
- state uint32
- errors, successes int
- lastError time.Time
- }
- // New constructs a new circuit-breaker that starts closed.
- // From closed, the breaker opens if "errorThreshold" errors are seen
- // without an error-free period of at least "timeout". From open, the
- // breaker half-closes after "timeout". From half-open, the breaker closes
- // after "successThreshold" consecutive successes, or opens on a single error.
- func New(errorThreshold, successThreshold int, timeout time.Duration) *Breaker {
- return &Breaker{
- errorThreshold: errorThreshold,
- successThreshold: successThreshold,
- timeout: timeout,
- }
- }
- // Run will either return ErrBreakerOpen immediately if the circuit-breaker is
- // already open, or it will run the given function and pass along its return
- // value. It is safe to call Run concurrently on the same Breaker.
- func (b *Breaker) Run(work func() error) error {
- state := atomic.LoadUint32(&b.state)
- if state == open {
- return ErrBreakerOpen
- }
- return b.doWork(state, work)
- }
- // Go will either return ErrBreakerOpen immediately if the circuit-breaker is
- // already open, or it will run the given function in a separate goroutine.
- // If the function is run, Go will return nil immediately, and will *not* return
- // the return value of the function. It is safe to call Go concurrently on the
- // same Breaker.
- func (b *Breaker) Go(work func() error) error {
- state := atomic.LoadUint32(&b.state)
- if state == open {
- return ErrBreakerOpen
- }
- // errcheck complains about ignoring the error return value, but
- // that's on purpose; if you want an error from a goroutine you have to
- // get it over a channel or something
- go b.doWork(state, work)
- return nil
- }
- func (b *Breaker) doWork(state uint32, work func() error) error {
- var panicValue interface{}
- result := func() error {
- defer func() {
- panicValue = recover()
- }()
- return work()
- }()
- if result == nil && panicValue == nil && state == closed {
- // short-circuit the normal, success path without contending
- // on the lock
- return nil
- }
- // oh well, I guess we have to contend on the lock
- b.processResult(result, panicValue)
- if panicValue != nil {
- // as close as Go lets us come to a "rethrow" although unfortunately
- // we lose the original panicing location
- panic(panicValue)
- }
- return result
- }
- func (b *Breaker) processResult(result error, panicValue interface{}) {
- b.lock.Lock()
- defer b.lock.Unlock()
- if result == nil && panicValue == nil {
- if b.state == halfOpen {
- b.successes++
- if b.successes == b.successThreshold {
- b.closeBreaker()
- }
- }
- } else {
- if b.errors > 0 {
- expiry := b.lastError.Add(b.timeout)
- if time.Now().After(expiry) {
- b.errors = 0
- }
- }
- switch b.state {
- case closed:
- b.errors++
- if b.errors == b.errorThreshold {
- b.openBreaker()
- } else {
- b.lastError = time.Now()
- }
- case halfOpen:
- b.openBreaker()
- }
- }
- }
- func (b *Breaker) openBreaker() {
- b.changeState(open)
- go b.timer()
- }
- func (b *Breaker) closeBreaker() {
- b.changeState(closed)
- }
- func (b *Breaker) timer() {
- time.Sleep(b.timeout)
- b.lock.Lock()
- defer b.lock.Unlock()
- b.changeState(halfOpen)
- }
- func (b *Breaker) changeState(newState uint32) {
- b.errors = 0
- b.successes = 0
- atomic.StoreUint32(&b.state, newState)
- }
|