breaker.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package collector
  2. import (
  3. "fmt"
  4. "sync"
  5. "go-common/app/service/main/dapper/model"
  6. )
  7. type countBreaker struct {
  8. rmx sync.RWMutex
  9. n int
  10. slot map[string]struct{}
  11. }
  12. func (c *countBreaker) Break(key string) error {
  13. c.rmx.Lock()
  14. _, ok := c.slot[key]
  15. c.rmx.Unlock()
  16. if ok {
  17. return nil
  18. }
  19. c.rmx.Lock()
  20. c.slot[key] = struct{}{}
  21. l := len(c.slot)
  22. c.rmx.Unlock()
  23. if l <= c.n {
  24. return nil
  25. }
  26. return fmt.Errorf("%s reach limit number %d breaked", key, c.n)
  27. }
  28. func newCountBreaker(n int) *countBreaker {
  29. return &countBreaker{n: n, slot: make(map[string]struct{})}
  30. }
  31. type serviceBreaker struct {
  32. rmx sync.RWMutex
  33. n int
  34. slot map[string]*countBreaker
  35. }
  36. func (s *serviceBreaker) Process(span *model.Span) error {
  37. s.rmx.RLock()
  38. operationNameBreaker, ok1 := s.slot[span.ServiceName+"_o"]
  39. peerServiceBreaker, ok2 := s.slot[span.ServiceName+"_p"]
  40. s.rmx.RUnlock()
  41. if !ok1 || !ok2 {
  42. s.rmx.Lock()
  43. if !ok1 {
  44. operationNameBreaker = newCountBreaker(s.n)
  45. s.slot[span.ServiceName+"_o"] = operationNameBreaker
  46. }
  47. if !ok2 {
  48. peerServiceBreaker = newCountBreaker(s.n)
  49. s.slot[span.ServiceName+"_p"] = peerServiceBreaker
  50. }
  51. s.rmx.Unlock()
  52. }
  53. if err := operationNameBreaker.Break(span.OperationName); err != nil {
  54. return err
  55. }
  56. return peerServiceBreaker.Break(span.StringTag("peer.service"))
  57. }
  58. // NewServiceBreakerProcess .
  59. func NewServiceBreakerProcess(n int) Processer {
  60. return &serviceBreaker{n: n, slot: make(map[string]*countBreaker)}
  61. }