decode.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. // Copyright 2015 The Prometheus Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package expfmt
  14. import (
  15. "fmt"
  16. "io"
  17. "math"
  18. "mime"
  19. "net/http"
  20. dto "github.com/prometheus/client_model/go"
  21. "github.com/matttproud/golang_protobuf_extensions/pbutil"
  22. "github.com/prometheus/common/model"
  23. )
  24. // Decoder types decode an input stream into metric families.
  25. type Decoder interface {
  26. Decode(*dto.MetricFamily) error
  27. }
  28. type DecodeOptions struct {
  29. // Timestamp is added to each value from the stream that has no explicit timestamp set.
  30. Timestamp model.Time
  31. }
  32. // ResponseFormat extracts the correct format from a HTTP response header.
  33. // If no matching format can be found FormatUnknown is returned.
  34. func ResponseFormat(h http.Header) Format {
  35. ct := h.Get(hdrContentType)
  36. mediatype, params, err := mime.ParseMediaType(ct)
  37. if err != nil {
  38. return FmtUnknown
  39. }
  40. const textType = "text/plain"
  41. switch mediatype {
  42. case ProtoType:
  43. if p, ok := params["proto"]; ok && p != ProtoProtocol {
  44. return FmtUnknown
  45. }
  46. if e, ok := params["encoding"]; ok && e != "delimited" {
  47. return FmtUnknown
  48. }
  49. return FmtProtoDelim
  50. case textType:
  51. if v, ok := params["version"]; ok && v != TextVersion {
  52. return FmtUnknown
  53. }
  54. return FmtText
  55. }
  56. return FmtUnknown
  57. }
  58. // NewDecoder returns a new decoder based on the given input format.
  59. // If the input format does not imply otherwise, a text format decoder is returned.
  60. func NewDecoder(r io.Reader, format Format) Decoder {
  61. switch format {
  62. case FmtProtoDelim:
  63. return &protoDecoder{r: r}
  64. }
  65. return &textDecoder{r: r}
  66. }
  67. // protoDecoder implements the Decoder interface for protocol buffers.
  68. type protoDecoder struct {
  69. r io.Reader
  70. }
  71. // Decode implements the Decoder interface.
  72. func (d *protoDecoder) Decode(v *dto.MetricFamily) error {
  73. _, err := pbutil.ReadDelimited(d.r, v)
  74. if err != nil {
  75. return err
  76. }
  77. if !model.IsValidMetricName(model.LabelValue(v.GetName())) {
  78. return fmt.Errorf("invalid metric name %q", v.GetName())
  79. }
  80. for _, m := range v.GetMetric() {
  81. if m == nil {
  82. continue
  83. }
  84. for _, l := range m.GetLabel() {
  85. if l == nil {
  86. continue
  87. }
  88. if !model.LabelValue(l.GetValue()).IsValid() {
  89. return fmt.Errorf("invalid label value %q", l.GetValue())
  90. }
  91. if !model.LabelName(l.GetName()).IsValid() {
  92. return fmt.Errorf("invalid label name %q", l.GetName())
  93. }
  94. }
  95. }
  96. return nil
  97. }
  98. // textDecoder implements the Decoder interface for the text protocol.
  99. type textDecoder struct {
  100. r io.Reader
  101. p TextParser
  102. fams []*dto.MetricFamily
  103. }
  104. // Decode implements the Decoder interface.
  105. func (d *textDecoder) Decode(v *dto.MetricFamily) error {
  106. // TODO(fabxc): Wrap this as a line reader to make streaming safer.
  107. if len(d.fams) == 0 {
  108. // No cached metric families, read everything and parse metrics.
  109. fams, err := d.p.TextToMetricFamilies(d.r)
  110. if err != nil {
  111. return err
  112. }
  113. if len(fams) == 0 {
  114. return io.EOF
  115. }
  116. d.fams = make([]*dto.MetricFamily, 0, len(fams))
  117. for _, f := range fams {
  118. d.fams = append(d.fams, f)
  119. }
  120. }
  121. *v = *d.fams[0]
  122. d.fams = d.fams[1:]
  123. return nil
  124. }
  125. type SampleDecoder struct {
  126. Dec Decoder
  127. Opts *DecodeOptions
  128. f dto.MetricFamily
  129. }
  130. func (sd *SampleDecoder) Decode(s *model.Vector) error {
  131. if err := sd.Dec.Decode(&sd.f); err != nil {
  132. return err
  133. }
  134. *s = extractSamples(&sd.f, sd.Opts)
  135. return nil
  136. }
  137. // Extract samples builds a slice of samples from the provided metric families.
  138. func ExtractSamples(o *DecodeOptions, fams ...*dto.MetricFamily) model.Vector {
  139. var all model.Vector
  140. for _, f := range fams {
  141. all = append(all, extractSamples(f, o)...)
  142. }
  143. return all
  144. }
  145. func extractSamples(f *dto.MetricFamily, o *DecodeOptions) model.Vector {
  146. switch f.GetType() {
  147. case dto.MetricType_COUNTER:
  148. return extractCounter(o, f)
  149. case dto.MetricType_GAUGE:
  150. return extractGauge(o, f)
  151. case dto.MetricType_SUMMARY:
  152. return extractSummary(o, f)
  153. case dto.MetricType_UNTYPED:
  154. return extractUntyped(o, f)
  155. case dto.MetricType_HISTOGRAM:
  156. return extractHistogram(o, f)
  157. }
  158. panic("expfmt.extractSamples: unknown metric family type")
  159. }
  160. func extractCounter(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  161. samples := make(model.Vector, 0, len(f.Metric))
  162. for _, m := range f.Metric {
  163. if m.Counter == nil {
  164. continue
  165. }
  166. lset := make(model.LabelSet, len(m.Label)+1)
  167. for _, p := range m.Label {
  168. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  169. }
  170. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  171. smpl := &model.Sample{
  172. Metric: model.Metric(lset),
  173. Value: model.SampleValue(m.Counter.GetValue()),
  174. }
  175. if m.TimestampMs != nil {
  176. smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  177. } else {
  178. smpl.Timestamp = o.Timestamp
  179. }
  180. samples = append(samples, smpl)
  181. }
  182. return samples
  183. }
  184. func extractGauge(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  185. samples := make(model.Vector, 0, len(f.Metric))
  186. for _, m := range f.Metric {
  187. if m.Gauge == nil {
  188. continue
  189. }
  190. lset := make(model.LabelSet, len(m.Label)+1)
  191. for _, p := range m.Label {
  192. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  193. }
  194. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  195. smpl := &model.Sample{
  196. Metric: model.Metric(lset),
  197. Value: model.SampleValue(m.Gauge.GetValue()),
  198. }
  199. if m.TimestampMs != nil {
  200. smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  201. } else {
  202. smpl.Timestamp = o.Timestamp
  203. }
  204. samples = append(samples, smpl)
  205. }
  206. return samples
  207. }
  208. func extractUntyped(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  209. samples := make(model.Vector, 0, len(f.Metric))
  210. for _, m := range f.Metric {
  211. if m.Untyped == nil {
  212. continue
  213. }
  214. lset := make(model.LabelSet, len(m.Label)+1)
  215. for _, p := range m.Label {
  216. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  217. }
  218. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  219. smpl := &model.Sample{
  220. Metric: model.Metric(lset),
  221. Value: model.SampleValue(m.Untyped.GetValue()),
  222. }
  223. if m.TimestampMs != nil {
  224. smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  225. } else {
  226. smpl.Timestamp = o.Timestamp
  227. }
  228. samples = append(samples, smpl)
  229. }
  230. return samples
  231. }
  232. func extractSummary(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  233. samples := make(model.Vector, 0, len(f.Metric))
  234. for _, m := range f.Metric {
  235. if m.Summary == nil {
  236. continue
  237. }
  238. timestamp := o.Timestamp
  239. if m.TimestampMs != nil {
  240. timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  241. }
  242. for _, q := range m.Summary.Quantile {
  243. lset := make(model.LabelSet, len(m.Label)+2)
  244. for _, p := range m.Label {
  245. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  246. }
  247. // BUG(matt): Update other names to "quantile".
  248. lset[model.LabelName(model.QuantileLabel)] = model.LabelValue(fmt.Sprint(q.GetQuantile()))
  249. lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
  250. samples = append(samples, &model.Sample{
  251. Metric: model.Metric(lset),
  252. Value: model.SampleValue(q.GetValue()),
  253. Timestamp: timestamp,
  254. })
  255. }
  256. lset := make(model.LabelSet, len(m.Label)+1)
  257. for _, p := range m.Label {
  258. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  259. }
  260. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
  261. samples = append(samples, &model.Sample{
  262. Metric: model.Metric(lset),
  263. Value: model.SampleValue(m.Summary.GetSampleSum()),
  264. Timestamp: timestamp,
  265. })
  266. lset = make(model.LabelSet, len(m.Label)+1)
  267. for _, p := range m.Label {
  268. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  269. }
  270. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
  271. samples = append(samples, &model.Sample{
  272. Metric: model.Metric(lset),
  273. Value: model.SampleValue(m.Summary.GetSampleCount()),
  274. Timestamp: timestamp,
  275. })
  276. }
  277. return samples
  278. }
  279. func extractHistogram(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
  280. samples := make(model.Vector, 0, len(f.Metric))
  281. for _, m := range f.Metric {
  282. if m.Histogram == nil {
  283. continue
  284. }
  285. timestamp := o.Timestamp
  286. if m.TimestampMs != nil {
  287. timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
  288. }
  289. infSeen := false
  290. for _, q := range m.Histogram.Bucket {
  291. lset := make(model.LabelSet, len(m.Label)+2)
  292. for _, p := range m.Label {
  293. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  294. }
  295. lset[model.LabelName(model.BucketLabel)] = model.LabelValue(fmt.Sprint(q.GetUpperBound()))
  296. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
  297. if math.IsInf(q.GetUpperBound(), +1) {
  298. infSeen = true
  299. }
  300. samples = append(samples, &model.Sample{
  301. Metric: model.Metric(lset),
  302. Value: model.SampleValue(q.GetCumulativeCount()),
  303. Timestamp: timestamp,
  304. })
  305. }
  306. lset := make(model.LabelSet, len(m.Label)+1)
  307. for _, p := range m.Label {
  308. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  309. }
  310. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
  311. samples = append(samples, &model.Sample{
  312. Metric: model.Metric(lset),
  313. Value: model.SampleValue(m.Histogram.GetSampleSum()),
  314. Timestamp: timestamp,
  315. })
  316. lset = make(model.LabelSet, len(m.Label)+1)
  317. for _, p := range m.Label {
  318. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  319. }
  320. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
  321. count := &model.Sample{
  322. Metric: model.Metric(lset),
  323. Value: model.SampleValue(m.Histogram.GetSampleCount()),
  324. Timestamp: timestamp,
  325. }
  326. samples = append(samples, count)
  327. if !infSeen {
  328. // Append an infinity bucket sample.
  329. lset := make(model.LabelSet, len(m.Label)+2)
  330. for _, p := range m.Label {
  331. lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
  332. }
  333. lset[model.LabelName(model.BucketLabel)] = model.LabelValue("+Inf")
  334. lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
  335. samples = append(samples, &model.Sample{
  336. Metric: model.Metric(lset),
  337. Value: count.Value,
  338. Timestamp: timestamp,
  339. })
  340. }
  341. }
  342. return samples
  343. }