From 0ec59e8cab33a611ca8ba3aae1655f59425787dc Mon Sep 17 00:00:00 2001 From: iliax Date: Thu, 13 Jul 2023 15:19:53 +0400 Subject: [PATCH] wip --- kafka-ui-api/pom.xml | 9 + .../kafka/ui/config/ClustersProperties.java | 4 +- .../metrics/scrape/MetricsScrapping.java | 21 +- .../ui/service/metrics/sink/MetricsSink.java | 21 ++ .../sink/PrometheusPushGatewaySink.java | 65 ++++++ .../sink/PrometheusRemoteWriteSink.java | 121 ++++++++++++ kafka-ui-contract/pom.xml | 40 +++- .../gogoproto/gogo.proto | 133 +++++++++++++ .../prometheus-remote-write-api/remote.proto | 88 +++++++++ .../prometheus-remote-write-api/types.proto | 187 ++++++++++++++++++ 10 files changed, 685 insertions(+), 4 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusPushGatewaySink.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusRemoteWriteSink.java create mode 100644 kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/gogoproto/gogo.proto create mode 100644 kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/remote.proto create mode 100644 kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/types.proto diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml index 62c795d998..56a630643e 100644 --- a/kafka-ui-api/pom.xml +++ b/kafka-ui-api/pom.xml @@ -242,6 +242,15 @@ io.prometheus simpleclient_common + + io.prometheus + simpleclient_pushgateway + + + org.xerial.snappy + snappy-java + 1.1.9.1 + org.codehaus.groovy diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index e1217197e0..0b6e116c14 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -43,7 +43,7 @@ public class ClustersProperties { KsqldbServerAuth ksqldbServerAuth; KeystoreConfig ksqldbServerSsl; List kafkaConnect; - MetricsConfigData metrics; + MetricsConfig metrics; Map properties; boolean readOnly = false; List serde; @@ -66,7 +66,7 @@ public class ClustersProperties { @Data @ToString(exclude = {"password", "keystorePassword"}) - public static class MetricsConfigData { + public static class MetricsConfig { String type; Integer port; Boolean ssl; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java index b2f1f945a3..8811ef29b4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java @@ -12,13 +12,16 @@ import com.provectus.kafka.ui.service.metrics.scrape.inferred.InferredMetricsScr import com.provectus.kafka.ui.service.metrics.scrape.jmx.JmxMetricsRetriever; import com.provectus.kafka.ui.service.metrics.scrape.jmx.JmxMetricsScraper; import com.provectus.kafka.ui.service.metrics.scrape.prometheus.PrometheusScraper; +import com.provectus.kafka.ui.service.metrics.sink.MetricsSink; import jakarta.annotation.Nullable; import java.util.Collection; import java.util.Optional; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.Node; import reactor.core.publisher.Mono; +@Slf4j @RequiredArgsConstructor public class MetricsScrapping { @@ -30,10 +33,13 @@ public class MetricsScrapping { @Nullable private final PrometheusScraper prometheusScraper; + private final MetricsSink sink; + public static MetricsScrapping create(Cluster cluster, JmxMetricsRetriever jmxMetricsRetriever) { JmxMetricsScraper jmxMetricsScraper = null; PrometheusScraper prometheusScraper = null; + MetricsSink sink = MetricsSink.noop(); var metrics = cluster.getMetrics(); if (cluster.getMetrics() != null) { @@ -43,8 +49,14 @@ public class MetricsScrapping { } else if (metrics.getType().equalsIgnoreCase(PROMETHEUS_METRICS_TYPE)) { prometheusScraper = new PrometheusScraper(scrapeProperties); } + sink = MetricsSink.create(cluster.getMetrics()); } - return new MetricsScrapping(new InferredMetricsScraper(), jmxMetricsScraper, prometheusScraper); + return new MetricsScrapping( + new InferredMetricsScraper(), + jmxMetricsScraper, + prometheusScraper, + sink + ); } private static MetricsScrapeProperties createScrapeProps(Cluster cluster) { @@ -73,6 +85,13 @@ public class MetricsScrapping { .ioRates(ext.ioRates()) .perBrokerScrapedMetrics(ext.perBrokerMetrics()) .build() + ).flatMap(metrics -> + sink.send(metrics.getSummarizedMetrics()) + .onErrorResume(th -> { + log.warn("Error sending metrics to metrics sink", th); + return Mono.empty(); + } + ).thenReturn(metrics) ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java new file mode 100644 index 0000000000..841787b9a1 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java @@ -0,0 +1,21 @@ +package com.provectus.kafka.ui.service.metrics.sink; + +import static io.prometheus.client.Collector.MetricFamilySamples; + +import com.provectus.kafka.ui.config.ClustersProperties; +import java.util.stream.Stream; +import reactor.core.publisher.Mono; + +public interface MetricsSink { + + static MetricsSink noop() { + return m -> Mono.empty(); + } + + static MetricsSink create(ClustersProperties.MetricsConfig metricsConfig) { + + } + + Mono send(Stream metrics); + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusPushGatewaySink.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusPushGatewaySink.java new file mode 100644 index 0000000000..ae50b5e8a7 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusPushGatewaySink.java @@ -0,0 +1,65 @@ +package com.provectus.kafka.ui.service.metrics.sink; + +import static io.prometheus.client.Collector.MetricFamilySamples; + +import io.prometheus.client.Collector; +import io.prometheus.client.exporter.BasicAuthHttpConnectionFactory; +import io.prometheus.client.exporter.PushGateway; +import jakarta.annotation.Nullable; +import java.net.URL; +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +@RequiredArgsConstructor +class PrometheusPushGatewaySink implements MetricsSink { + + private final static String DEFAULT_PGW_JOBNAME = "kafkaui"; + + private final PushGateway pushGateway; + private final String job; + //TODO: read about grouping rules + + @SneakyThrows + static PrometheusPushGatewaySink create(String url, + @Nullable String job, + @Nullable String username, + @Nullable String passw) { + var pushGateway = new PushGateway(new URL(url)); + if (StringUtils.hasText(username) && StringUtils.hasText(passw)) { + pushGateway.setConnectionFactory(new BasicAuthHttpConnectionFactory(username, passw)); + } + return new PrometheusPushGatewaySink( + pushGateway, + Optional.ofNullable(job).orElse(DEFAULT_PGW_JOBNAME) + ); + } + + @Override + public Mono send(Stream metrics) { + return Mono.fromRunnable(() -> pushSync(metrics.toList())) + .subscribeOn(Schedulers.boundedElastic()); + } + + @SneakyThrows + private void pushSync(List metricsToPush) { + if (metricsToPush.isEmpty()) { + return; + } + pushGateway.push( + new Collector() { + @Override + public List collect() { + return metricsToPush; + } + }, + job + ); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusRemoteWriteSink.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusRemoteWriteSink.java new file mode 100644 index 0000000000..bf767de891 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusRemoteWriteSink.java @@ -0,0 +1,121 @@ +package com.provectus.kafka.ui.service.metrics.sink; + +import static io.prometheus.client.Collector.*; +import static prometheus.Types.*; + +import com.google.common.base.Enums; +import com.provectus.kafka.ui.util.WebClientConfigurator; +import groovy.lang.Tuple; +import io.prometheus.client.Collector; +import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Stream; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import org.springframework.http.MediaType; +import org.springframework.util.unit.DataSize; +import org.springframework.web.reactive.function.client.WebClient; +import org.xerial.snappy.Snappy; +import prometheus.Remote; +import prometheus.Types; +import prometheus.Types.MetricMetadata.MetricType; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +@RequiredArgsConstructor +class PrometheusRemoteWriteSink implements MetricsSink { + + private final WebClient webClient; + private final String writeEndpoint; + + PrometheusRemoteWriteSink(String prometheusUrl) { + this.writeEndpoint = prometheusUrl + "/api/v1/write"; + this.webClient = new WebClientConfigurator().configureBufferSize(DataSize.ofMegabytes(20)).build(); + } + + @SneakyThrows + @Override + public Mono send(Stream metrics) { + byte[] bytesToWrite = Snappy.compress(createWriteRequest(metrics).toByteArray()); + return webClient.post() + .uri(writeEndpoint) + //.contentType(MediaType.APPLICATION_FORM_URLENCODED) + .contentType(MediaType.parseMediaType("application/x-protobuf")) //??? + .header("User-Agent", "promremote-kui/0.1.0") + .header("Content-Encoding", "snappy") + .header("X-Prometheus-Remote-Write-Version", "0.1.0") + .bodyValue(bytesToWrite) + .retrieve() + .toBodilessEntity() + .then(); + } + + private Remote.WriteRequest createWriteRequest(Stream metrics) { + var tsAndMeta = createTimeSeries(metrics); + return Remote.WriteRequest.newBuilder() + .addAllTimeseries(tsAndMeta.getT1()) + .addAllMetadata(tsAndMeta.getT2()) + .build(); + } + + public Tuple2, List> createTimeSeries(Stream metrics) { + long currentTs = System.currentTimeMillis(); + List timeSeriesList = new ArrayList<>(); + List metadatasList = new ArrayList<>(); + metrics.forEach(mfs -> { + for (MetricFamilySamples.Sample sample : mfs.samples) { + TimeSeries.Builder timeSeriesBuilder = TimeSeries.newBuilder(); + timeSeriesBuilder.addLabels( + Label.newBuilder() + .setName("__name__") + .setValue(escapedLabelValue(sample.name)) + .build() + ); + for (int i = 0; i < sample.labelNames.size(); i++) { + timeSeriesBuilder.addLabels( + Label.newBuilder() + .setName(sample.labelNames.get(i)) + .setValue(escapedLabelValue(sample.labelValues.get(i))) + .build() + ); + } + timeSeriesBuilder.addSamples( + Sample.newBuilder() + .setValue(sample.value) + .setTimestamp(currentTs) + .build() + ); + timeSeriesList.add(timeSeriesBuilder.build()); + metadatasList.add( + MetricMetadata.newBuilder() + .setType(Enums.getIfPresent(MetricType.class, mfs.type.toString()).or(MetricType.UNKNOWN)) + .setHelp(mfs.help) + .setUnit(mfs.unit) + .build() + ); + } + }); + return Tuples.of(timeSeriesList, metadatasList); + } + + private static String escapedLabelValue(String s) { + //TODO: refactor + StringWriter writer = new StringWriter(s.length()); + for (int i = 0; i < s.length(); i++) { + char c = s.charAt(i); + switch (c) { + case '\\' -> writer.append("\\\\"); + case '\"' -> writer.append("\\\""); + case '\n' -> writer.append("\\n"); + default -> writer.append(c); + } + } + return writer.toString(); + } + +} diff --git a/kafka-ui-contract/pom.xml b/kafka-ui-contract/pom.xml index 0d8e238368..b6b433f431 100644 --- a/kafka-ui-contract/pom.xml +++ b/kafka-ui-contract/pom.xml @@ -46,6 +46,11 @@ javax.annotation-api 1.3.2 + + com.google.protobuf + protobuf-java + 3.22.4 + @@ -229,7 +234,40 @@ - + + + kr.motd.maven + os-maven-plugin + 1.7.0 + + + initialize + + detect + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + + generate-sources + + compile + + + + + false + ${project.basedir}/src/main/resources/proto + + **/*.proto + + com.google.protobuf:protoc:3.21.12:exe:${os.detected.classifier} + diff --git a/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/gogoproto/gogo.proto b/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/gogoproto/gogo.proto new file mode 100644 index 0000000000..2f0a3c76bd --- /dev/null +++ b/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/gogoproto/gogo.proto @@ -0,0 +1,133 @@ +// Protocol Buffers for Go with Gadgets +// +// Copyright (c) 2013, The GoGo Authors. All rights reserved. +// http://github.com/gogo/protobuf +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto2"; +package gogoproto; + +import "google/protobuf/descriptor.proto"; + +option java_package = "com.google.protobuf"; +option java_outer_classname = "GoGoProtos"; +option go_package = "github.com/gogo/protobuf/gogoproto"; + +extend google.protobuf.EnumOptions { + optional bool goproto_enum_prefix = 62001; + optional bool goproto_enum_stringer = 62021; + optional bool enum_stringer = 62022; + optional string enum_customname = 62023; + optional bool enumdecl = 62024; +} + +extend google.protobuf.EnumValueOptions { + optional string enumvalue_customname = 66001; +} + +extend google.protobuf.FileOptions { + optional bool goproto_getters_all = 63001; + optional bool goproto_enum_prefix_all = 63002; + optional bool goproto_stringer_all = 63003; + optional bool verbose_equal_all = 63004; + optional bool face_all = 63005; + optional bool gostring_all = 63006; + optional bool populate_all = 63007; + optional bool stringer_all = 63008; + optional bool onlyone_all = 63009; + + optional bool equal_all = 63013; + optional bool description_all = 63014; + optional bool testgen_all = 63015; + optional bool benchgen_all = 63016; + optional bool marshaler_all = 63017; + optional bool unmarshaler_all = 63018; + optional bool stable_marshaler_all = 63019; + + optional bool sizer_all = 63020; + + optional bool goproto_enum_stringer_all = 63021; + optional bool enum_stringer_all = 63022; + + optional bool unsafe_marshaler_all = 63023; + optional bool unsafe_unmarshaler_all = 63024; + + optional bool goproto_extensions_map_all = 63025; + optional bool goproto_unrecognized_all = 63026; + optional bool gogoproto_import = 63027; + optional bool protosizer_all = 63028; + optional bool compare_all = 63029; + optional bool typedecl_all = 63030; + optional bool enumdecl_all = 63031; + + optional bool goproto_registration = 63032; +} + +extend google.protobuf.MessageOptions { + optional bool goproto_getters = 64001; + optional bool goproto_stringer = 64003; + optional bool verbose_equal = 64004; + optional bool face = 64005; + optional bool gostring = 64006; + optional bool populate = 64007; + optional bool stringer = 67008; + optional bool onlyone = 64009; + + optional bool equal = 64013; + optional bool description = 64014; + optional bool testgen = 64015; + optional bool benchgen = 64016; + optional bool marshaler = 64017; + optional bool unmarshaler = 64018; + optional bool stable_marshaler = 64019; + + optional bool sizer = 64020; + + optional bool unsafe_marshaler = 64023; + optional bool unsafe_unmarshaler = 64024; + + optional bool goproto_extensions_map = 64025; + optional bool goproto_unrecognized = 64026; + + optional bool protosizer = 64028; + optional bool compare = 64029; + + optional bool typedecl = 64030; +} + +extend google.protobuf.FieldOptions { + optional bool nullable = 65001; + optional bool embed = 65002; + optional string customtype = 65003; + optional string customname = 65004; + optional string jsontag = 65005; + optional string moretags = 65006; + optional string casttype = 65007; + optional string castkey = 65008; + optional string castvalue = 65009; + + optional bool stdtime = 65010; + optional bool stdduration = 65011; +} diff --git a/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/remote.proto b/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/remote.proto new file mode 100644 index 0000000000..0c50ce8234 --- /dev/null +++ b/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/remote.proto @@ -0,0 +1,88 @@ +// Copyright 2016 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; +package prometheus; + +option go_package = "prompb"; + +import "types.proto"; +import "gogoproto/gogo.proto"; + +message WriteRequest { + repeated prometheus.TimeSeries timeseries = 1 [(gogoproto.nullable) = false]; + // Cortex uses this field to determine the source of the write request. + // We reserve it to avoid any compatibility issues. + reserved 2; + repeated prometheus.MetricMetadata metadata = 3 [(gogoproto.nullable) = false]; +} + +// ReadRequest represents a remote read request. +message ReadRequest { + repeated Query queries = 1; + + enum ResponseType { + // Server will return a single ReadResponse message with matched series that includes list of raw samples. + // It's recommended to use streamed response types instead. + // + // Response headers: + // Content-Type: "application/x-protobuf" + // Content-Encoding: "snappy" + SAMPLES = 0; + // Server will stream a delimited ChunkedReadResponse message that + // contains XOR or HISTOGRAM(!) encoded chunks for a single series. + // Each message is following varint size and fixed size bigendian + // uint32 for CRC32 Castagnoli checksum. + // + // Response headers: + // Content-Type: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse" + // Content-Encoding: "" + STREAMED_XOR_CHUNKS = 1; + } + + // accepted_response_types allows negotiating the content type of the response. + // + // Response types are taken from the list in the FIFO order. If no response type in `accepted_response_types` is + // implemented by server, error is returned. + // For request that do not contain `accepted_response_types` field the SAMPLES response type will be used. + repeated ResponseType accepted_response_types = 2; +} + +// ReadResponse is a response when response_type equals SAMPLES. +message ReadResponse { + // In same order as the request's queries. + repeated QueryResult results = 1; +} + +message Query { + int64 start_timestamp_ms = 1; + int64 end_timestamp_ms = 2; + repeated prometheus.LabelMatcher matchers = 3; + prometheus.ReadHints hints = 4; +} + +message QueryResult { + // Samples within a time series must be ordered by time. + repeated prometheus.TimeSeries timeseries = 1; +} + +// ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS. +// We strictly stream full series after series, optionally split by time. This means that a single frame can contain +// partition of the single series, but once a new series is started to be streamed it means that no more chunks will +// be sent for previous one. Series are returned sorted in the same way TSDB block are internally. +message ChunkedReadResponse { + repeated prometheus.ChunkedSeries chunked_series = 1; + + // query_index represents an index of the query from ReadRequest.queries these chunks relates to. + int64 query_index = 2; +} diff --git a/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/types.proto b/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/types.proto new file mode 100644 index 0000000000..69bec49549 --- /dev/null +++ b/kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/types.proto @@ -0,0 +1,187 @@ +// Copyright 2017 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; +package prometheus; + +option go_package = "prompb"; + +import "gogoproto/gogo.proto"; + +message MetricMetadata { + enum MetricType { + UNKNOWN = 0; + COUNTER = 1; + GAUGE = 2; + HISTOGRAM = 3; + GAUGEHISTOGRAM = 4; + SUMMARY = 5; + INFO = 6; + STATESET = 7; + } + + // Represents the metric type, these match the set from Prometheus. + // Refer to model/textparse/interface.go for details. + MetricType type = 1; + string metric_family_name = 2; + string help = 4; + string unit = 5; +} + +message Sample { + double value = 1; + // timestamp is in ms format, see model/timestamp/timestamp.go for + // conversion from time.Time to Prometheus timestamp. + int64 timestamp = 2; +} + +message Exemplar { + // Optional, can be empty. + repeated Label labels = 1 [(gogoproto.nullable) = false]; + double value = 2; + // timestamp is in ms format, see model/timestamp/timestamp.go for + // conversion from time.Time to Prometheus timestamp. + int64 timestamp = 3; +} + +// A native histogram, also known as a sparse histogram. +// Original design doc: +// https://docs.google.com/document/d/1cLNv3aufPZb3fNfaJgdaRBZsInZKKIHo9E6HinJVbpM/edit +// The appendix of this design doc also explains the concept of float +// histograms. This Histogram message can represent both, the usual +// integer histogram as well as a float histogram. +message Histogram { + enum ResetHint { + UNKNOWN = 0; // Need to test for a counter reset explicitly. + YES = 1; // This is the 1st histogram after a counter reset. + NO = 2; // There was no counter reset between this and the previous Histogram. + GAUGE = 3; // This is a gauge histogram where counter resets don't happen. + } + + oneof count { // Count of observations in the histogram. + uint64 count_int = 1; + double count_float = 2; + } + double sum = 3; // Sum of observations in the histogram. + // The schema defines the bucket schema. Currently, valid numbers + // are -4 <= n <= 8. They are all for base-2 bucket schemas, where 1 + // is a bucket boundary in each case, and then each power of two is + // divided into 2^n logarithmic buckets. Or in other words, each + // bucket boundary is the previous boundary times 2^(2^-n). In the + // future, more bucket schemas may be added using numbers < -4 or > + // 8. + sint32 schema = 4; + double zero_threshold = 5; // Breadth of the zero bucket. + oneof zero_count { // Count in zero bucket. + uint64 zero_count_int = 6; + double zero_count_float = 7; + } + + // Negative Buckets. + repeated BucketSpan negative_spans = 8 [(gogoproto.nullable) = false]; + // Use either "negative_deltas" or "negative_counts", the former for + // regular histograms with integer counts, the latter for float + // histograms. + repeated sint64 negative_deltas = 9; // Count delta of each bucket compared to previous one (or to zero for 1st bucket). + repeated double negative_counts = 10; // Absolute count of each bucket. + + // Positive Buckets. + repeated BucketSpan positive_spans = 11 [(gogoproto.nullable) = false]; + // Use either "positive_deltas" or "positive_counts", the former for + // regular histograms with integer counts, the latter for float + // histograms. + repeated sint64 positive_deltas = 12; // Count delta of each bucket compared to previous one (or to zero for 1st bucket). + repeated double positive_counts = 13; // Absolute count of each bucket. + + ResetHint reset_hint = 14; + // timestamp is in ms format, see model/timestamp/timestamp.go for + // conversion from time.Time to Prometheus timestamp. + int64 timestamp = 15; +} + +// A BucketSpan defines a number of consecutive buckets with their +// offset. Logically, it would be more straightforward to include the +// bucket counts in the Span. However, the protobuf representation is +// more compact in the way the data is structured here (with all the +// buckets in a single array separate from the Spans). +message BucketSpan { + sint32 offset = 1; // Gap to previous span, or starting point for 1st span (which can be negative). + uint32 length = 2; // Length of consecutive buckets. +} + +// TimeSeries represents samples and labels for a single time series. +message TimeSeries { + // For a timeseries to be valid, and for the samples and exemplars + // to be ingested by the remote system properly, the labels field is required. + repeated Label labels = 1 [(gogoproto.nullable) = false]; + repeated Sample samples = 2 [(gogoproto.nullable) = false]; + repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false]; + repeated Histogram histograms = 4 [(gogoproto.nullable) = false]; +} + +message Label { + string name = 1; + string value = 2; +} + +message Labels { + repeated Label labels = 1 [(gogoproto.nullable) = false]; +} + +// Matcher specifies a rule, which can match or set of labels or not. +message LabelMatcher { + enum Type { + EQ = 0; + NEQ = 1; + RE = 2; + NRE = 3; + } + Type type = 1; + string name = 2; + string value = 3; +} + +message ReadHints { + int64 step_ms = 1; // Query step size in milliseconds. + string func = 2; // String representation of surrounding function or aggregation. + int64 start_ms = 3; // Start time in milliseconds. + int64 end_ms = 4; // End time in milliseconds. + repeated string grouping = 5; // List of label names used in aggregation. + bool by = 6; // Indicate whether it is without or by. + int64 range_ms = 7; // Range vector selector range in milliseconds. +} + +// Chunk represents a TSDB chunk. +// Time range [min, max] is inclusive. +message Chunk { + int64 min_time_ms = 1; + int64 max_time_ms = 2; + + // We require this to match chunkenc.Encoding. + enum Encoding { + UNKNOWN = 0; + XOR = 1; + HISTOGRAM = 2; + FLOAT_HISTOGRAM = 3; + } + Encoding type = 3; + bytes data = 4; +} + +// ChunkedSeries represents single, encoded time series. +message ChunkedSeries { + // Labels should be sorted. + repeated Label labels = 1 [(gogoproto.nullable) = false]; + // Chunks will be in start time order and may overlap. + repeated Chunk chunks = 2 [(gogoproto.nullable) = false]; +}