dapper.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package trace
  2. import (
  3. "log"
  4. "os"
  5. "strconv"
  6. "sync"
  7. "time"
  8. )
  9. const _maxLevel = 64
  10. func newTracer(serviceName string, report reporter, cfg *Config) Tracer {
  11. // hard code reset probability at 0.00025, 1/4000
  12. cfg.Probability = 0.00025
  13. sampler := newSampler(cfg.Probability)
  14. // default internal tags
  15. tags := extendTag()
  16. stdlog := log.New(os.Stderr, "trace", log.LstdFlags)
  17. return &dapper{
  18. cfg: cfg,
  19. serviceName: serviceName,
  20. propagators: map[interface{}]propagator{
  21. HTTPFormat: httpPropagator{},
  22. GRPCFormat: grpcPropagator{},
  23. },
  24. reporter: report,
  25. sampler: sampler,
  26. tags: tags,
  27. pool: &sync.Pool{New: func() interface{} { return new(span) }},
  28. stdlog: stdlog,
  29. }
  30. }
  31. type dapper struct {
  32. cfg *Config
  33. serviceName string
  34. tags []Tag
  35. reporter reporter
  36. propagators map[interface{}]propagator
  37. pool *sync.Pool
  38. stdlog *log.Logger
  39. sampler sampler
  40. }
  41. func (d *dapper) New(operationName string, opts ...Option) Trace {
  42. opt := defaultOption
  43. for _, fn := range opts {
  44. fn(&opt)
  45. }
  46. traceID := genID()
  47. var sampled bool
  48. var probability float32
  49. if d.cfg.DisableSample {
  50. sampled = true
  51. probability = 1
  52. } else {
  53. sampled, probability = d.sampler.IsSampled(traceID, operationName)
  54. }
  55. pctx := spanContext{traceID: traceID}
  56. if sampled {
  57. pctx.flags = flagSampled
  58. pctx.probability = probability
  59. }
  60. if opt.Debug {
  61. pctx.flags |= flagDebug
  62. return d.newSpanWithContext(operationName, pctx).SetTag(TagString(TagSpanKind, "server")).SetTag(TagBool("debug", true))
  63. }
  64. // 为了兼容临时为 New 的 Span 设置 span.kind
  65. return d.newSpanWithContext(operationName, pctx).SetTag(TagString(TagSpanKind, "server"))
  66. }
  67. func (d *dapper) newSpanWithContext(operationName string, pctx spanContext) Trace {
  68. sp := d.getSpan()
  69. // is span is not sampled just return a span with this context, no need clear it
  70. //if !pctx.isSampled() {
  71. // sp.context = pctx
  72. // return sp
  73. //}
  74. if pctx.level > _maxLevel {
  75. // if span reach max limit level return noopspan
  76. return noopspan{}
  77. }
  78. level := pctx.level + 1
  79. nctx := spanContext{
  80. traceID: pctx.traceID,
  81. parentID: pctx.spanID,
  82. flags: pctx.flags,
  83. level: level,
  84. }
  85. if pctx.spanID == 0 {
  86. nctx.spanID = pctx.traceID
  87. } else {
  88. nctx.spanID = genID()
  89. }
  90. sp.operationName = operationName
  91. sp.context = nctx
  92. sp.startTime = time.Now()
  93. sp.tags = append(sp.tags, d.tags...)
  94. return sp
  95. }
  96. func (d *dapper) Inject(t Trace, format interface{}, carrier interface{}) error {
  97. // if carrier implement Carrier use direct, ignore format
  98. carr, ok := carrier.(Carrier)
  99. if ok {
  100. t.Visit(carr.Set)
  101. return nil
  102. }
  103. // use Built-in propagators
  104. pp, ok := d.propagators[format]
  105. if !ok {
  106. return ErrUnsupportedFormat
  107. }
  108. carr, err := pp.Inject(carrier)
  109. if err != nil {
  110. return err
  111. }
  112. if t != nil {
  113. t.Visit(carr.Set)
  114. }
  115. return nil
  116. }
  117. func (d *dapper) Extract(format interface{}, carrier interface{}) (Trace, error) {
  118. sp, err := d.extract(format, carrier)
  119. if err != nil {
  120. return sp, err
  121. }
  122. // 为了兼容临时为 New 的 Span 设置 span.kind
  123. return sp.SetTag(TagString(TagSpanKind, "server")), nil
  124. }
  125. func (d *dapper) extract(format interface{}, carrier interface{}) (Trace, error) {
  126. // if carrier implement Carrier use direct, ignore format
  127. carr, ok := carrier.(Carrier)
  128. if !ok {
  129. // use Built-in propagators
  130. pp, ok := d.propagators[format]
  131. if !ok {
  132. return nil, ErrUnsupportedFormat
  133. }
  134. var err error
  135. if carr, err = pp.Extract(carrier); err != nil {
  136. return nil, err
  137. }
  138. }
  139. contextStr := carr.Get(BiliTraceID)
  140. if contextStr == "" {
  141. return d.legacyExtract(carr)
  142. }
  143. pctx, err := contextFromString(contextStr)
  144. if err != nil {
  145. return nil, err
  146. }
  147. // NOTE: call SetTitle after extract trace
  148. return d.newSpanWithContext("", pctx), nil
  149. }
  150. func (d *dapper) legacyExtract(carr Carrier) (Trace, error) {
  151. traceIDstr := carr.Get(KeyTraceID)
  152. if traceIDstr == "" {
  153. return nil, ErrTraceNotFound
  154. }
  155. traceID, err := strconv.ParseUint(traceIDstr, 10, 64)
  156. if err != nil {
  157. return nil, ErrTraceCorrupted
  158. }
  159. sampled, _ := strconv.ParseBool(carr.Get(KeyTraceSampled))
  160. spanID, _ := strconv.ParseUint(carr.Get(KeyTraceSpanID), 10, 64)
  161. parentID, _ := strconv.ParseUint(carr.Get(KeyTraceSpanID), 10, 64)
  162. pctx := spanContext{traceID: traceID, spanID: spanID, parentID: parentID}
  163. if sampled {
  164. pctx.flags = flagSampled
  165. }
  166. return d.newSpanWithContext("", pctx), nil
  167. }
  168. func (d *dapper) Close() error {
  169. return d.reporter.Close()
  170. }
  171. func (d *dapper) report(sp *span) {
  172. if sp.context.isSampled() {
  173. if err := d.reporter.WriteSpan(sp); err != nil {
  174. d.stdlog.Printf("marshal trace span error: %s", err)
  175. }
  176. }
  177. d.putSpan(sp)
  178. }
  179. func (d *dapper) putSpan(sp *span) {
  180. if len(sp.tags) > 32 {
  181. sp.tags = nil
  182. }
  183. if len(sp.logs) > 32 {
  184. sp.logs = nil
  185. }
  186. d.pool.Put(sp)
  187. }
  188. func (d *dapper) getSpan() *span {
  189. sp := d.pool.Get().(*span)
  190. sp.dapper = d
  191. sp.childs = 0
  192. sp.tags = sp.tags[:0]
  193. sp.logs = sp.logs[:0]
  194. return sp
  195. }