pointwrite.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package pointwrite
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "go-common/app/service/main/dapper/model"
  9. "go-common/library/log"
  10. )
  11. // WriteFn .
  12. type WriteFn func(ctx context.Context, points []*model.SpanPoint) error
  13. // PointWriter writer span point
  14. type PointWriter interface {
  15. WriteSpan(span *model.Span) error
  16. Close() error
  17. }
  18. // New PointWriter
  19. func New(fn WriteFn, precision int64, timeout time.Duration) PointWriter {
  20. pw := &pointwriter{
  21. precision: precision,
  22. current: make(map[string]*model.SpanPoint),
  23. timeout: timeout,
  24. // TODO make it configurable
  25. tk: time.NewTicker(time.Second * 30),
  26. fn: fn,
  27. }
  28. go pw.start()
  29. return pw
  30. }
  31. type pointwriter struct {
  32. closed bool
  33. rmx sync.RWMutex
  34. precision int64
  35. timeout time.Duration
  36. current map[string]*model.SpanPoint
  37. fn WriteFn
  38. tk *time.Ticker
  39. }
  40. func (p *pointwriter) start() {
  41. for range p.tk.C {
  42. err := p.flush()
  43. if err != nil {
  44. log.Error("flush pointwriter error: %s", err)
  45. }
  46. }
  47. }
  48. func (p *pointwriter) flush() error {
  49. p.rmx.Lock()
  50. current := p.current
  51. p.current = make(map[string]*model.SpanPoint)
  52. p.rmx.Unlock()
  53. points := make([]*model.SpanPoint, 0, len(current))
  54. for _, point := range current {
  55. points = append(points, point)
  56. }
  57. if len(points) == 0 {
  58. return nil
  59. }
  60. ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
  61. defer cancel()
  62. return p.fn(ctx, points)
  63. }
  64. // WriteSpan writespan
  65. func (p *pointwriter) WriteSpan(span *model.Span) error {
  66. if p.closed {
  67. return fmt.Errorf("pointwriter already closed")
  68. }
  69. kind := "client"
  70. if span.IsServer() {
  71. kind = "server"
  72. }
  73. // NOTE: ingored sample ponit if is legacy span, DELETE it futrue
  74. if kind == "client" && !strings.Contains(span.ServiceName, ".") {
  75. return nil
  76. }
  77. peerService, ok := span.Tags["peer.service"].(string)
  78. if !ok {
  79. peerService = "unknown"
  80. }
  81. timestamp := span.StartTime.Unix() - (span.StartTime.Unix() % p.precision)
  82. key := fmt.Sprintf("%d_%s_%s_%s_%s",
  83. timestamp,
  84. span.ServiceName,
  85. span.OperationName,
  86. peerService,
  87. kind,
  88. )
  89. p.rmx.Lock()
  90. defer p.rmx.Unlock()
  91. point, ok := p.current[key]
  92. if !ok {
  93. point = &model.SpanPoint{
  94. Timestamp: timestamp,
  95. ServiceName: span.ServiceName,
  96. OperationName: span.OperationName,
  97. PeerService: peerService,
  98. SpanKind: kind,
  99. AvgDuration: model.SamplePoint{TraceID: span.TraceID, SpanID: span.SpanID, Value: int64(span.Duration)},
  100. }
  101. p.current[key] = point
  102. }
  103. duration := int64(span.Duration)
  104. if duration > point.MaxDuration.Value {
  105. point.MaxDuration.TraceID = span.TraceID
  106. point.MaxDuration.SpanID = span.SpanID
  107. point.MaxDuration.Value = duration
  108. }
  109. if point.MinDuration.Value == 0 || duration < point.MinDuration.Value {
  110. point.MinDuration.TraceID = span.TraceID
  111. point.MinDuration.SpanID = span.SpanID
  112. point.MinDuration.Value = duration
  113. }
  114. if span.IsError() {
  115. point.Errors = append(point.Errors, model.SamplePoint{
  116. TraceID: span.TraceID,
  117. SpanID: span.SpanID,
  118. Value: duration,
  119. })
  120. }
  121. return nil
  122. }
  123. // Close pointwriter
  124. func (p *pointwriter) Close() error {
  125. p.closed = true
  126. p.tk.Stop()
  127. return p.flush()
  128. }