dao.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. package dao
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "math"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. influxdb "github.com/influxdata/influxdb/client/v2"
  14. "github.com/tsuna/gohbase"
  15. "github.com/tsuna/gohbase/filter"
  16. "github.com/tsuna/gohbase/hrpc"
  17. "go-common/app/service/main/dapper-query/conf"
  18. "go-common/app/service/main/dapper-query/model"
  19. "go-common/library/log"
  20. )
  // Sort orders understood by QuerySpanList, spelled "<key>:<direction>".
const (
	// TimeDesc orders span references newest first.
	TimeDesc = "time:desc"
	// TimeAsc orders span references oldest first.
	TimeAsc = "time:asc"
	// DurationDesc orders span references longest first.
	DurationDesc = "duration:desc"
	// DurationAsc orders span references shortest first.
	DurationAsc = "duration:asc"
)
  28. // ErrNotFound data not found
  29. var ErrNotFound = errors.New("not found")
  // Selector narrows a span-list query: the window bounded by Start and End,
// a page of at most Limit entries after skipping Offset, and, when OnlyError
// is set, only spans flagged as errors.
type Selector struct {
	Start, End    int64
	Limit, Offset int
	OnlyError     bool
}
  // Storage locations plus the InfluxDB tag, field and measurement names used
// when querying span data.
const (
	// Fallbacks used when the configuration leaves these empty.
	DefaultHbaseNameSpace = "ugc"
	DefaultInfluxDatabase = "dapper"

	// HBase table holding raw trace data and its column family.
	HbaseRawTraceTable  = "DapperRawtrace"
	HbaseRawTraceFamily = "pb"

	// HBase table holding the span list index and its column family.
	HbaseListIdxTable  = "DapperListidx"
	HbaseListIdxFamily = "kind"

	// InfluxDB tag keys.
	ServiceNameTag   = "service_name"
	OperationNameTag = "operation_name"
	PeerServiceTag   = "peer.service"
	SpanKindTag      = "span.kind"

	// InfluxDB field names.
	MaxDurationField = "max_duration"
	MinDurationField = "min_duration"
	AvgDurationField = "avg_duration"

	// InfluxDB measurement storing span points.
	SpanpointMeasurement = "span_point"

	// InfluxDB field counting errors.
	ErrorsField = "errors"
)
  56. // Dao dapper dao
  57. type Dao interface {
  58. // list all ServiceNames
  59. ServiceNames(ctx context.Context) ([]string, error)
  60. // list OperationName for specifically service
  61. OperationNames(ctx context.Context, serviceName string) ([]string, error)
  62. // QuerySpan by family title and sel
  63. QuerySpanList(ctx context.Context, serviceName, operationName string, sel *Selector, order string) ([]model.SpanListRef, error)
  64. // Trace get trace by trace id span sort by start_time
  65. Trace(ctx context.Context, traceID uint64, spanIDs ...uint64) ([]*model.Span, error)
  66. // PeerService query all peer service depend by service
  67. PeerService(ctx context.Context, serviceName string) ([]string, error)
  68. // Ping pong
  69. Ping(ctx context.Context) error
  70. // Close dao
  71. Close(ctx context.Context) error
  72. // SupportOrder checkou order support
  73. SupportOrder(order string) bool
  74. // MeanOperationNameField
  75. MeanOperationNameField(ctx context.Context, whereMap map[string]string, field string, start, end int64, groupby []string) ([]model.MeanOperationNameValue, error)
  76. // SpanSeriesMean
  77. SpanSeriesMean(ctx context.Context, serviceName, operationName string, fields []string, start, end, interval int64) (*model.Series, error)
  78. // SpanSeriesCount
  79. SpanSeriesCount(ctx context.Context, serviceName, operationName string, fields []string, start, end, interval int64) (*model.Series, error)
  80. }
  81. type dao struct {
  82. hbaseNameSpace string
  83. hbaseClient gohbase.Client
  84. influxDatabase string
  85. influxdbClient influxdb.Client
  86. }
  87. func (d *dao) Ping(ctx context.Context) error {
  88. return nil
  89. }
  90. func (d *dao) Close(ctx context.Context) error {
  91. d.hbaseClient.Close()
  92. return d.influxdbClient.Close()
  93. }
  94. // New dao
  95. func New(cfg *conf.Config) (Dao, error) {
  96. // disable rpc queue
  97. hbaseClient := gohbase.NewClient(cfg.HBase.Addrs, gohbase.RpcQueueSize(0))
  98. hbaseNameSpace := DefaultHbaseNameSpace
  99. if cfg.HBase.Namespace != "" {
  100. hbaseNameSpace = cfg.HBase.Namespace
  101. }
  102. influxdbCfg := influxdb.HTTPConfig{Addr: cfg.InfluxDB.Addr, Username: cfg.InfluxDB.Username, Password: cfg.InfluxDB.Password}
  103. influxdbClient, err := influxdb.NewHTTPClient(influxdbCfg)
  104. if err != nil {
  105. return nil, err
  106. }
  107. influxDatabase := DefaultInfluxDatabase
  108. if cfg.InfluxDB.Database != "" {
  109. influxDatabase = cfg.InfluxDB.Database
  110. }
  111. return &dao{
  112. hbaseNameSpace: hbaseNameSpace,
  113. hbaseClient: hbaseClient,
  114. influxDatabase: influxDatabase,
  115. influxdbClient: influxdbClient,
  116. }, nil
  117. }
  118. func (d *dao) ServiceNames(ctx context.Context) ([]string, error) {
  119. where := fmt.Sprintf("%s = '%s'", SpanKindTag, "server")
  120. return d.showTagValues(ctx, ServiceNameTag, where)
  121. }
  122. func (d *dao) showTagValues(ctx context.Context, tag, where string) ([]string, error) {
  123. command := fmt.Sprintf(`SHOW TAG VALUES FROM "%s" WITH KEY = "%s" WHERE %s`,
  124. SpanpointMeasurement, tag, where)
  125. log.V(10).Info("query command %s", command)
  126. query := influxdb.NewQuery(command, d.influxDatabase, "1s")
  127. resp, err := d.influxdbClient.Query(query)
  128. if err != nil {
  129. return nil, err
  130. }
  131. if len(resp.Results) == 0 || len(resp.Results[0].Series) == 0 {
  132. return make([]string, 0), nil
  133. }
  134. rows := resp.Results[0].Series[0]
  135. values := make([]string, 0, len(rows.Values))
  136. for _, kv := range rows.Values {
  137. if len(kv) != 2 {
  138. continue
  139. }
  140. if value, ok := kv[1].(string); ok {
  141. values = append(values, value)
  142. }
  143. }
  144. return values, nil
  145. }
  146. func (d *dao) OperationNames(ctx context.Context, serviceName string) ([]string, error) {
  147. where := fmt.Sprintf("%s = '%s' AND %s = '%s'", ServiceNameTag, serviceName, SpanKindTag, "server")
  148. return d.showTagValues(ctx, OperationNameTag, where)
  149. }
  150. func (d *dao) QuerySpanList(ctx context.Context, serviceName string, operationName string, sel *Selector, order string) ([]model.SpanListRef, error) {
  151. log.V(10).Info("query span list serviceName: %s, operationName: %s, sel: %+v order: %s", serviceName, operationName, sel, order)
  152. prefix := keyPrefix(serviceName, operationName)
  153. startKey, stopKey := rangeKey(prefix, sel.Start, sel.End)
  154. switch order {
  155. case TimeAsc, TimeDesc:
  156. return d.querySpanListTimeOrder(ctx, startKey, stopKey, prefix, sel.Limit, sel.Offset, order == TimeDesc, sel.OnlyError)
  157. case DurationDesc, DurationAsc:
  158. return d.querySpanListDurationOrder(ctx, startKey, stopKey, prefix, sel.Limit, sel.Offset, order == DurationDesc, sel.OnlyError)
  159. }
  160. return nil, fmt.Errorf("unsupport order")
  161. }
  162. func parseSpanListRef(cell *hrpc.Cell) (spanListRef model.SpanListRef, err error) {
  163. value := cell.Value
  164. ref := bytes.SplitN(value, []byte(":"), 2)
  165. if len(ref) != 2 {
  166. err = fmt.Errorf("invalid ref %s", value)
  167. return
  168. }
  169. if spanListRef.TraceID, err = strconv.ParseUint(string(ref[0]), 16, 64); err != nil {
  170. return
  171. }
  172. spanListRef.SpanID, err = strconv.ParseUint(string(ref[1]), 16, 64)
  173. if err != nil {
  174. return
  175. }
  176. kd := bytes.SplitN(cell.Qualifier, []byte(":"), 2)
  177. if len(kd) != 2 {
  178. err = fmt.Errorf("invalid qualifier %s", cell.Qualifier)
  179. return
  180. }
  181. if bytes.Equal(kd[0], []byte("e")) {
  182. spanListRef.IsError = true
  183. }
  184. spanListRef.Duration, err = strconv.ParseInt(string(kd[1]), 10, 64)
  185. return
  186. }
  187. type minHeapifyListRef []model.SpanListRef
  188. func (m minHeapifyListRef) push(listRef model.SpanListRef) {
  189. if m[0].Duration > listRef.Duration {
  190. return
  191. }
  192. m[0] = listRef
  193. m.minHeapify(0)
  194. }
  195. func (m minHeapifyListRef) minHeapify(i int) {
  196. var lowest int
  197. left := (i+1)*2 - 1
  198. right := (i + 1) * 2
  199. if left < len(m) && m[left].Duration < m[i].Duration {
  200. lowest = left
  201. } else {
  202. lowest = i
  203. }
  204. if right < len(m) && m[right].Duration < m[lowest].Duration {
  205. lowest = right
  206. }
  207. if lowest != i {
  208. m[i], m[lowest] = m[lowest], m[i]
  209. m.minHeapify(lowest)
  210. }
  211. }
  212. func (m minHeapifyListRef) Len() int {
  213. return len(m)
  214. }
  215. func (m minHeapifyListRef) Less(i, j int) bool {
  216. return m[i].Duration > m[j].Duration
  217. }
  218. func (m minHeapifyListRef) Swap(i, j int) {
  219. m[i], m[j] = m[j], m[i]
  220. }
  221. func (d *dao) querySpanListDurationOrder(ctx context.Context, startKey, stopKey, prefix string, limit, offset int, reverse bool, onlyError bool) ([]model.SpanListRef, error) {
  222. var options []func(hrpc.Call) error
  223. options = append(options, hrpc.Filters(filter.NewPrefixFilter([]byte(prefix))))
  224. if reverse {
  225. startKey, stopKey = stopKey, startKey
  226. options = append(options, hrpc.Reversed())
  227. }
  228. table := d.hbaseNameSpace + ":" + HbaseListIdxTable
  229. scan, err := hrpc.NewScanRangeStr(ctx, table, startKey, stopKey, options...)
  230. if err != nil {
  231. return nil, err
  232. }
  233. scanner := d.hbaseClient.Scan(scan)
  234. defer scanner.Close()
  235. spanListRefs := make(minHeapifyListRef, limit+offset)
  236. for {
  237. result, err := scanner.Next()
  238. if err != nil {
  239. if err != io.EOF {
  240. return nil, err
  241. }
  242. break
  243. }
  244. if len(result.Cells) > 0 {
  245. log.V(10).Info("scan rowkey %s", result.Cells[0].Row)
  246. }
  247. for _, cell := range result.Cells {
  248. if string(cell.Family) != HbaseListIdxFamily {
  249. continue
  250. }
  251. spanListRef, err := parseSpanListRef(cell)
  252. if err != nil {
  253. // ignored error?
  254. return nil, err
  255. }
  256. if onlyError && !spanListRef.IsError {
  257. continue
  258. }
  259. if !reverse {
  260. spanListRef.Duration = math.MaxInt64 - spanListRef.Duration
  261. }
  262. spanListRefs.push(spanListRef)
  263. }
  264. }
  265. sort.Sort(spanListRefs)
  266. for i := range spanListRefs[offset:] {
  267. if spanListRefs[offset+i].TraceID == 0 {
  268. spanListRefs = spanListRefs[:offset+i]
  269. break
  270. }
  271. }
  272. return spanListRefs[offset:], nil
  273. }
  274. func (d *dao) querySpanListTimeOrder(ctx context.Context, startKey, stopKey, prefix string, limit, offset int, reverse bool, onlyError bool) ([]model.SpanListRef, error) {
  275. var options []func(hrpc.Call) error
  276. options = append(options, hrpc.Filters(filter.NewPrefixFilter([]byte(prefix))))
  277. if reverse {
  278. startKey, stopKey = stopKey, startKey
  279. options = append(options, hrpc.Reversed())
  280. }
  281. table := d.hbaseNameSpace + ":" + HbaseListIdxTable
  282. scan, err := hrpc.NewScanRangeStr(ctx, table, startKey, stopKey, options...)
  283. if err != nil {
  284. return nil, err
  285. }
  286. scanner := d.hbaseClient.Scan(scan)
  287. defer scanner.Close()
  288. spanListRefs := make([]model.SpanListRef, 0, limit)
  289. for {
  290. result, err := scanner.Next()
  291. if err != nil {
  292. if err != io.EOF {
  293. return nil, err
  294. }
  295. break
  296. }
  297. if len(result.Cells) > 0 {
  298. log.V(10).Info("scan rowkey %s", result.Cells[0].Row)
  299. }
  300. for _, cell := range result.Cells {
  301. if string(cell.Family) != HbaseListIdxFamily {
  302. continue
  303. }
  304. spanListRef, err := parseSpanListRef(cell)
  305. if err != nil {
  306. // ignored error?
  307. return nil, err
  308. }
  309. if onlyError && !spanListRef.IsError {
  310. continue
  311. }
  312. if offset > 0 {
  313. offset--
  314. continue
  315. }
  316. if limit <= 0 {
  317. break
  318. }
  319. if err != nil {
  320. // ignored error?
  321. return nil, err
  322. }
  323. spanListRefs = append(spanListRefs, spanListRef)
  324. limit--
  325. }
  326. }
  327. return spanListRefs, nil
  328. }
  329. func (d *dao) Trace(ctx context.Context, traceID uint64, spanIDs ...uint64) ([]*model.Span, error) {
  330. table := d.hbaseNameSpace + ":" + HbaseRawTraceTable
  331. traceIDStr := strconv.FormatUint(traceID, 16)
  332. var options []func(hrpc.Call) error
  333. if len(spanIDs) != 0 {
  334. filters := make([]filter.Filter, 0, len(spanIDs))
  335. for _, spanID := range spanIDs {
  336. spanIDStr := strconv.FormatUint(spanID, 16)
  337. filters = append(filters, filter.NewColumnPrefixFilter([]byte(spanIDStr)))
  338. }
  339. options = append(options, hrpc.Filters(filter.NewList(filter.MustPassOne, filters...)))
  340. }
  341. get, err := hrpc.NewGetStr(ctx, table, traceIDStr, options...)
  342. if err != nil {
  343. return nil, err
  344. }
  345. result, err := d.hbaseClient.Get(get)
  346. if err != nil {
  347. return nil, err
  348. }
  349. spans := make([]*model.Span, 0, len(result.Cells))
  350. for _, cell := range result.Cells {
  351. if string(cell.Family) != HbaseRawTraceFamily {
  352. continue
  353. }
  354. span, err := model.FromProtoSpan(cell.Value)
  355. if err != nil {
  356. // TODO: ignore error?
  357. log.Error("unmarshal protobuf span data rowkey: %x, cf: %s:%s error: %s", traceID, cell.Family, cell.Qualifier, err)
  358. continue
  359. }
  360. spans = append(spans, span)
  361. }
  362. sort.Slice(spans, func(i, j int) bool {
  363. return spans[i].StartTime.UnixNano() > spans[j].StartTime.UnixNano()
  364. })
  365. return spans, nil
  366. }
  367. func (d *dao) SupportOrder(order string) bool {
  368. switch order {
  369. case TimeAsc, TimeDesc, DurationAsc, DurationDesc:
  370. return true
  371. }
  372. return false
  373. }
  374. func (d *dao) MeanOperationNameField(ctx context.Context, whereMap map[string]string, field string, start, end int64, orderby []string) ([]model.MeanOperationNameValue, error) {
  375. var wheres []string
  376. for k, v := range whereMap {
  377. wheres = append(wheres, fmt.Sprintf(`%s='%s'`, k, v))
  378. }
  379. command := fmt.Sprintf(`SELECT mean("%s") AS mean_%s FROM "%s" WHERE %s AND time > %ds AND time < %ds GROUP BY %s`,
  380. field,
  381. field,
  382. SpanpointMeasurement,
  383. strings.Join(wheres, " AND "),
  384. start, end,
  385. strings.Join(orderby, ", "),
  386. )
  387. log.V(10).Info("query command %s", command)
  388. query := influxdb.NewQuery(command, d.influxDatabase, "1s")
  389. resp, err := d.influxdbClient.Query(query)
  390. if err != nil {
  391. return nil, err
  392. }
  393. if len(resp.Results) == 0 || len(resp.Results[0].Series) == 0 {
  394. return make([]model.MeanOperationNameValue, 0), nil
  395. }
  396. values := make([]model.MeanOperationNameValue, 0, len(resp.Results[0].Series))
  397. for _, row := range resp.Results[0].Series {
  398. value := model.MeanOperationNameValue{Tag: row.Tags}
  399. if len(row.Values) == 0 || len(row.Values[0]) < 2 {
  400. continue
  401. }
  402. valStr, ok := row.Values[0][1].(json.Number)
  403. if !ok {
  404. continue
  405. }
  406. // 相信 ifluxdb 不会瞎返回的
  407. value.Value, _ = valStr.Float64()
  408. values = append(values, value)
  409. }
  410. return values, nil
  411. }
  412. func (d *dao) SpanSeriesMean(ctx context.Context, serviceName, operationName string, fields []string, start, end, interval int64) (*model.Series, error) {
  413. return d.spanSeries(ctx, serviceName, operationName, "mean", fields, start, end, interval)
  414. }
  415. func (d *dao) SpanSeriesCount(ctx context.Context, serviceName, operationName string, fields []string, start, end, interval int64) (*model.Series, error) {
  416. return d.spanSeries(ctx, serviceName, operationName, "count", fields, start, end, interval)
  417. }
  418. func (d *dao) spanSeries(ctx context.Context, serviceName, operationName, fn string, fields []string, start, end, interval int64) (*model.Series, error) {
  419. var selects []string
  420. for _, field := range fields {
  421. selects = append(selects, fmt.Sprintf(`%s("%s") AS "%s_%s"`, fn, field, fn, field))
  422. }
  423. command := fmt.Sprintf(`SELECT %s FROM %s WHERE "%s"='%s' AND "%s"='%s' AND time > %ds AND time < %ds GROUP BY time(%ds) FILL(null)`,
  424. strings.Join(selects, ", "),
  425. SpanpointMeasurement,
  426. ServiceNameTag, serviceName,
  427. OperationNameTag, operationName,
  428. start, end, interval)
  429. log.V(10).Info("query command %s", command)
  430. query := influxdb.NewQuery(command, d.influxDatabase, "1s")
  431. resp, err := d.influxdbClient.Query(query)
  432. if err != nil {
  433. return nil, err
  434. }
  435. if len(resp.Results) == 0 || len(resp.Results[0].Series) == 0 {
  436. return new(model.Series), nil
  437. }
  438. series := new(model.Series)
  439. fieldMap := make(map[int]*model.SeriesItem)
  440. for _, row := range resp.Results[0].Series {
  441. // first colums is time
  442. for i, name := range row.Columns {
  443. if name == "time" {
  444. continue
  445. }
  446. fieldMap[i] = &model.SeriesItem{Field: name}
  447. }
  448. for _, value := range row.Values {
  449. for i, val := range value {
  450. if i == 0 {
  451. timestamp, _ := val.(json.Number).Int64()
  452. series.Timestamps = append(series.Timestamps, timestamp)
  453. continue
  454. }
  455. n, ok := val.(json.Number)
  456. if !ok {
  457. fieldMap[i].Rows = append(fieldMap[i].Rows, nil)
  458. }
  459. v, _ := n.Float64()
  460. fieldMap[i].Rows = append(fieldMap[i].Rows, &v)
  461. }
  462. }
  463. }
  464. for _, v := range fieldMap {
  465. series.Items = append(series.Items, v)
  466. }
  467. return series, nil
  468. }
  469. func (d *dao) PeerService(ctx context.Context, serviceName string) ([]string, error) {
  470. where := fmt.Sprintf("%s = '%s' AND %s = '%s'", ServiceNameTag, serviceName, SpanKindTag, "client")
  471. return d.showTagValues(ctx, PeerServiceTag, where)
  472. }