package dao

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math"
	"sort"
	"strconv"
	"strings"

	influxdb "github.com/influxdata/influxdb/client/v2"
	"github.com/tsuna/gohbase"
	"github.com/tsuna/gohbase/filter"
	"github.com/tsuna/gohbase/hrpc"

	"go-common/app/service/main/dapper-query/conf"
	"go-common/app/service/main/dapper-query/model"
	"go-common/library/log"
)

// Orders accepted by QuerySpanList.
const (
	TimeDesc     = "time:desc"
	TimeAsc      = "time:asc"
	DurationDesc = "duration:desc"
	DurationAsc  = "duration:asc"
)

// ErrNotFound data not found
var ErrNotFound = errors.New("not found")

// Selector selects a page of span list references.
type Selector struct {
	// Start and End bound the scanned range; they are turned into row keys by
	// rangeKey (defined elsewhere in this package).
	Start int64
	End   int64
	// Limit is the page size, Offset the number of matching references to skip.
	Limit  int
	Offset int
	// OnlyError keeps only references whose index qualifier marks an error.
	OnlyError bool
}

// Table, column family, tag, field and measurement names used by the dao.
const (
	DefaultHbaseNameSpace = "ugc"
	DefaultInfluxDatabase = "dapper"

	HbaseRawTraceTable  = "DapperRawtrace"
	HbaseRawTraceFamily = "pb"

	HbaseListIdxTable  = "DapperListidx"
	HbaseListIdxFamily = "kind"

	ServiceNameTag   = "service_name"
	OperationNameTag = "operation_name"
	PeerServiceTag   = "peer.service"
	SpanKindTag      = "span.kind"

	MaxDurationField = "max_duration"
	MinDurationField = "min_duration"
	AvgDurationField = "avg_duration"

	SpanpointMeasurement = "span_point"

	ErrorsField = "errors"
)

// Dao is the dapper storage layer: span list indexes and raw traces are read
// from HBase, service/operation metadata and span metrics from InfluxDB.
type Dao interface {
	// ServiceNames lists all service names that reported server spans.
	ServiceNames(ctx context.Context) ([]string, error)
	// OperationNames lists the server-side operation names of serviceName.
	OperationNames(ctx context.Context, serviceName string) ([]string, error)
	// QuerySpanList lists span references of serviceName/operationName
	// selected by sel, ordered by order (TimeDesc, TimeAsc, DurationDesc or
	// DurationAsc).
	QuerySpanList(ctx context.Context, serviceName, operationName string, sel *Selector, order string) ([]model.SpanListRef, error)
	// Trace gets the spans of traceID (optionally only spanIDs), sorted by
	// start time, latest first.
	Trace(ctx context.Context, traceID uint64, spanIDs ...uint64) ([]*model.Span, error)
	// PeerService lists the peer.service values recorded on the client spans
	// of serviceName.
	PeerService(ctx context.Context, serviceName string) ([]string, error)
	// Ping pong
	Ping(ctx context.Context) error
	// Close releases the underlying HBase and InfluxDB clients.
	Close(ctx context.Context) error
	// SupportOrder reports whether order is accepted by QuerySpanList.
	SupportOrder(order string) bool
	// MeanOperationNameField queries mean(field) filtered by whereMap (tag =
	// value) and the (start, end) range in unix seconds, grouped by groupby.
	MeanOperationNameField(ctx context.Context, whereMap map[string]string, field string, start, end int64, groupby []string) ([]model.MeanOperationNameValue, error)
	// SpanSeriesMean returns mean(field) for every field per interval-second bucket.
	SpanSeriesMean(ctx context.Context, serviceName, operationName string, fields []string, start, end, interval int64) (*model.Series, error)
	// SpanSeriesCount returns count(field) for every field per interval-second bucket.
	SpanSeriesCount(ctx context.Context, serviceName, operationName string, fields []string, start, end, interval int64) (*model.Series, error)
}

type dao struct {
	hbaseNameSpace string
	hbaseClient    gohbase.Client
	influxDatabase string
	influxdbClient influxdb.Client
}

