opslog.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. // Package opslog provide ops-log api
  2. package opslog
  3. import (
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "io"
  8. "net/http"
  9. "strconv"
  10. "time"
  11. "github.com/pkg/errors"
  12. )
  13. const (
  14. _kbnVersion = "5.4.3"
  15. _indexPrefix = "billions-"
  16. _mqueryContentType = "application/x-ndjson"
  17. _ajsSessioID = "_AJSESSIONID"
  18. )
  19. // Err errors
  20. var (
  21. ErrOverRange = errors.New("search time over range")
  22. )
  23. type response struct {
  24. Hits struct {
  25. Hits []struct {
  26. Source map[string]interface{} `json:"_source"`
  27. } `json:"hits"`
  28. } `json:"hits"`
  29. }
  30. // Record represent log record
  31. type Record struct {
  32. Time time.Time `json:"timestamp"`
  33. Fields map[string]interface{} `json:"fields"`
  34. Level string `json:"level"`
  35. Message string `json:"message"`
  36. }
  37. // Client query log from ops-log
  38. type Client interface {
  39. Query(ctx context.Context, familys []string, traceID uint64, sessionID string, start, end int64, options ...Option) ([]*Record, error)
  40. }
  41. type option struct {
  42. traceField string
  43. size int
  44. level string
  45. }
  46. var _defaultOpt = option{
  47. traceField: "traceid",
  48. size: 100,
  49. }
  50. // Option for query
  51. type Option func(opt *option)
  52. // SetTraceField default "traceid"
  53. func SetTraceField(traceField string) Option {
  54. return func(opt *option) {
  55. opt.traceField = traceField
  56. }
  57. }
  58. // SetSize default 100
  59. func SetSize(size int) Option {
  60. return func(opt *option) {
  61. opt.size = size
  62. }
  63. }
  64. // SetLevel return all if level is empty
  65. func SetLevel(level string) Option {
  66. return func(opt *option) {
  67. opt.level = level
  68. }
  69. }
  70. // New ops-log client
  71. func New(searchAPI string, httpClient *http.Client) Client {
  72. if httpClient == nil {
  73. httpClient = http.DefaultClient
  74. }
  75. return &client{
  76. searchAPI: searchAPI,
  77. httpclient: httpClient,
  78. }
  79. }
  80. type client struct {
  81. searchAPI string
  82. httpclient *http.Client
  83. }
  84. func (c *client) Query(ctx context.Context, familys []string, traceID uint64, sessionID string, start, end int64, options ...Option) ([]*Record, error) {
  85. if start <= 0 || end <= 0 {
  86. return nil, ErrOverRange
  87. }
  88. if len(familys) == 0 {
  89. return make([]*Record, 0), nil
  90. }
  91. opt := _defaultOpt
  92. for _, fn := range options {
  93. fn(&opt)
  94. }
  95. req, err := c.newReq(familys, traceID, sessionID, start, end, &opt)
  96. if err != nil {
  97. return nil, err
  98. }
  99. resp, err := c.httpclient.Do(req)
  100. if err != nil {
  101. return nil, errors.Wrapf(err, "send request to %s fail", c.searchAPI)
  102. }
  103. defer resp.Body.Close()
  104. if resp.StatusCode/100 != 2 {
  105. buf := make([]byte, 1024)
  106. n, _ := resp.Body.Read(buf)
  107. return nil, errors.Errorf("ops-log response error: status_code: %d, body: %s", resp.StatusCode, buf[:n])
  108. }
  109. return decodeRecord(resp.Body)
  110. }
  111. func (c *client) newReq(familys []string, traceID uint64, sessionID string, start, end int64, opt *option) (*http.Request, error) {
  112. prefixTraceID := strconv.FormatUint(traceID, 16)
  113. leagcyTraceID := strconv.FormatUint(traceID, 10)
  114. startMillis := start * int64((time.Second / time.Millisecond))
  115. endMillis := end * int64((time.Second / time.Millisecond))
  116. body := &bytes.Buffer{}
  117. enc := json.NewEncoder(body)
  118. header := map[string]interface{}{"index": formatIndices(familys), "ignore_unavailable": true}
  119. if err := enc.Encode(header); err != nil {
  120. return nil, err
  121. }
  122. shoulds := []map[string]interface{}{
  123. {"prefix": map[string]interface{}{opt.traceField: prefixTraceID}},
  124. {"match": map[string]interface{}{opt.traceField: leagcyTraceID}},
  125. }
  126. traceQuery := map[string]interface{}{"bool": map[string]interface{}{"should": shoulds}}
  127. rangeQuery := map[string]interface{}{
  128. "range": map[string]interface{}{
  129. "@timestamp": map[string]interface{}{"gte": startMillis, "lte": endMillis, "format": "epoch_millis"},
  130. },
  131. }
  132. musts := []map[string]interface{}{traceQuery, rangeQuery}
  133. if opt.level != "" {
  134. musts = append(musts, map[string]interface{}{"match": map[string]interface{}{"level": opt.level}})
  135. }
  136. query := map[string]interface{}{
  137. "sort": map[string]interface{}{
  138. "@timestamp": map[string]interface{}{
  139. "order": "desc",
  140. "unmapped_type": "boolean",
  141. },
  142. },
  143. "query": map[string]interface{}{
  144. "bool": map[string]interface{}{"must": musts},
  145. },
  146. "version": true,
  147. "size": opt.size,
  148. }
  149. if err := enc.Encode(query); err != nil {
  150. return nil, err
  151. }
  152. req, err := http.NewRequest(http.MethodPost, c.searchAPI, body)
  153. if err != nil {
  154. return nil, err
  155. }
  156. session := &http.Cookie{Name: _ajsSessioID, Value: sessionID}
  157. req.AddCookie(session)
  158. req.Header.Set("Content-Type", _mqueryContentType)
  159. req.Header.Set("kbn-version", _kbnVersion)
  160. return req, nil
  161. }
  162. func decodeRecord(src io.Reader) ([]*Record, error) {
  163. var resp struct {
  164. Responses []response `json:"responses"`
  165. }
  166. if err := json.NewDecoder(src).Decode(&resp); err != nil {
  167. return nil, errors.Wrap(err, "decode response error")
  168. }
  169. if len(resp.Responses) == 0 {
  170. return nil, nil
  171. }
  172. records := make([]*Record, 0, len(resp.Responses[0].Hits.Hits))
  173. for _, hit := range resp.Responses[0].Hits.Hits {
  174. record := &Record{
  175. Fields: make(map[string]interface{}),
  176. }
  177. for k, v := range hit.Source {
  178. switch k {
  179. case "@timestamp":
  180. s, _ := v.(string)
  181. record.Time, _ = time.Parse(time.RFC3339Nano, s)
  182. case "log":
  183. s, _ := v.(string)
  184. record.Message = s
  185. case "level":
  186. s, _ := v.(string)
  187. record.Level = s
  188. default:
  189. record.Fields[k] = v
  190. }
  191. }
  192. records = append(records, record)
  193. }
  194. return records, nil
  195. }
  196. func formatIndices(familys []string) []string {
  197. indices := make([]string, len(familys))
  198. for i := range familys {
  199. indices[i] = _indexPrefix + familys[i] + "*"
  200. }
  201. return indices
  202. }