decode.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. // Copyright 2015 The Prometheus Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package expfmt
  14. import (
  15. "fmt"
  16. "io"
  17. "math"
  18. "mime"
  19. "net/http"
  20. dto "github.com/prometheus/client_model/go"
  21. "github.com/matttproud/golang_protobuf_extensions/pbutil"
  22. "github.com/prometheus/common/model"
  23. )
  24. // Decoder types decode an input stream into metric families.
  25. type Decoder interface {
  26. Decode(*dto.MetricFamily) error
  27. }
  28. // DecodeOptions contains options used by the Decoder and in sample extraction.
  29. type DecodeOptions struct {
  30. // Timestamp is added to each value from the stream that has no explicit timestamp set.
  31. Timestamp model.Time
  32. }
  33. // ResponseFormat extracts the correct format from a HTTP response header.
  34. // If no matching format can be found FormatUnknown is returned.
  35. func ResponseFormat(h http.Header) Format {
  36. ct := h.Get(hdrContentType)
  37. mediatype, params, err := mime.ParseMediaType(ct)
  38. if err != nil {
  39. return FmtUnknown
  40. }
  41. const textType = "text/plain"
  42. switch mediatype {
  43. case ProtoType:
  44. if p, ok := params["proto"]; ok && p != ProtoProtocol {
  45. return FmtUnknown
  46. }
  47. if e, ok := params["encoding"]; ok && e != "delimited" {
  48. return FmtUnknown
  49. }
  50. return FmtProtoDelim
  51. case textType:
  52. if v, ok := params["version"]; ok && v != TextVersion {
  53. return FmtUnknown
  54. }
  55. return FmtText
  56. }
  57. return FmtUnknown
  58. }
  59. // NewDecoder returns a new decoder based on the given input format.
  60. // If the input format does not imply otherwise, a text format decoder is returned.
  61. func NewDecoder(r io.Reader, format Format) Decoder {
  62. switch format {
  63. case FmtProtoDelim:
  64. return &protoDecoder{r: r}
  65. }
  66. return &textDecoder{r: r}
  67. }
  68. // protoDecoder implements the Decoder interface for protocol buffers.
  69. type protoDecoder struct {
  70. r io.Reader
  71. }
  72. // Decode implements the Decoder interface.
  73. func (d *protoDecoder) Decode(v *dto.MetricFamily) error {
  74. _, err := pbutil.ReadDelimited(d.r, v)
  75. if err != nil {
  76. return err
  77. }
  78. if !model.IsValidMetricName(model.LabelValue(v.GetName())) {
  79. return fmt.Errorf("invalid metric name %q", v.GetName())
  80. }
  81. for _, m := range v.GetMetric() {
  82. if m == nil {
  83. continue
  84. }
  85. for _, l := range m.GetLabel() {
  86. if l == nil {
  87. continue
  88. }
  89. if !model.LabelValue(l.GetValue()).IsValid() {
  90. return fmt.Errorf("invalid label value %q", l.GetValue())
  91. }
  92. if !model.LabelName(l.GetName()).IsValid() {
  93. return fmt.Errorf("invalid label name %q", l.GetName())
  94. }
  95. }
  96. }
  97. return nil
  98. }
  99. // textDecoder implements the Decoder interface for the text protocol.
  100. type textDecoder struct {
  101. r io.Reader
  102. p TextParser
  103. fams []*dto.MetricFamily
  104. }
  105. // Decode implements the Decoder interface.
  106. func (d *textDecoder) Decode(v *dto.MetricFamily) error {
  107. // TODO(fabxc): Wrap this as a line reader to make streaming safer.
  108. if len(d.fams) == 0 {
  109. // No cached metric families, read everything and parse metrics.
  110. fams, err := d.p.TextToMetricFamilies(d.r)
  111. if err != nil {
  112. return err
  113. }
  114. if len(fams) == 0 {
  115. return io.EOF
  116. }
  117. d.fams = make([]*dto.MetricFamily, 0, len(fams))
  118. for _, f := range fams {
  119. d.fams = append(d.fams, f)
  120. }
  121. }
  122. *v = *d.fams[0]
  123. d.fams = d.fams[1:]
  124. return nil
  125. }
  126. // SampleDecoder wraps a Decoder to extract samples from the metric families
  127. // decoded by the wrapped Decoder.
  128. type SampleDecoder struct {
  129. Dec Decoder
  130. Opts *DecodeOptions
  131. f dto.MetricFamily
  132. }
  133. // Decode calls the Decode method of the wrapped Decoder and then extracts the
  134. // samples from the decoded MetricFamily into the provided model.Vector.
  135. func (sd *SampleDecoder) Decode(s *model.Vector) error {
  136. err := sd.Dec.Decode(&sd.f)
  137. if err != nil {
  138. return err
  139. }
  140. *s, err = extractSamples(&sd.f, sd.Opts)
  141. return err
  142. }
  143. // ExtractSamples builds a slice of samples from the provided metric
  144. // families. If an error occurs during sample extraction, it continues to
  145. // extract from the remaining metric families. The returned error is the last
  146. // error that has occured.
  147. func ExtractSamples(o *DecodeOptions, fams ...*dto.MetricFamily) (model.Vector, error) {
  148. var (
  149. all model.Vector
  150. lastErr error
  151. )
  152. for _, f := range fams {
  153. some, err := extractSamples(f, o)
  154. if err != nil {
  155. lastErr = err
  156. continue
  157. }
  158. all = append(all, some...)
  159. }
  160. return all, lastErr
  161. }
  162. func extractSamples(f *dto.MetricFamily, o *DecodeOptions) (model.Vector, error) {
  163. switch f.GetType() {
  164. case dto.MetricType_COUNTER:
  165. return extractCounter(o, f), nil
  166. case dto.MetricType_GAUGE:
  167. return extractGauge(o, f), nil
  168. case dto.MetricType_SUMMARY:
  169. return extractSummary(o, f), nil
  170. case dto.MetricType_UNTYPED:
  171. return extractUntyped(o, f), nil
  172. case dto.MetricType_HISTOGRAM:
  173. return extractHistogram(o, f), nil
  174. }
  175. return nil, fmt.Errorf("expfmt.extractSamples: unknown metric family type %v", f.GetType())
  176. }
  177. func extractCounter(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  178. samples := make(model.Vector, 0, len(f.Metric))
  179. for _, m := range f.Metric {
  180. if m.Counter == nil {
  181. continue
  182. }
  183. lset := make(model.LabelSet, len(m.Label)+1)
  184. for _, p := range m.Label {
  185. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  186. }
  187. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  188. smpl := &model.Sample{
  189. Metric: model.Metric(lset),
  190. Value: model.SampleValue(m.Counter.GetValue()),
  191. }
  192. if m.TimestampMs != nil {
  193. smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  194. } else {
  195. smpl.Timestamp = o.Timestamp
  196. }
  197. samples = append(samples, smpl)
  198. }
  199. return samples
  200. }
  201. func extractGauge(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  202. samples := make(model.Vector, 0, len(f.Metric))
  203. for _, m := range f.Metric {
  204. if m.Gauge == nil {
  205. continue
  206. }
  207. lset := make(model.LabelSet, len(m.Label)+1)
  208. for _, p := range m.Label {
  209. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  210. }
  211. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  212. smpl := &model.Sample{
  213. Metric: model.Metric(lset),
  214. Value: model.SampleValue(m.Gauge.GetValue()),
  215. }
  216. if m.TimestampMs != nil {
  217. smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  218. } else {
  219. smpl.Timestamp = o.Timestamp
  220. }
  221. samples = append(samples, smpl)
  222. }
  223. return samples
  224. }
  225. func extractUntyped(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  226. samples := make(model.Vector, 0, len(f.Metric))
  227. for _, m := range f.Metric {
  228. if m.Untyped == nil {
  229. continue
  230. }
  231. lset := make(model.LabelSet, len(m.Label)+1)
  232. for _, p := range m.Label {
  233. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  234. }
  235. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  236. smpl := &model.Sample{
  237. Metric: model.Metric(lset),
  238. Value: model.SampleValue(m.Untyped.GetValue()),
  239. }
  240. if m.TimestampMs != nil {
  241. smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  242. } else {
  243. smpl.Timestamp = o.Timestamp
  244. }
  245. samples = append(samples, smpl)
  246. }
  247. return samples
  248. }
  249. func extractSummary(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  250. samples := make(model.Vector, 0, len(f.Metric))
  251. for _, m := range f.Metric {
  252. if m.Summary == nil {
  253. continue
  254. }
  255. timestamp := o.Timestamp
  256. if m.TimestampMs != nil {
  257. timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  258. }
  259. for _, q := range m.Summary.Quantile {
  260. lset := make(model.LabelSet, len(m.Label)+2)
  261. for _, p := range m.Label {
  262. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  263. }
  264. // BUG(matt): Update other names to "quantile".
  265. lset[model.LabelName(model.QuantileLabel)] = model.LabelValue(fmt.Sprint(q.GetQuantile()))
  266. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  267. samples = append(samples, &model.Sample{
  268. Metric: model.Metric(lset),
  269. Value: model.SampleValue(q.GetValue()),
  270. Timestamp: timestamp,
  271. })
  272. }
  273. lset := make(model.LabelSet, len(m.Label)+1)
  274. for _, p := range m.Label {
  275. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  276. }
  277. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
  278. samples = append(samples, &model.Sample{
  279. Metric: model.Metric(lset),
  280. Value: model.SampleValue(m.Summary.GetSampleSum()),
  281. Timestamp: timestamp,
  282. })
  283. lset = make(model.LabelSet, len(m.Label)+1)
  284. for _, p := range m.Label {
  285. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  286. }
  287. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
  288. samples = append(samples, &model.Sample{
  289. Metric: model.Metric(lset),
  290. Value: model.SampleValue(m.Summary.GetSampleCount()),
  291. Timestamp: timestamp,
  292. })
  293. }
  294. return samples
  295. }
  296. func extractHistogram(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  297. samples := make(model.Vector, 0, len(f.Metric))
  298. for _, m := range f.Metric {
  299. if m.Histogram == nil {
  300. continue
  301. }
  302. timestamp := o.Timestamp
  303. if m.TimestampMs != nil {
  304. timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  305. }
  306. infSeen := false
  307. for _, q := range m.Histogram.Bucket {
  308. lset := make(model.LabelSet, len(m.Label)+2)
  309. for _, p := range m.Label {
  310. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  311. }
  312. lset[model.LabelName(model.BucketLabel)] = model.LabelValue(fmt.Sprint(q.GetUpperBound()))
  313. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
  314. if math.IsInf(q.GetUpperBound(), +1) {
  315. infSeen = true
  316. }
  317. samples = append(samples, &model.Sample{
  318. Metric: model.Metric(lset),
  319. Value: model.SampleValue(q.GetCumulativeCount()),
  320. Timestamp: timestamp,
  321. })
  322. }
  323. lset := make(model.LabelSet, len(m.Label)+1)
  324. for _, p := range m.Label {
  325. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  326. }
  327. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
  328. samples = append(samples, &model.Sample{
  329. Metric: model.Metric(lset),
  330. Value: model.SampleValue(m.Histogram.GetSampleSum()),
  331. Timestamp: timestamp,
  332. })
  333. lset = make(model.LabelSet, len(m.Label)+1)
  334. for _, p := range m.Label {
  335. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  336. }
  337. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
  338. count := &model.Sample{
  339. Metric: model.Metric(lset),
  340. Value: model.SampleValue(m.Histogram.GetSampleCount()),
  341. Timestamp: timestamp,
  342. }
  343. samples = append(samples, count)
  344. if !infSeen {
  345. // Append an infinity bucket sample.
  346. lset := make(model.LabelSet, len(m.Label)+2)
  347. for _, p := range m.Label {
  348. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  349. }
  350. lset[model.LabelName(model.BucketLabel)] = model.LabelValue("+Inf")
  351. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
  352. samples = append(samples, &model.Sample{
  353. Metric: model.Metric(lset),
  354. Value: count.Value,
  355. Timestamp: timestamp,
  356. })
  357. }
  358. }
  359. return samples
  360. }