123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- package redis
- import (
- "context"
- "fmt"
- "go-common/library/net/trace"
- )
- const (
- _traceComponentName = "library/cache/redis"
- _tracePeerService = "redis"
- _traceSpanKind = "client"
- )
- var _internalTags = []trace.Tag{
- trace.TagString(trace.TagSpanKind, _traceSpanKind),
- trace.TagString(trace.TagComponent, _traceComponentName),
- trace.TagString(trace.TagPeerService, _tracePeerService),
- }
- type traceConn struct {
- // tr for pipeline, if tr != nil meaning on pipeline
- tr trace.Trace
- ctx context.Context
- // connTag include e.g. ip,port
- connTags []trace.Tag
- // origin redis conn
- Conn
- pending int
- }
- func (t *traceConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
- root, ok := trace.FromContext(t.ctx)
- // NOTE: ignored empty commandName
- // current sdk will Do empty command after pipeline finished
- if !ok || commandName == "" {
- return t.Conn.Do(commandName, args...)
- }
- tr := root.Fork("", "Redis:"+commandName)
- tr.SetTag(_internalTags...)
- tr.SetTag(t.connTags...)
- statement := commandName
- if len(args) > 0 {
- statement += fmt.Sprintf(" %v", args[0])
- }
- tr.SetTag(trace.TagString(trace.TagDBStatement, statement))
- reply, err = t.Conn.Do(commandName, args...)
- tr.Finish(&err)
- return
- }
- func (t *traceConn) Send(commandName string, args ...interface{}) error {
- t.pending++
- root, ok := trace.FromContext(t.ctx)
- if !ok {
- return t.Conn.Send(commandName, args...)
- }
- if t.tr == nil {
- t.tr = root.Fork("", "Redis:Pipeline")
- t.tr.SetTag(_internalTags...)
- t.tr.SetTag(t.connTags...)
- }
- statement := commandName
- if len(args) > 0 {
- statement += fmt.Sprintf(" %v", args[0])
- }
- t.tr.SetLog(
- trace.Log(trace.LogEvent, "Send"),
- trace.Log("db.statement", statement),
- )
- err := t.Conn.Send(commandName, args...)
- if err != nil {
- t.tr.SetTag(trace.TagBool(trace.TagError, true))
- t.tr.SetLog(
- trace.Log(trace.LogEvent, "Send Fail"),
- trace.Log(trace.LogMessage, err.Error()),
- )
- }
- return err
- }
- func (t *traceConn) Flush() error {
- if t.tr == nil {
- return t.Conn.Flush()
- }
- t.tr.SetLog(trace.Log(trace.LogEvent, "Flush"))
- err := t.Conn.Flush()
- if err != nil {
- t.tr.SetTag(trace.TagBool(trace.TagError, true))
- t.tr.SetLog(
- trace.Log(trace.LogEvent, "Flush Fail"),
- trace.Log(trace.LogMessage, err.Error()),
- )
- }
- return err
- }
- func (t *traceConn) Receive() (reply interface{}, err error) {
- if t.tr == nil {
- return t.Conn.Receive()
- }
- t.tr.SetLog(trace.Log(trace.LogEvent, "Receive"))
- reply, err = t.Conn.Receive()
- if err != nil {
- t.tr.SetTag(trace.TagBool(trace.TagError, true))
- t.tr.SetLog(
- trace.Log(trace.LogEvent, "Receive Fail"),
- trace.Log(trace.LogMessage, err.Error()),
- )
- }
- if t.pending > 0 {
- t.pending--
- }
- if t.pending == 0 {
- t.tr.Finish(nil)
- t.tr = nil
- }
- return reply, err
- }
- func (t *traceConn) WithContext(ctx context.Context) Conn {
- t.ctx = ctx
- return t
- }
|