// Ping is a no-op: it always returns nil.
func (d *dao) Ping(ctx context.Context) error {
	return nil
}

// Close closes the HBase client and then the InfluxDB client, returning the
// InfluxDB client's close error.
func (d *dao) Close(ctx context.Context) error {
	d.hbaseClient.Close()
	return d.influxdbClient.Close()
}

// New creates a Dao from cfg. An empty HBase namespace or InfluxDB database
// falls back to DefaultHbaseNameSpace / DefaultInfluxDatabase.
func New(cfg *conf.Config) (Dao, error) {
	// Create the InfluxDB client first: it is the only step that can fail, so
	// an error leaves no HBase client behind to leak.
	influxdbCfg := influxdb.HTTPConfig{Addr: cfg.InfluxDB.Addr, Username: cfg.InfluxDB.Username, Password: cfg.InfluxDB.Password}
	influxdbClient, err := influxdb.NewHTTPClient(influxdbCfg)
	if err != nil {
		return nil, err
	}
	influxDatabase := DefaultInfluxDatabase
	if cfg.InfluxDB.Database != "" {
		influxDatabase = cfg.InfluxDB.Database
	}

	hbaseNameSpace := DefaultHbaseNameSpace
	if cfg.HBase.Namespace != "" {
		hbaseNameSpace = cfg.HBase.Namespace
	}
	// disable rpc queue
	hbaseClient := gohbase.NewClient(cfg.HBase.Addrs, gohbase.RpcQueueSize(0))

	return &dao{
		hbaseNameSpace: hbaseNameSpace,
		hbaseClient:    hbaseClient,
		influxDatabase: influxDatabase,
		influxdbClient: influxdbClient,
	}, nil
}

