dao.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/dgryski/go-farm"
  9. influxdb "github.com/influxdata/influxdb/client/v2"
  10. "github.com/tsuna/gohbase"
  11. "github.com/tsuna/gohbase/hrpc"
  12. "go-common/app/service/main/dapper/conf"
  13. "go-common/app/service/main/dapper/model"
  14. "go-common/library/log"
  15. )
  // HBase layout. Table names are prefixed with "<namespace>:" when a request
// is built; defaultHbaseNameSpace is the namespace used when the config
// leaves it empty.
const (
	defaultHbaseNameSpace = "ugc"
	hbaseRawTraceTable    = "DapperRawtrace"
	hbaseRawTraceFamily   = "pb"
	hbaseListIdxTable     = "DapperListidx"
	hbaseListIdxFamily    = "kind"
)

// InfluxDB target: the database used when the config leaves it empty, and the
// measurement that span points are written to and queried from.
const (
	defaultInfluxDatabase = "dapper"
	spanpointMeasurement  = "span_point"
)

// Tag keys attached to every span point written to InfluxDB.
const (
	serviceNameTag   = "service_name"
	operationNameTag = "operation_name"
	peerServiceTag   = "peer.service"
	spanKindTag      = "span.kind"
)

// Field keys of a span point in InfluxDB.
const (
	maxDurationField = "max_duration"
	minDurationField = "min_duration"
	avgDurationField = "avg_duration"
	errorsField      = "errors"
)
  33. // Dao interface
  34. type Dao interface {
  35. // WriteRawSpan to hbase
  36. WriteRawTrace(ctx context.Context, rowKey string, values map[string][]byte) error
  37. // BatchWriteSpanPoint
  38. BatchWriteSpanPoint(ctx context.Context, spanPoints []*model.SpanPoint) error
  39. // Fetch ServiceName
  40. FetchServiceName(ctx context.Context) ([]string, error)
  41. // Fetch OperationName
  42. FetchOperationName(ctx context.Context, serviceName string) ([]string, error)
  43. }
  44. // New dao
  45. func New(cfg *conf.Config) (Dao, error) {
  46. // disable rpc queue
  47. hbaseClient := gohbase.NewClient(cfg.HBase.Addrs, gohbase.RpcQueueSize(0))
  48. hbaseNameSpace := defaultHbaseNameSpace
  49. if cfg.HBase.Namespace != "" {
  50. hbaseNameSpace = cfg.HBase.Namespace
  51. }
  52. influxdbCfg := influxdb.HTTPConfig{Addr: cfg.InfluxDB.Addr, Username: cfg.InfluxDB.Username, Password: cfg.InfluxDB.Password}
  53. influxdbClient, err := influxdb.NewHTTPClient(influxdbCfg)
  54. if err != nil {
  55. return nil, err
  56. }
  57. influxDatabase := defaultInfluxDatabase
  58. if cfg.InfluxDB.Database != "" {
  59. influxDatabase = cfg.InfluxDB.Database
  60. }
  61. return &dao{
  62. hbaseNameSpace: hbaseNameSpace,
  63. hbaseClient: hbaseClient,
  64. influxDatabase: influxDatabase,
  65. influxdbClient: influxdbClient,
  66. }, nil
  67. }
  68. var _ Dao = &dao{}
  69. type dao struct {
  70. hbaseNameSpace string
  71. hbaseClient gohbase.Client
  72. influxDatabase string
  73. influxdbClient influxdb.Client
  74. }
  75. func (d *dao) WriteRawTrace(ctx context.Context, rowKey string, values map[string][]byte) error {
  76. table := d.hbaseNameSpace + ":" + hbaseRawTraceTable
  77. put, err := hrpc.NewPutStr(ctx, table, rowKey, map[string]map[string][]byte{hbaseRawTraceFamily: values})
  78. if err != nil {
  79. return err
  80. }
  81. _, err = d.hbaseClient.Put(put)
  82. return err
  83. }
  84. func (d *dao) BatchWriteSpanPoint(ctx context.Context, spanPoints []*model.SpanPoint) error {
  85. var messages []string
  86. batchPoint, err := influxdb.NewBatchPoints(influxdb.BatchPointsConfig{Database: d.influxDatabase, Precision: "1s"})
  87. if err != nil {
  88. return err
  89. }
  90. for _, spanPoint := range spanPoints {
  91. if err := d.writeSamplePoint(ctx, spanPoint); err != nil {
  92. messages = append(messages, err.Error())
  93. }
  94. if point, err := toInfluxDBPoint(spanPoint); err != nil {
  95. messages = append(messages, err.Error())
  96. } else {
  97. batchPoint.AddPoint(point)
  98. }
  99. }
  100. if err := d.influxdbClient.Write(batchPoint); err != nil {
  101. messages = append(messages, err.Error())
  102. }
  103. if len(messages) != 0 {
  104. return fmt.Errorf("%s", strings.Join(messages, "\n"))
  105. }
  106. return nil
  107. }
  108. func (d *dao) FetchServiceName(ctx context.Context) ([]string, error) {
  109. command := fmt.Sprintf("SHOW TAG VALUES FROM span_point WITH KEY = %s", serviceNameTag)
  110. log.V(10).Info("query command %s", command)
  111. query := influxdb.NewQuery(command, d.influxDatabase, "1s")
  112. resp, err := d.influxdbClient.Query(query)
  113. if err != nil {
  114. return nil, err
  115. }
  116. if len(resp.Results) == 0 || len(resp.Results[0].Series) == 0 {
  117. return make([]string, 0), nil
  118. }
  119. rows := resp.Results[0].Series[0]
  120. serviceNames := make([]string, 0, len(rows.Values))
  121. for _, kv := range rows.Values {
  122. if len(kv) != 2 {
  123. continue
  124. }
  125. if serviceName, ok := kv[1].(string); ok {
  126. serviceNames = append(serviceNames, serviceName)
  127. }
  128. }
  129. return serviceNames, nil
  130. }
  131. func (d *dao) FetchOperationName(ctx context.Context, serviceName string) ([]string, error) {
  132. command := fmt.Sprintf("SHOW TAG VALUES FROM %s WITH KEY = %s WHERE %s = '%s' AND %s = '%s'",
  133. spanpointMeasurement, operationNameTag, serviceNameTag, serviceName, spanKindTag, "server")
  134. log.V(10).Info("query command %s", command)
  135. query := influxdb.NewQuery(command, d.influxDatabase, "1s")
  136. resp, err := d.influxdbClient.Query(query)
  137. if err != nil {
  138. return nil, err
  139. }
  140. if len(resp.Results) == 0 || len(resp.Results[0].Series) == 0 {
  141. return make([]string, 0), nil
  142. }
  143. rows := resp.Results[0].Series[0]
  144. operationNames := make([]string, 0, len(rows.Values))
  145. for _, kv := range rows.Values {
  146. if len(kv) != 2 {
  147. continue
  148. }
  149. if operationName, ok := kv[1].(string); ok {
  150. operationNames = append(operationNames, operationName)
  151. }
  152. }
  153. return operationNames, nil
  154. }
  155. func (d *dao) writeSamplePoint(ctx context.Context, spanPoint *model.SpanPoint) error {
  156. table := d.hbaseNameSpace + ":" + hbaseListIdxTable
  157. rowKey := listIdxKey(spanPoint)
  158. values := make(map[string][]byte)
  159. values = fuelDurationSamplePoint(values, spanPoint.MaxDuration, spanPoint.AvgDuration, spanPoint.MinDuration)
  160. values = fuelErrrorSamplePoint(values, spanPoint.Errors...)
  161. put, err := hrpc.NewPutStr(ctx, table, rowKey, map[string]map[string][]byte{hbaseListIdxFamily: values})
  162. if err != nil {
  163. return err
  164. }
  165. _, err = d.hbaseClient.Put(put)
  166. return err
  167. }
  168. func listIdxKey(spanPoint *model.SpanPoint) string {
  169. serviceNameHash := farm.Hash32([]byte(spanPoint.ServiceName))
  170. operationNameHash := farm.Hash32([]byte(spanPoint.OperationName))
  171. return fmt.Sprintf("%x%x%d", serviceNameHash, operationNameHash, spanPoint.Timestamp)
  172. }
  173. func fuelDurationSamplePoint(values map[string][]byte, points ...model.SamplePoint) map[string][]byte {
  174. for i := range points {
  175. key := "d:" + strconv.FormatInt(points[i].Value, 10)
  176. values[key] = []byte(fmt.Sprintf("%x:%x", points[i].TraceID, points[i].SpanID))
  177. }
  178. return values
  179. }
  180. func fuelErrrorSamplePoint(values map[string][]byte, points ...model.SamplePoint) map[string][]byte {
  181. for i := range points {
  182. key := "e:" + strconv.FormatInt(points[i].Value, 10)
  183. values[key] = []byte(fmt.Sprintf("%x:%x", points[i].TraceID, points[i].SpanID))
  184. }
  185. return values
  186. }
  187. func toInfluxDBPoint(spanPoint *model.SpanPoint) (*influxdb.Point, error) {
  188. tags := map[string]string{
  189. serviceNameTag: spanPoint.ServiceName,
  190. operationNameTag: spanPoint.OperationName,
  191. spanKindTag: spanPoint.SpanKind,
  192. peerServiceTag: spanPoint.PeerService,
  193. }
  194. fields := map[string]interface{}{
  195. maxDurationField: spanPoint.MaxDuration.Value,
  196. minDurationField: spanPoint.MinDuration.Value,
  197. avgDurationField: spanPoint.AvgDuration.Value,
  198. errorsField: len(spanPoint.Errors),
  199. }
  200. return influxdb.NewPoint(spanpointMeasurement, tags, fields, time.Unix(spanPoint.Timestamp, 0))
  201. }