diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index 62c795d998..56a630643e 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -242,6 +242,15 @@
         <dependency>
             <groupId>io.prometheus</groupId>
             <artifactId>simpleclient_common</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient_pushgateway</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>1.1.9.1</version>
+        </dependency>
         <dependency>
             <groupId>org.codehaus.groovy</groupId>
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
index e1217197e0..0b6e116c14 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
@@ -43,7 +43,7 @@ public class ClustersProperties {
KsqldbServerAuth ksqldbServerAuth;
KeystoreConfig ksqldbServerSsl;
List<ConnectCluster> kafkaConnect;
- MetricsConfigData metrics;
+ MetricsConfig metrics;
Map<String, Object> properties;
boolean readOnly = false;
List<SerdeConfig> serde;
@@ -66,7 +66,7 @@ public class ClustersProperties {
@Data
@ToString(exclude = {"password", "keystorePassword"})
- public static class MetricsConfigData {
+ public static class MetricsConfig {
String type;
Integer port;
Boolean ssl;
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java
index b2f1f945a3..8811ef29b4 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java
@@ -12,13 +12,16 @@ import com.provectus.kafka.ui.service.metrics.scrape.inferred.InferredMetricsScr
import com.provectus.kafka.ui.service.metrics.scrape.jmx.JmxMetricsRetriever;
import com.provectus.kafka.ui.service.metrics.scrape.jmx.JmxMetricsScraper;
import com.provectus.kafka.ui.service.metrics.scrape.prometheus.PrometheusScraper;
+import com.provectus.kafka.ui.service.metrics.sink.MetricsSink;
import jakarta.annotation.Nullable;
import java.util.Collection;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.Node;
import reactor.core.publisher.Mono;

+@Slf4j
@RequiredArgsConstructor
public class MetricsScrapping {
@@ -30,10 +33,13 @@ public class MetricsScrapping {
@Nullable
private final PrometheusScraper prometheusScraper;
+ private final MetricsSink sink;
+
public static MetricsScrapping create(Cluster cluster,
JmxMetricsRetriever jmxMetricsRetriever) {
JmxMetricsScraper jmxMetricsScraper = null;
PrometheusScraper prometheusScraper = null;
+ MetricsSink sink = MetricsSink.noop();
var metrics = cluster.getMetrics();
if (cluster.getMetrics() != null) {
@@ -43,8 +49,14 @@ public class MetricsScrapping {
} else if (metrics.getType().equalsIgnoreCase(PROMETHEUS_METRICS_TYPE)) {
prometheusScraper = new PrometheusScraper(scrapeProperties);
}
+ sink = MetricsSink.create(cluster.getMetrics());
}
- return new MetricsScrapping(new InferredMetricsScraper(), jmxMetricsScraper, prometheusScraper);
+ return new MetricsScrapping(
+ new InferredMetricsScraper(),
+ jmxMetricsScraper,
+ prometheusScraper,
+ sink
+ );
}
private static MetricsScrapeProperties createScrapeProps(Cluster cluster) {
@@ -73,6 +85,13 @@ public class MetricsScrapping {
.ioRates(ext.ioRates())
.perBrokerScrapedMetrics(ext.perBrokerMetrics())
.build()
+ ).flatMap(metrics ->
+ sink.send(metrics.getSummarizedMetrics())
+ .onErrorResume(th -> {
+ log.warn("Error sending metrics to metrics sink", th);
+ return Mono.empty();
+ }
+ ).thenReturn(metrics)
);
}
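Note: in the `flatMap` above, sink failures are logged and swallowed (`onErrorResume` → `Mono.empty()`), so a misbehaving sink can never fail the scrape itself; `thenReturn(metrics)` always propagates the scraped result. A minimal self-contained sketch of that pattern (names are ad hoc, only Reactor is assumed):

```java
import java.util.stream.Stream;
import reactor.core.publisher.Mono;

class SinkErrorIsolationDemo {
  // Stand-in for MetricsSink.send(): always fails.
  static Mono<Void> send(Stream<String> metrics) {
    return Mono.error(new IllegalStateException("sink is down"));
  }

  public static void main(String[] args) {
    Mono<String> scraped = Mono.just("scraped-metrics");
    Mono<String> result = scraped.flatMap(metrics ->
        send(Stream.of(metrics))
            .onErrorResume(th -> Mono.empty()) // swallow the sink failure
            .thenReturn(metrics));             // scrape result survives
    System.out.println(result.block());        // prints "scraped-metrics"
  }
}
```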
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java
new file mode 100644
index 0000000000..841787b9a1
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java
@@ -0,0 +1,22 @@
+package com.provectus.kafka.ui.service.metrics.sink;
+
+import static io.prometheus.client.Collector.MetricFamilySamples;
+
+import com.provectus.kafka.ui.config.ClustersProperties;
+import java.util.stream.Stream;
+import reactor.core.publisher.Mono;
+
+public interface MetricsSink {
+
+ static MetricsSink noop() {
+ return m -> Mono.empty();
+ }
+
+ static MetricsSink create(ClustersProperties.MetricsConfig metricsConfig) {
+ //TODO: select PrometheusPushGatewaySink or PrometheusRemoteWriteSink based on config
+ return noop();
+ }
+
+ Mono<Void> send(Stream<MetricFamilySamples> metrics);
+
+}
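`create()` is stubbed to `noop()` for now. One possible dispatch, sketched under assumptions: every getter used below (`getPushGatewayUrl`, `getPushGatewayJobName`, `getUsername`, `getPassword`, `getRemoteWriteUrl`) is hypothetical and not part of the `MetricsConfig` visible in this diff:

```java
// Sketch only: all MetricsConfig getters below are assumed, not yet real fields.
static MetricsSink create(ClustersProperties.MetricsConfig cfg) {
  if (cfg.getPushGatewayUrl() != null) {          // hypothetical field
    return PrometheusPushGatewaySink.create(
        cfg.getPushGatewayUrl(),
        cfg.getPushGatewayJobName(),              // hypothetical field
        cfg.getUsername(),                        // hypothetical field
        cfg.getPassword());                       // hypothetical field
  }
  if (cfg.getRemoteWriteUrl() != null) {          // hypothetical field
    return new PrometheusRemoteWriteSink(cfg.getRemoteWriteUrl());
  }
  return noop();
}
```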
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusPushGatewaySink.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusPushGatewaySink.java
new file mode 100644
index 0000000000..ae50b5e8a7
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusPushGatewaySink.java
@@ -0,0 +1,64 @@
+package com.provectus.kafka.ui.service.metrics.sink;
+
+import static io.prometheus.client.Collector.MetricFamilySamples;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.exporter.BasicAuthHttpConnectionFactory;
+import io.prometheus.client.exporter.PushGateway;
+import jakarta.annotation.Nullable;
+import java.net.URL;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import org.springframework.util.StringUtils;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+@RequiredArgsConstructor
+class PrometheusPushGatewaySink implements MetricsSink {
+
+ private static final String DEFAULT_PGW_JOBNAME = "kafkaui";
+
+ private final PushGateway pushGateway;
+ private final String job;
+ //TODO: read about grouping rules
+
+ @SneakyThrows
+ static PrometheusPushGatewaySink create(String url,
+ @Nullable String job,
+ @Nullable String username,
+ @Nullable String passw) {
+ var pushGateway = new PushGateway(new URL(url));
+ if (StringUtils.hasText(username) && StringUtils.hasText(passw)) {
+ pushGateway.setConnectionFactory(new BasicAuthHttpConnectionFactory(username, passw));
+ }
+ return new PrometheusPushGatewaySink(
+ pushGateway,
+ Optional.ofNullable(job).orElse(DEFAULT_PGW_JOBNAME)
+ );
+ }
+
+ @Override
+ public Mono<Void> send(Stream<MetricFamilySamples> metrics) {
+ return Mono.fromRunnable(() -> pushSync(metrics.toList()))
+ .subscribeOn(Schedulers.boundedElastic());
+ }
+
+ @SneakyThrows
+ private void pushSync(List<MetricFamilySamples> metricsToPush) {
+ if (metricsToPush.isEmpty()) {
+ return;
+ }
+ pushGateway.push(
+ new Collector() {
+ @Override
+ public List<MetricFamilySamples> collect() {
+ return metricsToPush;
+ }
+ },
+ job
+ );
+ }
+}
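A usage sketch for the sink (it is package-private, so this assumes code in the same package; the Pushgateway URL is a placeholder). `MetricFamilySamples` is built with the stock simpleclient constructors:

```java
import io.prometheus.client.Collector;
import java.util.List;
import java.util.stream.Stream;

class PushGatewaySinkDemo {
  public static void main(String[] args) {
    // null job/credentials: falls back to the "kafkaui" job, no basic auth
    var sink = PrometheusPushGatewaySink.create("http://localhost:9091", null, null, null);
    var gauge = new Collector.MetricFamilySamples(
        "kafka_broker_count", Collector.Type.GAUGE, "Number of brokers",
        List.of(new Collector.MetricFamilySamples.Sample(
            "kafka_broker_count", List.of("cluster"), List.of("local"), 3.0)));
    sink.send(Stream.of(gauge)).block(); // push runs on boundedElastic; block() waits for it
  }
}
```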
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusRemoteWriteSink.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusRemoteWriteSink.java
new file mode 100644
index 0000000000..bf767de891
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusRemoteWriteSink.java
@@ -0,0 +1,116 @@
+package com.provectus.kafka.ui.service.metrics.sink;
+
+import static io.prometheus.client.Collector.*;
+import static prometheus.Types.*;
+
+import com.google.common.base.Enums;
+import com.provectus.kafka.ui.util.WebClientConfigurator;
+import io.prometheus.client.Collector;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import org.springframework.http.MediaType;
+import org.springframework.util.unit.DataSize;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.xerial.snappy.Snappy;
+import prometheus.Remote;
+import prometheus.Types;
+import prometheus.Types.MetricMetadata.MetricType;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+@RequiredArgsConstructor
+class PrometheusRemoteWriteSink implements MetricsSink {
+
+ private final WebClient webClient;
+ private final String writeEndpoint;
+
+ PrometheusRemoteWriteSink(String prometheusUrl) {
+ this.writeEndpoint = prometheusUrl + "/api/v1/write";
+ this.webClient = new WebClientConfigurator().configureBufferSize(DataSize.ofMegabytes(20)).build();
+ }
+
+ @SneakyThrows
+ @Override
+ public Mono<Void> send(Stream<MetricFamilySamples> metrics) {
+ byte[] bytesToWrite = Snappy.compress(createWriteRequest(metrics).toByteArray());
+ return webClient.post()
+ .uri(writeEndpoint)
+ .contentType(MediaType.parseMediaType("application/x-protobuf"))
+ .header("User-Agent", "promremote-kui/0.1.0")
+ .header("Content-Encoding", "snappy")
+ .header("X-Prometheus-Remote-Write-Version", "0.1.0")
+ .bodyValue(bytesToWrite)
+ .retrieve()
+ .toBodilessEntity()
+ .then();
+ }
+
+ private Remote.WriteRequest createWriteRequest(Stream<MetricFamilySamples> metrics) {
+ var tsAndMeta = createTimeSeries(metrics);
+ return Remote.WriteRequest.newBuilder()
+ .addAllTimeseries(tsAndMeta.getT1())
+ .addAllMetadata(tsAndMeta.getT2())
+ .build();
+ }
+
+ public Tuple2<List<TimeSeries>, List<MetricMetadata>> createTimeSeries(Stream<MetricFamilySamples> metrics) {
+ long currentTs = System.currentTimeMillis();
+ List<TimeSeries> timeSeriesList = new ArrayList<>();
+ List<MetricMetadata> metadatasList = new ArrayList<>();
+ metrics.forEach(mfs -> {
+ for (MetricFamilySamples.Sample sample : mfs.samples) {
+ TimeSeries.Builder timeSeriesBuilder = TimeSeries.newBuilder();
+ timeSeriesBuilder.addLabels(
+ Label.newBuilder()
+ .setName("__name__")
+ .setValue(escapedLabelValue(sample.name))
+ .build()
+ );
+ for (int i = 0; i < sample.labelNames.size(); i++) {
+ timeSeriesBuilder.addLabels(
+ Label.newBuilder()
+ .setName(sample.labelNames.get(i))
+ .setValue(escapedLabelValue(sample.labelValues.get(i)))
+ .build()
+ );
+ }
+ timeSeriesBuilder.addSamples(
+ Sample.newBuilder()
+ .setValue(sample.value)
+ .setTimestamp(currentTs)
+ .build()
+ );
+ timeSeriesList.add(timeSeriesBuilder.build());
+ metadatasList.add(
+ MetricMetadata.newBuilder()
+ .setType(Enums.getIfPresent(MetricType.class, mfs.type.toString()).or(MetricType.UNKNOWN))
+ .setHelp(mfs.help)
+ .setUnit(mfs.unit)
+ .build()
+ );
+ }
+ });
+ return Tuples.of(timeSeriesList, metadatasList);
+ }
+
+ private static String escapedLabelValue(String s) {
+ //TODO: refactor
+ StringWriter writer = new StringWriter(s.length());
+ for (int i = 0; i < s.length(); i++) {
+ char c = s.charAt(i);
+ switch (c) {
+ case '\\' -> writer.append("\\\\");
+ case '\"' -> writer.append("\\\"");
+ case '\n' -> writer.append("\\n");
+ default -> writer.append(c);
+ }
+ }
+ return writer.toString();
+ }
+
+}
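The payload is a snappy-compressed protobuf `WriteRequest` POSTed with the remote-write headers above. It can be exercised without a live Prometheus by round-tripping locally; a sketch assuming only the generated classes and the snappy-java dependency added in this PR:

```java
import org.xerial.snappy.Snappy;
import prometheus.Remote;
import prometheus.Types;

class RemoteWritePayloadDemo {
  public static void main(String[] args) throws Exception {
    Remote.WriteRequest request = Remote.WriteRequest.newBuilder()
        .addTimeseries(Types.TimeSeries.newBuilder()
            .addLabels(Types.Label.newBuilder().setName("__name__").setValue("up"))
            .addSamples(Types.Sample.newBuilder()
                .setValue(1.0)
                .setTimestamp(System.currentTimeMillis())))
        .build();
    byte[] wire = Snappy.compress(request.toByteArray()); // bytes as sent on the wire
    Remote.WriteRequest decoded =
        Remote.WriteRequest.parseFrom(Snappy.uncompress(wire));
    System.out.println(decoded.getTimeseries(0).getLabels(0).getValue()); // "up"
  }
}
```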
diff --git a/kafka-ui-contract/pom.xml b/kafka-ui-contract/pom.xml
index 0d8e238368..b6b433f431 100644
--- a/kafka-ui-contract/pom.xml
+++ b/kafka-ui-contract/pom.xml
@@ -46,6 +46,11 @@
             <artifactId>javax.annotation-api</artifactId>
             <version>1.3.2</version>
         </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>3.22.4</version>
+        </dependency>
@@ -229,7 +234,40 @@
-        </plugins>
+            <plugin>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>1.7.0</version>
+                <executions>
+                    <execution>
+                        <phase>initialize</phase>
+                        <goals>
+                            <goal>detect</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>0.6.1</version>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <attachProtoSources>false</attachProtoSources>
+                    <protoSourceRoot>${project.basedir}/src/main/resources/proto</protoSourceRoot>
+                    <includes>
+                        <include>**/*.proto</include>
+                    </includes>
+                    <protocArtifact>com.google.protobuf:protoc:3.21.12:exe:${os.detected.classifier}</protocArtifact>
+                </configuration>
+            </plugin>
+        </plugins>
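`os-maven-plugin` detects the platform classifier (`${os.detected.classifier}`) so the matching `protoc` binary can be resolved, and `protobuf-maven-plugin` compiles the vendored protos below during `generate-sources`. Since `remote.proto`/`types.proto` declare `package prometheus` with no `java_package` option, the generated classes land in the `prometheus` Java package (`prometheus.Remote`, `prometheus.Types`) that `PrometheusRemoteWriteSink` imports. A quick smoke-test sketch:

```java
import prometheus.Remote;
import prometheus.Types;

class GeneratedProtoSmokeTest {
  public static void main(String[] args) {
    Types.Label label = Types.Label.newBuilder().setName("job").setValue("kafkaui").build();
    Remote.WriteRequest req = Remote.WriteRequest.newBuilder()
        .addTimeseries(Types.TimeSeries.newBuilder().addLabels(label))
        .build();
    System.out.println(req.getSerializedSize() + " bytes"); // non-zero => codegen wired up
  }
}
```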
diff --git a/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/gogoproto/gogo.proto b/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/gogoproto/gogo.proto
new file mode 100644
index 0000000000..2f0a3c76bd
--- /dev/null
+++ b/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/gogoproto/gogo.proto
@@ -0,0 +1,133 @@
+// Protocol Buffers for Go with Gadgets
+//
+// Copyright (c) 2013, The GoGo Authors. All rights reserved.
+// http://github.com/gogo/protobuf
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto2";
+package gogoproto;
+
+import "google/protobuf/descriptor.proto";
+
+option java_package = "com.google.protobuf";
+option java_outer_classname = "GoGoProtos";
+option go_package = "github.com/gogo/protobuf/gogoproto";
+
+extend google.protobuf.EnumOptions {
+ optional bool goproto_enum_prefix = 62001;
+ optional bool goproto_enum_stringer = 62021;
+ optional bool enum_stringer = 62022;
+ optional string enum_customname = 62023;
+ optional bool enumdecl = 62024;
+}
+
+extend google.protobuf.EnumValueOptions {
+ optional string enumvalue_customname = 66001;
+}
+
+extend google.protobuf.FileOptions {
+ optional bool goproto_getters_all = 63001;
+ optional bool goproto_enum_prefix_all = 63002;
+ optional bool goproto_stringer_all = 63003;
+ optional bool verbose_equal_all = 63004;
+ optional bool face_all = 63005;
+ optional bool gostring_all = 63006;
+ optional bool populate_all = 63007;
+ optional bool stringer_all = 63008;
+ optional bool onlyone_all = 63009;
+
+ optional bool equal_all = 63013;
+ optional bool description_all = 63014;
+ optional bool testgen_all = 63015;
+ optional bool benchgen_all = 63016;
+ optional bool marshaler_all = 63017;
+ optional bool unmarshaler_all = 63018;
+ optional bool stable_marshaler_all = 63019;
+
+ optional bool sizer_all = 63020;
+
+ optional bool goproto_enum_stringer_all = 63021;
+ optional bool enum_stringer_all = 63022;
+
+ optional bool unsafe_marshaler_all = 63023;
+ optional bool unsafe_unmarshaler_all = 63024;
+
+ optional bool goproto_extensions_map_all = 63025;
+ optional bool goproto_unrecognized_all = 63026;
+ optional bool gogoproto_import = 63027;
+ optional bool protosizer_all = 63028;
+ optional bool compare_all = 63029;
+ optional bool typedecl_all = 63030;
+ optional bool enumdecl_all = 63031;
+
+ optional bool goproto_registration = 63032;
+}
+
+extend google.protobuf.MessageOptions {
+ optional bool goproto_getters = 64001;
+ optional bool goproto_stringer = 64003;
+ optional bool verbose_equal = 64004;
+ optional bool face = 64005;
+ optional bool gostring = 64006;
+ optional bool populate = 64007;
+ optional bool stringer = 67008;
+ optional bool onlyone = 64009;
+
+ optional bool equal = 64013;
+ optional bool description = 64014;
+ optional bool testgen = 64015;
+ optional bool benchgen = 64016;
+ optional bool marshaler = 64017;
+ optional bool unmarshaler = 64018;
+ optional bool stable_marshaler = 64019;
+
+ optional bool sizer = 64020;
+
+ optional bool unsafe_marshaler = 64023;
+ optional bool unsafe_unmarshaler = 64024;
+
+ optional bool goproto_extensions_map = 64025;
+ optional bool goproto_unrecognized = 64026;
+
+ optional bool protosizer = 64028;
+ optional bool compare = 64029;
+
+ optional bool typedecl = 64030;
+}
+
+extend google.protobuf.FieldOptions {
+ optional bool nullable = 65001;
+ optional bool embed = 65002;
+ optional string customtype = 65003;
+ optional string customname = 65004;
+ optional string jsontag = 65005;
+ optional string moretags = 65006;
+ optional string casttype = 65007;
+ optional string castkey = 65008;
+ optional string castvalue = 65009;
+
+ optional bool stdtime = 65010;
+ optional bool stdduration = 65011;
+}
diff --git a/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/remote.proto b/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/remote.proto
new file mode 100644
index 0000000000..0c50ce8234
--- /dev/null
+++ b/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/remote.proto
@@ -0,0 +1,88 @@
+// Copyright 2016 Prometheus Team
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+package prometheus;
+
+option go_package = "prompb";
+
+import "types.proto";
+import "gogoproto/gogo.proto";
+
+message WriteRequest {
+ repeated prometheus.TimeSeries timeseries = 1 [(gogoproto.nullable) = false];
+ // Cortex uses this field to determine the source of the write request.
+ // We reserve it to avoid any compatibility issues.
+ reserved 2;
+ repeated prometheus.MetricMetadata metadata = 3 [(gogoproto.nullable) = false];
+}
+
+// ReadRequest represents a remote read request.
+message ReadRequest {
+ repeated Query queries = 1;
+
+ enum ResponseType {
+ // Server will return a single ReadResponse message with matched series that includes list of raw samples.
+ // It's recommended to use streamed response types instead.
+ //
+ // Response headers:
+ // Content-Type: "application/x-protobuf"
+ // Content-Encoding: "snappy"
+ SAMPLES = 0;
+ // Server will stream a delimited ChunkedReadResponse message that
+ // contains XOR or HISTOGRAM(!) encoded chunks for a single series.
+ // Each message is following varint size and fixed size bigendian
+ // uint32 for CRC32 Castagnoli checksum.
+ //
+ // Response headers:
+ // Content-Type: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"
+ // Content-Encoding: ""
+ STREAMED_XOR_CHUNKS = 1;
+ }
+
+ // accepted_response_types allows negotiating the content type of the response.
+ //
+ // Response types are taken from the list in the FIFO order. If no response type in `accepted_response_types` is
+ // implemented by server, error is returned.
+ // For request that do not contain `accepted_response_types` field the SAMPLES response type will be used.
+ repeated ResponseType accepted_response_types = 2;
+}
+
+// ReadResponse is a response when response_type equals SAMPLES.
+message ReadResponse {
+ // In same order as the request's queries.
+ repeated QueryResult results = 1;
+}
+
+message Query {
+ int64 start_timestamp_ms = 1;
+ int64 end_timestamp_ms = 2;
+ repeated prometheus.LabelMatcher matchers = 3;
+ prometheus.ReadHints hints = 4;
+}
+
+message QueryResult {
+ // Samples within a time series must be ordered by time.
+ repeated prometheus.TimeSeries timeseries = 1;
+}
+
+// ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS.
+// We strictly stream full series after series, optionally split by time. This means that a single frame can contain
+// partition of the single series, but once a new series is started to be streamed it means that no more chunks will
+// be sent for previous one. Series are returned sorted in the same way TSDB block are internally.
+message ChunkedReadResponse {
+ repeated prometheus.ChunkedSeries chunked_series = 1;
+
+ // query_index represents an index of the query from ReadRequest.queries these chunks relates to.
+ int64 query_index = 2;
+}
diff --git a/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/types.proto b/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/types.proto
new file mode 100644
index 0000000000..69bec49549
--- /dev/null
+++ b/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/types.proto
@@ -0,0 +1,187 @@
+// Copyright 2017 Prometheus Team
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+package prometheus;
+
+option go_package = "prompb";
+
+import "gogoproto/gogo.proto";
+
+message MetricMetadata {
+ enum MetricType {
+ UNKNOWN = 0;
+ COUNTER = 1;
+ GAUGE = 2;
+ HISTOGRAM = 3;
+ GAUGEHISTOGRAM = 4;
+ SUMMARY = 5;
+ INFO = 6;
+ STATESET = 7;
+ }
+
+ // Represents the metric type, these match the set from Prometheus.
+ // Refer to model/textparse/interface.go for details.
+ MetricType type = 1;
+ string metric_family_name = 2;
+ string help = 4;
+ string unit = 5;
+}
+
+message Sample {
+ double value = 1;
+ // timestamp is in ms format, see model/timestamp/timestamp.go for
+ // conversion from time.Time to Prometheus timestamp.
+ int64 timestamp = 2;
+}
+
+message Exemplar {
+ // Optional, can be empty.
+ repeated Label labels = 1 [(gogoproto.nullable) = false];
+ double value = 2;
+ // timestamp is in ms format, see model/timestamp/timestamp.go for
+ // conversion from time.Time to Prometheus timestamp.
+ int64 timestamp = 3;
+}
+
+// A native histogram, also known as a sparse histogram.
+// Original design doc:
+// https://docs.google.com/document/d/1cLNv3aufPZb3fNfaJgdaRBZsInZKKIHo9E6HinJVbpM/edit
+// The appendix of this design doc also explains the concept of float
+// histograms. This Histogram message can represent both, the usual
+// integer histogram as well as a float histogram.
+message Histogram {
+ enum ResetHint {
+ UNKNOWN = 0; // Need to test for a counter reset explicitly.
+ YES = 1; // This is the 1st histogram after a counter reset.
+ NO = 2; // There was no counter reset between this and the previous Histogram.
+ GAUGE = 3; // This is a gauge histogram where counter resets don't happen.
+ }
+
+ oneof count { // Count of observations in the histogram.
+ uint64 count_int = 1;
+ double count_float = 2;
+ }
+ double sum = 3; // Sum of observations in the histogram.
+ // The schema defines the bucket schema. Currently, valid numbers
+ // are -4 <= n <= 8. They are all for base-2 bucket schemas, where 1
+ // is a bucket boundary in each case, and then each power of two is
+ // divided into 2^n logarithmic buckets. Or in other words, each
+ // bucket boundary is the previous boundary times 2^(2^-n). In the
+ // future, more bucket schemas may be added using numbers < -4 or >
+ // 8.
+ sint32 schema = 4;
+ double zero_threshold = 5; // Breadth of the zero bucket.
+ oneof zero_count { // Count in zero bucket.
+ uint64 zero_count_int = 6;
+ double zero_count_float = 7;
+ }
+
+ // Negative Buckets.
+ repeated BucketSpan negative_spans = 8 [(gogoproto.nullable) = false];
+ // Use either "negative_deltas" or "negative_counts", the former for
+ // regular histograms with integer counts, the latter for float
+ // histograms.
+ repeated sint64 negative_deltas = 9; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
+ repeated double negative_counts = 10; // Absolute count of each bucket.
+
+ // Positive Buckets.
+ repeated BucketSpan positive_spans = 11 [(gogoproto.nullable) = false];
+ // Use either "positive_deltas" or "positive_counts", the former for
+ // regular histograms with integer counts, the latter for float
+ // histograms.
+ repeated sint64 positive_deltas = 12; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
+ repeated double positive_counts = 13; // Absolute count of each bucket.
+
+ ResetHint reset_hint = 14;
+ // timestamp is in ms format, see model/timestamp/timestamp.go for
+ // conversion from time.Time to Prometheus timestamp.
+ int64 timestamp = 15;
+}
+
+// A BucketSpan defines a number of consecutive buckets with their
+// offset. Logically, it would be more straightforward to include the
+// bucket counts in the Span. However, the protobuf representation is
+// more compact in the way the data is structured here (with all the
+// buckets in a single array separate from the Spans).
+message BucketSpan {
+ sint32 offset = 1; // Gap to previous span, or starting point for 1st span (which can be negative).
+ uint32 length = 2; // Length of consecutive buckets.
+}
+
+// TimeSeries represents samples and labels for a single time series.
+message TimeSeries {
+ // For a timeseries to be valid, and for the samples and exemplars
+ // to be ingested by the remote system properly, the labels field is required.
+ repeated Label labels = 1 [(gogoproto.nullable) = false];
+ repeated Sample samples = 2 [(gogoproto.nullable) = false];
+ repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
+ repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
+}
+
+message Label {
+ string name = 1;
+ string value = 2;
+}
+
+message Labels {
+ repeated Label labels = 1 [(gogoproto.nullable) = false];
+}
+
+// Matcher specifies a rule, which can match or set of labels or not.
+message LabelMatcher {
+ enum Type {
+ EQ = 0;
+ NEQ = 1;
+ RE = 2;
+ NRE = 3;
+ }
+ Type type = 1;
+ string name = 2;
+ string value = 3;
+}
+
+message ReadHints {
+ int64 step_ms = 1; // Query step size in milliseconds.
+ string func = 2; // String representation of surrounding function or aggregation.
+ int64 start_ms = 3; // Start time in milliseconds.
+ int64 end_ms = 4; // End time in milliseconds.
+ repeated string grouping = 5; // List of label names used in aggregation.
+ bool by = 6; // Indicate whether it is without or by.
+ int64 range_ms = 7; // Range vector selector range in milliseconds.
+}
+
+// Chunk represents a TSDB chunk.
+// Time range [min, max] is inclusive.
+message Chunk {
+ int64 min_time_ms = 1;
+ int64 max_time_ms = 2;
+
+ // We require this to match chunkenc.Encoding.
+ enum Encoding {
+ UNKNOWN = 0;
+ XOR = 1;
+ HISTOGRAM = 2;
+ FLOAT_HISTOGRAM = 3;
+ }
+ Encoding type = 3;
+ bytes data = 4;
+}
+
+// ChunkedSeries represents single, encoded time series.
+message ChunkedSeries {
+ // Labels should be sorted.
+ repeated Label labels = 1 [(gogoproto.nullable) = false];
+ // Chunks will be in start time order and may overlap.
+ repeated Chunk chunks = 2 [(gogoproto.nullable) = false];
+}