123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- package pointwrite
- import (
- "context"
- "fmt"
- "strings"
- "sync"
- "time"
- "go-common/app/service/main/dapper/model"
- "go-common/library/log"
- )
- // WriteFn .
- type WriteFn func(ctx context.Context, points []*model.SpanPoint) error
- // PointWriter writer span point
- type PointWriter interface {
- WriteSpan(span *model.Span) error
- Close() error
- }
- // New PointWriter
- func New(fn WriteFn, precision int64, timeout time.Duration) PointWriter {
- pw := &pointwriter{
- precision: precision,
- current: make(map[string]*model.SpanPoint),
- timeout: timeout,
- // TODO make it configurable
- tk: time.NewTicker(time.Second * 30),
- fn: fn,
- }
- go pw.start()
- return pw
- }
- type pointwriter struct {
- closed bool
- rmx sync.RWMutex
- precision int64
- timeout time.Duration
- current map[string]*model.SpanPoint
- fn WriteFn
- tk *time.Ticker
- }
- func (p *pointwriter) start() {
- for range p.tk.C {
- err := p.flush()
- if err != nil {
- log.Error("flush pointwriter error: %s", err)
- }
- }
- }
- func (p *pointwriter) flush() error {
- p.rmx.Lock()
- current := p.current
- p.current = make(map[string]*model.SpanPoint)
- p.rmx.Unlock()
- points := make([]*model.SpanPoint, 0, len(current))
- for _, point := range current {
- points = append(points, point)
- }
- if len(points) == 0 {
- return nil
- }
- ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
- defer cancel()
- return p.fn(ctx, points)
- }
- // WriteSpan writespan
- func (p *pointwriter) WriteSpan(span *model.Span) error {
- if p.closed {
- return fmt.Errorf("pointwriter already closed")
- }
- kind := "client"
- if span.IsServer() {
- kind = "server"
- }
- // NOTE: ingored sample ponit if is legacy span, DELETE it futrue
- if kind == "client" && !strings.Contains(span.ServiceName, ".") {
- return nil
- }
- peerService, ok := span.Tags["peer.service"].(string)
- if !ok {
- peerService = "unknown"
- }
- timestamp := span.StartTime.Unix() - (span.StartTime.Unix() % p.precision)
- key := fmt.Sprintf("%d_%s_%s_%s_%s",
- timestamp,
- span.ServiceName,
- span.OperationName,
- peerService,
- kind,
- )
- p.rmx.Lock()
- defer p.rmx.Unlock()
- point, ok := p.current[key]
- if !ok {
- point = &model.SpanPoint{
- Timestamp: timestamp,
- ServiceName: span.ServiceName,
- OperationName: span.OperationName,
- PeerService: peerService,
- SpanKind: kind,
- AvgDuration: model.SamplePoint{TraceID: span.TraceID, SpanID: span.SpanID, Value: int64(span.Duration)},
- }
- p.current[key] = point
- }
- duration := int64(span.Duration)
- if duration > point.MaxDuration.Value {
- point.MaxDuration.TraceID = span.TraceID
- point.MaxDuration.SpanID = span.SpanID
- point.MaxDuration.Value = duration
- }
- if point.MinDuration.Value == 0 || duration < point.MinDuration.Value {
- point.MinDuration.TraceID = span.TraceID
- point.MinDuration.SpanID = span.SpanID
- point.MinDuration.Value = duration
- }
- if span.IsError() {
- point.Errors = append(point.Errors, model.SamplePoint{
- TraceID: span.TraceID,
- SpanID: span.SpanID,
- Value: duration,
- })
- }
- return nil
- }
- // Close pointwriter
- func (p *pointwriter) Close() error {
- p.closed = true
- p.tk.Stop()
- return p.flush()
- }
|