trace.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package redis
  2. import (
  3. "context"
  4. "fmt"
  5. "go-common/library/net/trace"
  6. )
  7. const (
  8. _traceComponentName = "library/cache/redis"
  9. _tracePeerService = "redis"
  10. _traceSpanKind = "client"
  11. )
  12. var _internalTags = []trace.Tag{
  13. trace.TagString(trace.TagSpanKind, _traceSpanKind),
  14. trace.TagString(trace.TagComponent, _traceComponentName),
  15. trace.TagString(trace.TagPeerService, _tracePeerService),
  16. }
  17. type traceConn struct {
  18. // tr for pipeline, if tr != nil meaning on pipeline
  19. tr trace.Trace
  20. ctx context.Context
  21. // connTag include e.g. ip,port
  22. connTags []trace.Tag
  23. // origin redis conn
  24. Conn
  25. pending int
  26. }
  27. func (t *traceConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
  28. root, ok := trace.FromContext(t.ctx)
  29. // NOTE: ignored empty commandName
  30. // current sdk will Do empty command after pipeline finished
  31. if !ok || commandName == "" {
  32. return t.Conn.Do(commandName, args...)
  33. }
  34. tr := root.Fork("", "Redis:"+commandName)
  35. tr.SetTag(_internalTags...)
  36. tr.SetTag(t.connTags...)
  37. statement := commandName
  38. if len(args) > 0 {
  39. statement += fmt.Sprintf(" %v", args[0])
  40. }
  41. tr.SetTag(trace.TagString(trace.TagDBStatement, statement))
  42. reply, err = t.Conn.Do(commandName, args...)
  43. tr.Finish(&err)
  44. return
  45. }
  46. func (t *traceConn) Send(commandName string, args ...interface{}) error {
  47. t.pending++
  48. root, ok := trace.FromContext(t.ctx)
  49. if !ok {
  50. return t.Conn.Send(commandName, args...)
  51. }
  52. if t.tr == nil {
  53. t.tr = root.Fork("", "Redis:Pipeline")
  54. t.tr.SetTag(_internalTags...)
  55. t.tr.SetTag(t.connTags...)
  56. }
  57. statement := commandName
  58. if len(args) > 0 {
  59. statement += fmt.Sprintf(" %v", args[0])
  60. }
  61. t.tr.SetLog(
  62. trace.Log(trace.LogEvent, "Send"),
  63. trace.Log("db.statement", statement),
  64. )
  65. err := t.Conn.Send(commandName, args...)
  66. if err != nil {
  67. t.tr.SetTag(trace.TagBool(trace.TagError, true))
  68. t.tr.SetLog(
  69. trace.Log(trace.LogEvent, "Send Fail"),
  70. trace.Log(trace.LogMessage, err.Error()),
  71. )
  72. }
  73. return err
  74. }
  75. func (t *traceConn) Flush() error {
  76. if t.tr == nil {
  77. return t.Conn.Flush()
  78. }
  79. t.tr.SetLog(trace.Log(trace.LogEvent, "Flush"))
  80. err := t.Conn.Flush()
  81. if err != nil {
  82. t.tr.SetTag(trace.TagBool(trace.TagError, true))
  83. t.tr.SetLog(
  84. trace.Log(trace.LogEvent, "Flush Fail"),
  85. trace.Log(trace.LogMessage, err.Error()),
  86. )
  87. }
  88. return err
  89. }
  90. func (t *traceConn) Receive() (reply interface{}, err error) {
  91. if t.tr == nil {
  92. return t.Conn.Receive()
  93. }
  94. t.tr.SetLog(trace.Log(trace.LogEvent, "Receive"))
  95. reply, err = t.Conn.Receive()
  96. if err != nil {
  97. t.tr.SetTag(trace.TagBool(trace.TagError, true))
  98. t.tr.SetLog(
  99. trace.Log(trace.LogEvent, "Receive Fail"),
  100. trace.Log(trace.LogMessage, err.Error()),
  101. )
  102. }
  103. if t.pending > 0 {
  104. t.pending--
  105. }
  106. if t.pending == 0 {
  107. t.tr.Finish(nil)
  108. t.tr = nil
  109. }
  110. return reply, err
  111. }
  112. func (t *traceConn) WithContext(ctx context.Context) Conn {
  113. t.ctx = ctx
  114. return t
  115. }