123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- package dao
- import (
- "context"
- "fmt"
- "strconv"
- "strings"
- "time"
- "github.com/dgryski/go-farm"
- influxdb "github.com/influxdata/influxdb/client/v2"
- "github.com/tsuna/gohbase"
- "github.com/tsuna/gohbase/hrpc"
- "go-common/app/service/main/dapper/conf"
- "go-common/app/service/main/dapper/model"
- "go-common/library/log"
- )
// Defaults applied by New when the configuration leaves a value empty.
const (
	defaultHbaseNameSpace = "ugc"
	defaultInfluxDatabase = "dapper"
)

// HBase tables and column families written by this package.
const (
	// Raw trace storage, written by WriteRawTrace.
	hbaseRawTraceTable  = "DapperRawtrace"
	hbaseRawTraceFamily = "pb"
	// Sample point index, written by writeSamplePoint.
	hbaseListIdxTable  = "DapperListidx"
	hbaseListIdxFamily = "kind"
)

// InfluxDB measurement, tag keys and field keys used for span points.
const (
	spanpointMeasurement = "span_point"

	serviceNameTag   = "service_name"
	operationNameTag = "operation_name"
	peerServiceTag   = "peer.service"
	spanKindTag      = "span.kind"

	maxDurationField = "max_duration"
	minDurationField = "min_duration"
	avgDurationField = "avg_duration"
	errorsField      = "errors"
)
- // Dao interface
- type Dao interface {
- // WriteRawSpan to hbase
- WriteRawTrace(ctx context.Context, rowKey string, values map[string][]byte) error
- // BatchWriteSpanPoint
- BatchWriteSpanPoint(ctx context.Context, spanPoints []*model.SpanPoint) error
- // Fetch ServiceName
- FetchServiceName(ctx context.Context) ([]string, error)
- // Fetch OperationName
- FetchOperationName(ctx context.Context, serviceName string) ([]string, error)
- }
- // New dao
- func New(cfg *conf.Config) (Dao, error) {
- // disable rpc queue
- hbaseClient := gohbase.NewClient(cfg.HBase.Addrs, gohbase.RpcQueueSize(0))
- hbaseNameSpace := defaultHbaseNameSpace
- if cfg.HBase.Namespace != "" {
- hbaseNameSpace = cfg.HBase.Namespace
- }
- influxdbCfg := influxdb.HTTPConfig{Addr: cfg.InfluxDB.Addr, Username: cfg.InfluxDB.Username, Password: cfg.InfluxDB.Password}
- influxdbClient, err := influxdb.NewHTTPClient(influxdbCfg)
- if err != nil {
- return nil, err
- }
- influxDatabase := defaultInfluxDatabase
- if cfg.InfluxDB.Database != "" {
- influxDatabase = cfg.InfluxDB.Database
- }
- return &dao{
- hbaseNameSpace: hbaseNameSpace,
- hbaseClient: hbaseClient,
- influxDatabase: influxDatabase,
- influxdbClient: influxdbClient,
- }, nil
- }
- var _ Dao = &dao{}
- type dao struct {
- hbaseNameSpace string
- hbaseClient gohbase.Client
- influxDatabase string
- influxdbClient influxdb.Client
- }
- func (d *dao) WriteRawTrace(ctx context.Context, rowKey string, values map[string][]byte) error {
- table := d.hbaseNameSpace + ":" + hbaseRawTraceTable
- put, err := hrpc.NewPutStr(ctx, table, rowKey, map[string]map[string][]byte{hbaseRawTraceFamily: values})
- if err != nil {
- return err
- }
- _, err = d.hbaseClient.Put(put)
- return err
- }
- func (d *dao) BatchWriteSpanPoint(ctx context.Context, spanPoints []*model.SpanPoint) error {
- var messages []string
- batchPoint, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{Database: d.influxDatabase, Precision: "1s"})
- if err != nil {
- return err
- }
- for _, spanPoint := range spanPoints {
- if err := d.writeSamplePoint(ctx, spanPoint); err != nil {
- messages = append(messages, err.Error())
- }
- if point, err := toInfluxDBPoint(spanPoint); err != nil {
- messages = append(messages, err.Error())
- } else {
- batchPoint.AddPoint(point)
- }
- }
- if err := d.influxdbClient.Write(batchPoint); err != nil {
- messages = append(messages, err.Error())
- }
- if len(messages) != 0 {
- return fmt.Errorf("%s", strings.Join(messages, "\n"))
- }
- return nil
- }
- func (d *dao) FetchServiceName(ctx context.Context) ([]string, error) {
- command := fmt.Sprintf("SHOW TAG VALUES FROM span_point WITH KEY = %s", serviceNameTag)
- log.V(10).Info("query command %s", command)
- query := influxdb.NewQuery(command, d.influxDatabase, "1s")
- resp, err := d.influxdbClient.Query(query)
- if err != nil {
- return nil, err
- }
- if len(resp.Results) == 0 || len(resp.Results[0].Series) == 0 {
- return make([]string, 0), nil
- }
- rows := resp.Results[0].Series[0]
- serviceNames := make([]string, 0, len(rows.Values))
- for _, kv := range rows.Values {
- if len(kv) != 2 {
- continue
- }
- if serviceName, ok := kv[1].(string); ok {
- serviceNames = append(serviceNames, serviceName)
- }
- }
- return serviceNames, nil
- }
- func (d *dao) FetchOperationName(ctx context.Context, serviceName string) ([]string, error) {
- command := fmt.Sprintf("SHOW TAG VALUES FROM %s WITH KEY = %s WHERE %s = '%s' AND %s = '%s'",
- spanpointMeasurement, operationNameTag, serviceNameTag, serviceName, spanKindTag, "server")
- log.V(10).Info("query command %s", command)
- query := influxdb.NewQuery(command, d.influxDatabase, "1s")
- resp, err := d.influxdbClient.Query(query)
- if err != nil {
- return nil, err
- }
- if len(resp.Results) == 0 || len(resp.Results[0].Series) == 0 {
- return make([]string, 0), nil
- }
- rows := resp.Results[0].Series[0]
- operationNames := make([]string, 0, len(rows.Values))
- for _, kv := range rows.Values {
- if len(kv) != 2 {
- continue
- }
- if operationName, ok := kv[1].(string); ok {
- operationNames = append(operationNames, operationName)
- }
- }
- return operationNames, nil
- }
- func (d *dao) writeSamplePoint(ctx context.Context, spanPoint *model.SpanPoint) error {
- table := d.hbaseNameSpace + ":" + hbaseListIdxTable
- rowKey := listIdxKey(spanPoint)
- values := make(map[string][]byte)
- values = fuelDurationSamplePoint(values, spanPoint.MaxDuration, spanPoint.AvgDuration, spanPoint.MinDuration)
- values = fuelErrrorSamplePoint(values, spanPoint.Errors...)
- put, err := hrpc.NewPutStr(ctx, table, rowKey, map[string]map[string][]byte{hbaseListIdxFamily: values})
- if err != nil {
- return err
- }
- _, err = d.hbaseClient.Put(put)
- return err
- }
- func listIdxKey(spanPoint *model.SpanPoint) string {
- serviceNameHash := farm.Hash32([]byte(spanPoint.ServiceName))
- operationNameHash := farm.Hash32([]byte(spanPoint.OperationName))
- return fmt.Sprintf("%x%x%d", serviceNameHash, operationNameHash, spanPoint.Timestamp)
- }
- func fuelDurationSamplePoint(values map[string][]byte, points ...model.SamplePoint) map[string][]byte {
- for i := range points {
- key := "d:" + strconv.FormatInt(points[i].Value, 10)
- values[key] = []byte(fmt.Sprintf("%x:%x", points[i].TraceID, points[i].SpanID))
- }
- return values
- }
- func fuelErrrorSamplePoint(values map[string][]byte, points ...model.SamplePoint) map[string][]byte {
- for i := range points {
- key := "e:" + strconv.FormatInt(points[i].Value, 10)
- values[key] = []byte(fmt.Sprintf("%x:%x", points[i].TraceID, points[i].SpanID))
- }
- return values
- }
- func toInfluxDBPoint(spanPoint *model.SpanPoint) (*influxdb.Point, error) {
- tags := map[string]string{
- serviceNameTag: spanPoint.ServiceName,
- operationNameTag: spanPoint.OperationName,
- spanKindTag: spanPoint.SpanKind,
- peerServiceTag: spanPoint.PeerService,
- }
- fields := map[string]interface{}{
- maxDurationField: spanPoint.MaxDuration.Value,
- minDurationField: spanPoint.MinDuration.Value,
- avgDurationField: spanPoint.AvgDuration.Value,
- errorsField: len(spanPoint.Errors),
- }
- return influxdb.NewPoint(spanpointMeasurement, tags, fields, time.Unix(spanPoint.Timestamp, 0))
- }
|