decode.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. // Copyright 2015 The Prometheus Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package expfmt
  14. import (
  15. "fmt"
  16. "io"
  17. "math"
  18. "mime"
  19. "net/http"
  20. dto "github.com/prometheus/client_model/go"
  21. "github.com/matttproud/golang_protobuf_extensions/pbutil"
  22. "github.com/prometheus/common/model"
  23. )
  24. // Decoder types decode an input stream into metric families.
  25. type Decoder interface {
  26. Decode(*dto.MetricFamily) error
  27. }
  28. // DecodeOptions contains options used by the Decoder and in sample extraction.
  29. type DecodeOptions struct {
  30. // Timestamp is added to each value from the stream that has no explicit timestamp set.
  31. Timestamp model.Time
  32. }
  33. // ResponseFormat extracts the correct format from a HTTP response header.
  34. // If no matching format can be found FormatUnknown is returned.
  35. func ResponseFormat(h http.Header) Format {
  36. ct := h.Get(hdrContentType)
  37. mediatype, params, err := mime.ParseMediaType(ct)
  38. if err != nil {
  39. return FmtUnknown
  40. }
  41. const textType = "text/plain"
  42. switch mediatype {
  43. case ProtoType:
  44. if p, ok := params["proto"]; ok && p != ProtoProtocol {
  45. return FmtUnknown
  46. }
  47. if e, ok := params["encoding"]; ok && e != "delimited" {
  48. return FmtUnknown
  49. }
  50. return FmtProtoDelim
  51. case textType:
  52. if v, ok := params["version"]; ok && v != TextVersion {
  53. return FmtUnknown
  54. }
  55. return FmtText
  56. }
  57. return FmtUnknown
  58. }
  59. // NewDecoder returns a new decoder based on the given input format.
  60. // If the input format does not imply otherwise, a text format decoder is returned.
  61. func NewDecoder(r io.Reader, format Format) Decoder {
  62. switch format {
  63. case FmtProtoDelim:
  64. return &protoDecoder{r: r}
  65. }
  66. return &textDecoder{r: r}
  67. }
  68. // protoDecoder implements the Decoder interface for protocol buffers.
  69. type protoDecoder struct {
  70. r io.Reader
  71. }
  72. // Decode implements the Decoder interface.
  73. func (d *protoDecoder) Decode(v *dto.MetricFamily) error {
  74. _, err := pbutil.ReadDelimited(d.r, v)
  75. if err != nil {
  76. return err
  77. }
  78. if !model.IsValidMetricName(model.LabelValue(v.GetName())) {
  79. return fmt.Errorf("invalid metric name %q", v.GetName())
  80. }
  81. for _, m := range v.GetMetric() {
  82. if m == nil {
  83. continue
  84. }
  85. for _, l := range m.GetLabel() {
  86. if l == nil {
  87. continue
  88. }
  89. if !model.LabelValue(l.GetValue()).IsValid() {
  90. return fmt.Errorf("invalid label value %q", l.GetValue())
  91. }
  92. if !model.LabelName(l.GetName()).IsValid() {
  93. return fmt.Errorf("invalid label name %q", l.GetName())
  94. }
  95. }
  96. }
  97. return nil
  98. }
  99. // textDecoder implements the Decoder interface for the text protocol.
  100. type textDecoder struct {
  101. r io.Reader
  102. fams map[string]*dto.MetricFamily
  103. err error
  104. }
  105. // Decode implements the Decoder interface.
  106. func (d *textDecoder) Decode(v *dto.MetricFamily) error {
  107. if d.err == nil {
  108. // Read all metrics in one shot.
  109. var p TextParser
  110. d.fams, d.err = p.TextToMetricFamilies(d.r)
  111. // If we don't get an error, store io.EOF for the end.
  112. if d.err == nil {
  113. d.err = io.EOF
  114. }
  115. }
  116. // Pick off one MetricFamily per Decode until there's nothing left.
  117. for key, fam := range d.fams {
  118. *v = *fam
  119. delete(d.fams, key)
  120. return nil
  121. }
  122. return d.err
  123. }
  124. // SampleDecoder wraps a Decoder to extract samples from the metric families
  125. // decoded by the wrapped Decoder.
  126. type SampleDecoder struct {
  127. Dec Decoder
  128. Opts *DecodeOptions
  129. f dto.MetricFamily
  130. }
  131. // Decode calls the Decode method of the wrapped Decoder and then extracts the
  132. // samples from the decoded MetricFamily into the provided model.Vector.
  133. func (sd *SampleDecoder) Decode(s *model.Vector) error {
  134. err := sd.Dec.Decode(&sd.f)
  135. if err != nil {
  136. return err
  137. }
  138. *s, err = extractSamples(&sd.f, sd.Opts)
  139. return err
  140. }
  141. // ExtractSamples builds a slice of samples from the provided metric
  142. // families. If an error occurs during sample extraction, it continues to
  143. // extract from the remaining metric families. The returned error is the last
  144. // error that has occurred.
  145. func ExtractSamples(o *DecodeOptions, fams ...*dto.MetricFamily) (model.Vector, error) {
  146. var (
  147. all model.Vector
  148. lastErr error
  149. )
  150. for _, f := range fams {
  151. some, err := extractSamples(f, o)
  152. if err != nil {
  153. lastErr = err
  154. continue
  155. }
  156. all = append(all, some...)
  157. }
  158. return all, lastErr
  159. }
  160. func extractSamples(f *dto.MetricFamily, o *DecodeOptions) (model.Vector, error) {
  161. switch f.GetType() {
  162. case dto.MetricType_COUNTER:
  163. return extractCounter(o, f), nil
  164. case dto.MetricType_GAUGE:
  165. return extractGauge(o, f), nil
  166. case dto.MetricType_SUMMARY:
  167. return extractSummary(o, f), nil
  168. case dto.MetricType_UNTYPED:
  169. return extractUntyped(o, f), nil
  170. case dto.MetricType_HISTOGRAM:
  171. return extractHistogram(o, f), nil
  172. }
  173. return nil, fmt.Errorf("expfmt.extractSamples: unknown metric family type %v", f.GetType())
  174. }
  175. func extractCounter(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  176. samples := make(model.Vector, 0, len(f.Metric))
  177. for _, m := range f.Metric {
  178. if m.Counter == nil {
  179. continue
  180. }
  181. lset := make(model.LabelSet, len(m.Label)+1)
  182. for _, p := range m.Label {
  183. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  184. }
  185. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  186. smpl := &model.Sample{
  187. Metric: model.Metric(lset),
  188. Value: model.SampleValue(m.Counter.GetValue()),
  189. }
  190. if m.TimestampMs != nil {
  191. smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  192. } else {
  193. smpl.Timestamp = o.Timestamp
  194. }
  195. samples = append(samples, smpl)
  196. }
  197. return samples
  198. }
  199. func extractGauge(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  200. samples := make(model.Vector, 0, len(f.Metric))
  201. for _, m := range f.Metric {
  202. if m.Gauge == nil {
  203. continue
  204. }
  205. lset := make(model.LabelSet, len(m.Label)+1)
  206. for _, p := range m.Label {
  207. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  208. }
  209. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  210. smpl := &model.Sample{
  211. Metric: model.Metric(lset),
  212. Value: model.SampleValue(m.Gauge.GetValue()),
  213. }
  214. if m.TimestampMs != nil {
  215. smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  216. } else {
  217. smpl.Timestamp = o.Timestamp
  218. }
  219. samples = append(samples, smpl)
  220. }
  221. return samples
  222. }
  223. func extractUntyped(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  224. samples := make(model.Vector, 0, len(f.Metric))
  225. for _, m := range f.Metric {
  226. if m.Untyped == nil {
  227. continue
  228. }
  229. lset := make(model.LabelSet, len(m.Label)+1)
  230. for _, p := range m.Label {
  231. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  232. }
  233. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  234. smpl := &model.Sample{
  235. Metric: model.Metric(lset),
  236. Value: model.SampleValue(m.Untyped.GetValue()),
  237. }
  238. if m.TimestampMs != nil {
  239. smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  240. } else {
  241. smpl.Timestamp = o.Timestamp
  242. }
  243. samples = append(samples, smpl)
  244. }
  245. return samples
  246. }
  247. func extractSummary(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  248. samples := make(model.Vector, 0, len(f.Metric))
  249. for _, m := range f.Metric {
  250. if m.Summary == nil {
  251. continue
  252. }
  253. timestamp := o.Timestamp
  254. if m.TimestampMs != nil {
  255. timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  256. }
  257. for _, q := range m.Summary.Quantile {
  258. lset := make(model.LabelSet, len(m.Label)+2)
  259. for _, p := range m.Label {
  260. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  261. }
  262. // BUG(matt): Update other names to "quantile".
  263. lset[model.LabelName(model.QuantileLabel)] = model.LabelValue(fmt.Sprint(q.GetQuantile()))
  264. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  265. samples = append(samples, &model.Sample{
  266. Metric: model.Metric(lset),
  267. Value: model.SampleValue(q.GetValue()),
  268. Timestamp: timestamp,
  269. })
  270. }
  271. lset := make(model.LabelSet, len(m.Label)+1)
  272. for _, p := range m.Label {
  273. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  274. }
  275. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
  276. samples = append(samples, &model.Sample{
  277. Metric: model.Metric(lset),
  278. Value: model.SampleValue(m.Summary.GetSampleSum()),
  279. Timestamp: timestamp,
  280. })
  281. lset = make(model.LabelSet, len(m.Label)+1)
  282. for _, p := range m.Label {
  283. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  284. }
  285. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
  286. samples = append(samples, &model.Sample{
  287. Metric: model.Metric(lset),
  288. Value: model.SampleValue(m.Summary.GetSampleCount()),
  289. Timestamp: timestamp,
  290. })
  291. }
  292. return samples
  293. }
  294. func extractHistogram(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  295. samples := make(model.Vector, 0, len(f.Metric))
  296. for _, m := range f.Metric {
  297. if m.Histogram == nil {
  298. continue
  299. }
  300. timestamp := o.Timestamp
  301. if m.TimestampMs != nil {
  302. timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  303. }
  304. infSeen := false
  305. for _, q := range m.Histogram.Bucket {
  306. lset := make(model.LabelSet, len(m.Label)+2)
  307. for _, p := range m.Label {
  308. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  309. }
  310. lset[model.LabelName(model.BucketLabel)] = model.LabelValue(fmt.Sprint(q.GetUpperBound()))
  311. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
  312. if math.IsInf(q.GetUpperBound(), +1) {
  313. infSeen = true
  314. }
  315. samples = append(samples, &model.Sample{
  316. Metric: model.Metric(lset),
  317. Value: model.SampleValue(q.GetCumulativeCount()),
  318. Timestamp: timestamp,
  319. })
  320. }
  321. lset := make(model.LabelSet, len(m.Label)+1)
  322. for _, p := range m.Label {
  323. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  324. }
  325. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
  326. samples = append(samples, &model.Sample{
  327. Metric: model.Metric(lset),
  328. Value: model.SampleValue(m.Histogram.GetSampleSum()),
  329. Timestamp: timestamp,
  330. })
  331. lset = make(model.LabelSet, len(m.Label)+1)
  332. for _, p := range m.Label {
  333. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  334. }
  335. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
  336. count := &model.Sample{
  337. Metric: model.Metric(lset),
  338. Value: model.SampleValue(m.Histogram.GetSampleCount()),
  339. Timestamp: timestamp,
  340. }
  341. samples = append(samples, count)
  342. if !infSeen {
  343. // Append an infinity bucket sample.
  344. lset := make(model.LabelSet, len(m.Label)+2)
  345. for _, p := range m.Label {
  346. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  347. }
  348. lset[model.LabelName(model.BucketLabel)] = model.LabelValue("+Inf")
  349. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
  350. samples = append(samples, &model.Sample{
  351. Metric: model.Metric(lset),
  352. Value: count.Value,
  353. Timestamp: timestamp,
  354. })
  355. }
  356. }
  357. return samples
  358. }