// influxQLStringEscaper escapes the characters that are special inside a
// single-quoted InfluxQL string literal (backslash and single quote).
var influxQLStringEscaper = strings.NewReplacer(`\`, `\\`, `'`, `\'`)

// quoteInfluxString renders s as a single-quoted InfluxQL string literal so
// caller-supplied names cannot break out of the literal.
func quoteInfluxString(s string) string {
	return "'" + influxQLStringEscaper.Replace(s) + "'"
}

// queryInflux logs and runs command against the configured database with
// second precision. Errors carried inside the response body (e.g. a rejected
// query) are returned as errors too; the influxdb v2 client does not always
// surface them through its own error value.
func (d *dao) queryInflux(command string) (*influxdb.Response, error) {
	log.V(10).Info("query command %s", command)
	resp, err := d.influxdbClient.Query(influxdb.NewQuery(command, d.influxDatabase, "1s"))
	if err != nil {
		return nil, err
	}
	if err = resp.Error(); err != nil {
		return nil, err
	}
	return resp, nil
}

// ServiceNames lists the service_name tag values of server spans.
func (d *dao) ServiceNames(ctx context.Context) ([]string, error) {
	where := fmt.Sprintf("%s = %s", SpanKindTag, quoteInfluxString("server"))
	return d.showTagValues(ctx, ServiceNameTag, where)
}

// showTagValues returns the values of tag in the span_point measurement for
// series matching the InfluxQL condition where. Only the first result series
// is read; rows that are not (key, string value) pairs are skipped.
func (d *dao) showTagValues(ctx context.Context, tag, where string) ([]string, error) {
	command := fmt.Sprintf(`SHOW TAG VALUES FROM "%s" WITH KEY = "%s" WHERE %s`, SpanpointMeasurement, tag, where)
	resp, err := d.queryInflux(command)
	if err != nil {
		return nil, err
	}
	if len(resp.Results) == 0 || len(resp.Results[0].Series) == 0 {
		return make([]string, 0), nil
	}
	rows := resp.Results[0].Series[0]
	values := make([]string, 0, len(rows.Values))
	for _, kv := range rows.Values {
		if len(kv) != 2 {
			continue
		}
		if value, ok := kv[1].(string); ok {
			values = append(values, value)
		}
	}
	return values, nil
}

// OperationNames lists the operation_name tag values of serviceName's server spans.
func (d *dao) OperationNames(ctx context.Context, serviceName string) ([]string, error) {
	where := fmt.Sprintf("%s = %s AND %s = %s", ServiceNameTag, quoteInfluxString(serviceName), SpanKindTag, quoteInfluxString("server"))
	return d.showTagValues(ctx, OperationNameTag, where)
}

// QuerySpanList reads span references for serviceName/operationName from the
// HBase list index, selected by sel and sorted according to order.
func (d *dao) QuerySpanList(ctx context.Context, serviceName string, operationName string, sel *Selector, order string) ([]model.SpanListRef, error) {
	if sel == nil {
		return nil, errors.New("nil selector")
	}
	log.V(10).Info("query span list serviceName: %s, operationName: %s, sel: %+v order: %s", serviceName, operationName, sel, order)
	prefix := keyPrefix(serviceName, operationName)
	startKey, stopKey := rangeKey(prefix, sel.Start, sel.End)
	switch order {
	case TimeAsc, TimeDesc:
		return d.querySpanListTimeOrder(ctx, startKey, stopKey, prefix, sel.Limit, sel.Offset, order == TimeDesc, sel.OnlyError)
	case DurationDesc, DurationAsc:
		return d.querySpanListDurationOrder(ctx, startKey, stopKey, prefix, sel.Limit, sel.Offset, order == DurationDesc, sel.OnlyError)
	}
	return nil, fmt.Errorf("unsupport order")
}

// parseSpanListRef decodes one list-index cell. The value is
// "<traceID hex>:<spanID hex>"; the qualifier is "<kind>:<duration decimal>",
// where kind "e" marks an error span.
func parseSpanListRef(cell *hrpc.Cell) (spanListRef model.SpanListRef, err error) {
	value := cell.Value
	ref := bytes.SplitN(value, []byte(":"), 2)
	if len(ref) != 2 {
		err = fmt.Errorf("invalid ref %s", value)
		return
	}
	if spanListRef.TraceID, err = strconv.ParseUint(string(ref[0]), 16, 64); err != nil {
		return
	}
	if spanListRef.SpanID, err = strconv.ParseUint(string(ref[1]), 16, 64); err != nil {
		return
	}
	kd := bytes.SplitN(cell.Qualifier, []byte(":"), 2)
	if len(kd) != 2 {
		err = fmt.Errorf("invalid qualifier %s", cell.Qualifier)
		return
	}
	if bytes.Equal(kd[0], []byte("e")) {
		spanListRef.IsError = true
	}
	spanListRef.Duration, err = strconv.ParseInt(string(kd[1]), 10, 64)
	return
}

// minHeapifyListRef is a fixed-size min-heap on Duration used to keep the
// len(m) references with the largest Duration. Its sort.Interface methods
// order by Duration descending (largest first) for the final sort.
type minHeapifyListRef []model.SpanListRef

// push replaces the heap minimum with listRef when listRef.Duration is not
// smaller than it, then restores the heap property. No-op on an empty heap.
func (m minHeapifyListRef) push(listRef model.SpanListRef) {
	if len(m) == 0 || m[0].Duration > listRef.Duration {
		return
	}
	m[0] = listRef
	m.minHeapify(0)
}

// minHeapify sifts element i down until neither child has a smaller Duration.
func (m minHeapifyListRef) minHeapify(i int) {
	var lowest int
	left := (i+1)*2 - 1
	right := (i + 1) * 2
	if left < len(m) && m[left].Duration < m[i].Duration {
		lowest = left
	} else {
		lowest = i
	}
	if right < len(m) && m[right].Duration < m[lowest].Duration {
		lowest = right
	}
	if lowest != i {
		m[i], m[lowest] = m[lowest], m[i]
		m.minHeapify(lowest)
	}
}

func (m minHeapifyListRef) Len() int { return len(m) }

// Less sorts by Duration descending.
func (m minHeapifyListRef) Less(i, j int) bool { return m[i].Duration > m[j].Duration }

func (m minHeapifyListRef) Swap(i, j int) { m[i], m[j] = m[j], m[i] }

// scanListIdx scans the list index table for rows with the given prefix
// between startKey and stopKey (backwards, with the keys swapped, when reverse
// is true). It calls fn for each parsed reference in the HbaseListIdxFamily
// family, in scan order, until fn returns false or the scan ends. A cell that
// fails to parse aborts the scan with that error.
func (d *dao) scanListIdx(ctx context.Context, startKey, stopKey, prefix string, reverse bool, fn func(model.SpanListRef) bool) error {
	options := []func(hrpc.Call) error{hrpc.Filters(filter.NewPrefixFilter([]byte(prefix)))}
	if reverse {
		startKey, stopKey = stopKey, startKey
		options = append(options, hrpc.Reversed())
	}
	table := d.hbaseNameSpace + ":" + HbaseListIdxTable
	scan, err := hrpc.NewScanRangeStr(ctx, table, startKey, stopKey, options...)
	if err != nil {
		return err
	}
	scanner := d.hbaseClient.Scan(scan)
	defer scanner.Close()
	for {
		result, err := scanner.Next()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if len(result.Cells) > 0 {
			log.V(10).Info("scan rowkey %s", result.Cells[0].Row)
		}
		for _, cell := range result.Cells {
			if string(cell.Family) != HbaseListIdxFamily {
				continue
			}
			spanListRef, err := parseSpanListRef(cell)
			if err != nil {
				return err
			}
			if !fn(spanListRef) {
				return nil
			}
		}
	}
}

// querySpanListDurationOrder scans the whole range and returns up to limit
// references after skipping offset, ordered by duration (descending when
// reverse is true, ascending otherwise). Returned durations are the real ones.
func (d *dao) querySpanListDurationOrder(ctx context.Context, startKey, stopKey, prefix string, limit, offset int, reverse bool, onlyError bool) ([]model.SpanListRef, error) {
	if limit < 0 {
		limit = 0
	}
	if offset < 0 {
		offset = 0
	}
	size := limit + offset
	if size == 0 {
		// Nothing can be returned; this also keeps push away from an empty heap.
		return make([]model.SpanListRef, 0), nil
	}
	// Keep the size largest keys. For ascending order the key is inverted
	// (MaxInt64 - duration) so that "largest key" means "shortest span".
	top := make(minHeapifyListRef, size)
	err := d.scanListIdx(ctx, startKey, stopKey, prefix, reverse, func(ref model.SpanListRef) bool {
		if onlyError && !ref.IsError {
			return true
		}
		if !reverse {
			ref.Duration = math.MaxInt64 - ref.Duration
		}
		top.push(ref)
		return true
	})
	if err != nil {
		return nil, err
	}
	// Drop slots that were never filled: the heap starts zero-valued and
	// such placeholders are recognised by TraceID == 0.
	refs := top[:0]
	for _, ref := range top {
		if ref.TraceID != 0 {
			refs = append(refs, ref)
		}
	}
	sort.Sort(refs)
	if offset >= len(refs) {
		return make([]model.SpanListRef, 0), nil
	}
	refs = refs[offset:]
	if !reverse {
		// Undo the inversion applied while scanning so callers see real durations.
		for i := range refs {
			refs[i].Duration = math.MaxInt64 - refs[i].Duration
		}
	}
	return refs, nil
}

// querySpanListTimeOrder returns up to limit references in scan order (row
// key order, reversed when reverse is true) after skipping offset matching
// references. The scan stops as soon as the page is full.
func (d *dao) querySpanListTimeOrder(ctx context.Context, startKey, stopKey, prefix string, limit, offset int, reverse bool, onlyError bool) ([]model.SpanListRef, error) {
	if limit <= 0 {
		return make([]model.SpanListRef, 0), nil
	}
	spanListRefs := make([]model.SpanListRef, 0, limit)
	err := d.scanListIdx(ctx, startKey, stopKey, prefix, reverse, func(ref model.SpanListRef) bool {
		if onlyError && !ref.IsError {
			return true
		}
		if offset > 0 {
			offset--
			return true
		}
		spanListRefs = append(spanListRefs, ref)
		// Stop the HBase scan once the page is full instead of reading the rest of the range.
		return len(spanListRefs) < limit
	})
	if err != nil {
		return nil, err
	}
	return spanListRefs, nil
}

// Trace reads the row of traceID (hex row key) from the raw trace table. When
// spanIDs are given, only columns whose qualifier starts with one of the hex
// span IDs are fetched. Cells that fail to decode are logged and skipped. The
// spans are sorted by start time, latest first.
func (d *dao) Trace(ctx context.Context, traceID uint64, spanIDs ...uint64) ([]*model.Span, error) {
	table := d.hbaseNameSpace + ":" + HbaseRawTraceTable
	traceIDStr := strconv.FormatUint(traceID, 16)

	var options []func(hrpc.Call) error
	if len(spanIDs) != 0 {
		filters := make([]filter.Filter, 0, len(spanIDs))
		for _, spanID := range spanIDs {
			spanIDStr := strconv.FormatUint(spanID, 16)
			filters = append(filters, filter.NewColumnPrefixFilter([]byte(spanIDStr)))
		}
		options = append(options, hrpc.Filters(filter.NewList(filter.MustPassOne, filters...)))
	}

	get, err := hrpc.NewGetStr(ctx, table, traceIDStr, options...)
	if err != nil {
		return nil, err
	}
	result, err := d.hbaseClient.Get(get)
	if err != nil {
		return nil, err
	}
	spans := make([]*model.Span, 0, len(result.Cells))
	for _, cell := range result.Cells {
		if string(cell.Family) != HbaseRawTraceFamily {
			continue
		}
		span, err := model.FromProtoSpan(cell.Value)
		if err != nil {
			// Best effort: one undecodable span should not hide the rest of the trace.
			log.Error("unmarshal protobuf span data rowkey: %x, cf: %s:%s error: %s", traceID, cell.Family, cell.Qualifier, err)
			continue
		}
		spans = append(spans, span)
	}
	sort.Slice(spans, func(i, j int) bool {
		return spans[i].StartTime.UnixNano() > spans[j].StartTime.UnixNano()
	})
	return spans, nil
}

// SupportOrder reports whether order is one of the four order constants.
func (d *dao) SupportOrder(order string) bool {
	switch order {
	case TimeAsc, TimeDesc, DurationAsc, DurationDesc:
		return true
	}
	return false
}

// MeanOperationNameField queries mean(field) on span_point, filtered by the
// tag = value pairs of whereMap and start < time < end (unix seconds), grouped
// by the groupby expressions. Series whose first value is not numeric are
// skipped.
func (d *dao) MeanOperationNameField(ctx context.Context, whereMap map[string]string, field string, start, end int64, groupby []string) ([]model.MeanOperationNameValue, error) {
	keys := make([]string, 0, len(whereMap))
	for k := range whereMap {
		keys = append(keys, k)
	}
	// Sorted so the generated command is deterministic (easier to log and debug).
	sort.Strings(keys)
	conds := make([]string, 0, len(keys)+2)
	for _, k := range keys {
		conds = append(conds, fmt.Sprintf(`%s=%s`, k, quoteInfluxString(whereMap[k])))
	}
	conds = append(conds, fmt.Sprintf("time > %ds", start), fmt.Sprintf("time < %ds", end))

	command := fmt.Sprintf(`SELECT mean("%s") AS mean_%s FROM "%s" WHERE %s`,
		field, field, SpanpointMeasurement, strings.Join(conds, " AND "))
	// An empty GROUP BY clause is invalid InfluxQL, so only emit it when needed.
	if len(groupby) > 0 {
		command += " GROUP BY " + strings.Join(groupby, ", ")
	}

	resp, err := d.queryInflux(command)
	if err != nil {
		return nil, err
	}
	if len(resp.Results) == 0 || len(resp.Results[0].Series) == 0 {
		return make([]model.MeanOperationNameValue, 0), nil
	}
	values := make([]model.MeanOperationNameValue, 0, len(resp.Results[0].Series))
	for _, row := range resp.Results[0].Series {
		// Column 0 is time, column 1 the mean.
		if len(row.Values) == 0 || len(row.Values[0]) < 2 {
			continue
		}
		num, ok := row.Values[0][1].(json.Number)
		if !ok {
			continue
		}
		mean, err := num.Float64()
		if err != nil {
			continue
		}
		values = append(values, model.MeanOperationNameValue{Tag: row.Tags, Value: mean})
	}
	return values, nil
}

// SpanSeriesMean returns mean(field) per interval-second bucket.
func (d *dao) SpanSeriesMean(ctx context.Context, serviceName, operationName string, fields []string, start, end, interval int64) (*model.Series, error) {
	return d.spanSeries(ctx, serviceName, operationName, "mean", fields, start, end, interval)
}

// SpanSeriesCount returns count(field) per interval-second bucket.
func (d *dao) SpanSeriesCount(ctx context.Context, serviceName, operationName string, fields []string, start, end, interval int64) (*model.Series, error) {
	return d.spanSeries(ctx, serviceName, operationName, "count", fields, start, end, interval)
}

// spanSeries queries fn(field) for every field on span_point for one
// service/operation, bucketed by interval seconds over (start, end). Each
// returned item holds one row per bucket, aligned with series.Timestamps; a
// nil row means the bucket had no numeric value (the query uses FILL(null)).
// Items are ordered like the query's columns.
func (d *dao) spanSeries(ctx context.Context, serviceName, operationName, fn string, fields []string, start, end, interval int64) (*model.Series, error) {
	if len(fields) == 0 {
		// "SELECT  FROM ..." is invalid InfluxQL; there is nothing to query.
		return new(model.Series), nil
	}
	selects := make([]string, 0, len(fields))
	for _, field := range fields {
		selects = append(selects, fmt.Sprintf(`%s("%s") AS "%s_%s"`, fn, field, fn, field))
	}
	command := fmt.Sprintf(`SELECT %s FROM %s WHERE "%s"=%s AND "%s"=%s AND time > %ds AND time < %ds GROUP BY time(%ds) FILL(null)`,
		strings.Join(selects, ", "), SpanpointMeasurement,
		ServiceNameTag, quoteInfluxString(serviceName),
		OperationNameTag, quoteInfluxString(operationName),
		start, end, interval)

	resp, err := d.queryInflux(command)
	if err != nil {
		return nil, err
	}
	series := new(model.Series)
	if len(resp.Results) == 0 || len(resp.Results[0].Series) == 0 {
		return series, nil
	}

	// column index -> item
	fieldMap := make(map[int]*model.SeriesItem)
	for _, row := range resp.Results[0].Series {
		for i, name := range row.Columns {
			if name == "time" {
				continue
			}
			fieldMap[i] = &model.SeriesItem{Field: name}
		}
		for _, value := range row.Values {
			for i, val := range value {
				if i == 0 {
					// The first column is the bucket time. Always append (0 if
					// unparsable) so timestamps stay aligned with item rows.
					var timestamp int64
					if n, ok := val.(json.Number); ok {
						timestamp, _ = n.Int64()
					}
					series.Timestamps = append(series.Timestamps, timestamp)
					continue
				}
				item, ok := fieldMap[i]
				if !ok {
					continue
				}
				n, ok := val.(json.Number)
				if !ok {
					item.Rows = append(item.Rows, nil)
					continue
				}
				v, err := n.Float64()
				if err != nil {
					item.Rows = append(item.Rows, nil)
					continue
				}
				item.Rows = append(item.Rows, &v)
			}
		}
	}

	columns := make([]int, 0, len(fieldMap))
	for i := range fieldMap {
		columns = append(columns, i)
	}
	// Map iteration order is random; emit items in column order.
	sort.Ints(columns)
	for _, i := range columns {
		series.Items = append(series.Items, fieldMap[i])
	}
	return series, nil
}

// PeerService lists the peer.service tag values of serviceName's client spans.
func (d *dao) PeerService(ctx context.Context, serviceName string) ([]string, error) {
	where := fmt.Sprintf("%s = %s AND %s = %s", ServiceNameTag, quoteInfluxString(serviceName), SpanKindTag, quoteInfluxString("client"))
	return d.showTagValues(ctx, PeerServiceTag, where)
}