iliax 1 year ago
parent
commit
0ec59e8cab

+ 9 - 0
kafka-ui-api/pom.xml

@@ -242,6 +242,15 @@
             <groupId>io.prometheus</groupId>
             <groupId>io.prometheus</groupId>
             <artifactId>simpleclient_common</artifactId>
             <artifactId>simpleclient_common</artifactId>
         </dependency>
         </dependency>
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient_pushgateway</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>1.1.9.1</version>
+        </dependency>
 
 
         <dependency>
         <dependency>
             <groupId>org.codehaus.groovy</groupId>
             <groupId>org.codehaus.groovy</groupId>

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -43,7 +43,7 @@ public class ClustersProperties {
     KsqldbServerAuth ksqldbServerAuth;
     KsqldbServerAuth ksqldbServerAuth;
     KeystoreConfig ksqldbServerSsl;
     KeystoreConfig ksqldbServerSsl;
     List<ConnectCluster> kafkaConnect;
     List<ConnectCluster> kafkaConnect;
-    MetricsConfigData metrics;
+    MetricsConfig metrics;
     Map<String, Object> properties;
     Map<String, Object> properties;
     boolean readOnly = false;
     boolean readOnly = false;
     List<SerdeConfig> serde;
     List<SerdeConfig> serde;
@@ -66,7 +66,7 @@ public class ClustersProperties {
 
 
   @Data
   @Data
   @ToString(exclude = {"password", "keystorePassword"})
   @ToString(exclude = {"password", "keystorePassword"})
-  public static class MetricsConfigData {
+  public static class MetricsConfig {
     String type;
     String type;
     Integer port;
     Integer port;
     Boolean ssl;
     Boolean ssl;

+ 20 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java

@@ -12,13 +12,16 @@ import com.provectus.kafka.ui.service.metrics.scrape.inferred.InferredMetricsScr
 import com.provectus.kafka.ui.service.metrics.scrape.jmx.JmxMetricsRetriever;
 import com.provectus.kafka.ui.service.metrics.scrape.jmx.JmxMetricsRetriever;
 import com.provectus.kafka.ui.service.metrics.scrape.jmx.JmxMetricsScraper;
 import com.provectus.kafka.ui.service.metrics.scrape.jmx.JmxMetricsScraper;
 import com.provectus.kafka.ui.service.metrics.scrape.prometheus.PrometheusScraper;
 import com.provectus.kafka.ui.service.metrics.scrape.prometheus.PrometheusScraper;
+import com.provectus.kafka.ui.service.metrics.sink.MetricsSink;
 import jakarta.annotation.Nullable;
 import jakarta.annotation.Nullable;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.Optional;
 import java.util.Optional;
 import lombok.RequiredArgsConstructor;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Node;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
+@Slf4j
 @RequiredArgsConstructor
 @RequiredArgsConstructor
 public class MetricsScrapping {
 public class MetricsScrapping {
 
 
@@ -30,10 +33,13 @@ public class MetricsScrapping {
   @Nullable
   @Nullable
   private final PrometheusScraper prometheusScraper;
   private final PrometheusScraper prometheusScraper;
 
 
+  private final MetricsSink sink;
+
   public static MetricsScrapping create(Cluster cluster,
   public static MetricsScrapping create(Cluster cluster,
                                         JmxMetricsRetriever jmxMetricsRetriever) {
                                         JmxMetricsRetriever jmxMetricsRetriever) {
     JmxMetricsScraper jmxMetricsScraper = null;
     JmxMetricsScraper jmxMetricsScraper = null;
     PrometheusScraper prometheusScraper = null;
     PrometheusScraper prometheusScraper = null;
+    MetricsSink sink = MetricsSink.noop();
 
 
     var metrics = cluster.getMetrics();
     var metrics = cluster.getMetrics();
     if (cluster.getMetrics() != null) {
     if (cluster.getMetrics() != null) {
@@ -43,8 +49,14 @@ public class MetricsScrapping {
       } else if (metrics.getType().equalsIgnoreCase(PROMETHEUS_METRICS_TYPE)) {
       } else if (metrics.getType().equalsIgnoreCase(PROMETHEUS_METRICS_TYPE)) {
         prometheusScraper = new PrometheusScraper(scrapeProperties);
         prometheusScraper = new PrometheusScraper(scrapeProperties);
       }
       }
+      sink = MetricsSink.create(cluster.getMetrics());
     }
     }
-    return new MetricsScrapping(new InferredMetricsScraper(), jmxMetricsScraper, prometheusScraper);
+    return new MetricsScrapping(
+        new InferredMetricsScraper(),
+        jmxMetricsScraper,
+        prometheusScraper,
+        sink
+    );
   }
   }
 
 
   private static MetricsScrapeProperties createScrapeProps(Cluster cluster) {
   private static MetricsScrapeProperties createScrapeProps(Cluster cluster) {
@@ -73,6 +85,13 @@ public class MetricsScrapping {
             .ioRates(ext.ioRates())
             .ioRates(ext.ioRates())
             .perBrokerScrapedMetrics(ext.perBrokerMetrics())
             .perBrokerScrapedMetrics(ext.perBrokerMetrics())
             .build()
             .build()
+    ).flatMap(metrics ->
+        sink.send(metrics.getSummarizedMetrics())
+            .onErrorResume(th -> {
+                  log.warn("Error sending metrics to metrics sink", th);
+                  return Mono.empty();
+                }
+            ).thenReturn(metrics)
     );
     );
   }
   }
 
 

+ 21 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java

@@ -0,0 +1,21 @@
+package com.provectus.kafka.ui.service.metrics.sink;
+
+import static io.prometheus.client.Collector.MetricFamilySamples;
+
+import com.provectus.kafka.ui.config.ClustersProperties;
+import java.util.stream.Stream;
+import reactor.core.publisher.Mono;
+
+public interface MetricsSink {
+
+  static MetricsSink noop() {
+    return m -> Mono.empty();
+  }
+
+  static MetricsSink create(ClustersProperties.MetricsConfig metricsConfig) {
+
+  }
+
+  Mono<Void> send(Stream<MetricFamilySamples> metrics);
+
+}

+ 65 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusPushGatewaySink.java

@@ -0,0 +1,65 @@
+package com.provectus.kafka.ui.service.metrics.sink;
+
+import static io.prometheus.client.Collector.MetricFamilySamples;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.exporter.BasicAuthHttpConnectionFactory;
+import io.prometheus.client.exporter.PushGateway;
+import jakarta.annotation.Nullable;
+import java.net.URL;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import org.springframework.util.StringUtils;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+
+@RequiredArgsConstructor
+class PrometheusPushGatewaySink implements MetricsSink {
+
+  private final static String DEFAULT_PGW_JOBNAME = "kafkaui";
+
+  private final PushGateway pushGateway;
+  private final String job;
+  //TODO: read about grouping rules
+
+  @SneakyThrows
+  static PrometheusPushGatewaySink create(String url,
+                                          @Nullable String job,
+                                          @Nullable String username,
+                                          @Nullable String passw) {
+    var pushGateway = new PushGateway(new URL(url));
+    if (StringUtils.hasText(username) && StringUtils.hasText(passw)) {
+      pushGateway.setConnectionFactory(new BasicAuthHttpConnectionFactory(username, passw));
+    }
+    return new PrometheusPushGatewaySink(
+        pushGateway,
+        Optional.ofNullable(job).orElse(DEFAULT_PGW_JOBNAME)
+    );
+  }
+
+  @Override
+  public Mono<Void> send(Stream<MetricFamilySamples> metrics) {
+    return Mono.<Void>fromRunnable(() -> pushSync(metrics.toList()))
+        .subscribeOn(Schedulers.boundedElastic());
+  }
+
+  @SneakyThrows
+  private void pushSync(List<MetricFamilySamples> metricsToPush) {
+    if (metricsToPush.isEmpty()) {
+      return;
+    }
+    pushGateway.push(
+        new Collector() {
+          @Override
+          public List<MetricFamilySamples> collect() {
+            return metricsToPush;
+          }
+        },
+        job
+    );
+  }
+}

+ 121 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusRemoteWriteSink.java

@@ -0,0 +1,121 @@
+package com.provectus.kafka.ui.service.metrics.sink;
+
+import static io.prometheus.client.Collector.*;
+import static prometheus.Types.*;
+
+import com.google.common.base.Enums;
+import com.provectus.kafka.ui.util.WebClientConfigurator;
+import groovy.lang.Tuple;
+import io.prometheus.client.Collector;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Stream;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import org.springframework.http.MediaType;
+import org.springframework.util.unit.DataSize;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.xerial.snappy.Snappy;
+import prometheus.Remote;
+import prometheus.Types;
+import prometheus.Types.MetricMetadata.MetricType;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+@RequiredArgsConstructor
+class PrometheusRemoteWriteSink implements MetricsSink {
+
+  private final WebClient webClient;
+  private final String writeEndpoint;
+
+  PrometheusRemoteWriteSink(String prometheusUrl) {
+    this.writeEndpoint = prometheusUrl + "/api/v1/write";
+    this.webClient = new WebClientConfigurator().configureBufferSize(DataSize.ofMegabytes(20)).build();
+  }
+
+  @SneakyThrows
+  @Override
+  public Mono<Void> send(Stream<MetricFamilySamples> metrics) {
+    byte[] bytesToWrite = Snappy.compress(createWriteRequest(metrics).toByteArray());
+    return webClient.post()
+        .uri(writeEndpoint)
+        //.contentType(MediaType.APPLICATION_FORM_URLENCODED)
+        .contentType(MediaType.parseMediaType("application/x-protobuf")) //???
+        .header("User-Agent", "promremote-kui/0.1.0")
+        .header("Content-Encoding", "snappy")
+        .header("X-Prometheus-Remote-Write-Version", "0.1.0")
+        .bodyValue(bytesToWrite)
+        .retrieve()
+        .toBodilessEntity()
+        .then();
+  }
+
+  private Remote.WriteRequest createWriteRequest(Stream<MetricFamilySamples> metrics) {
+    var tsAndMeta = createTimeSeries(metrics);
+    return Remote.WriteRequest.newBuilder()
+        .addAllTimeseries(tsAndMeta.getT1())
+        .addAllMetadata(tsAndMeta.getT2())
+        .build();
+  }
+
+  public Tuple2<List<TimeSeries>, List<MetricMetadata>> createTimeSeries(Stream<MetricFamilySamples> metrics) {
+    long currentTs = System.currentTimeMillis();
+    List<TimeSeries> timeSeriesList = new ArrayList<>();
+    List<MetricMetadata> metadatasList = new ArrayList<>();
+    metrics.forEach(mfs -> {
+      for (MetricFamilySamples.Sample sample : mfs.samples) {
+        TimeSeries.Builder timeSeriesBuilder = TimeSeries.newBuilder();
+        timeSeriesBuilder.addLabels(
+            Label.newBuilder()
+                .setName("__name__")
+                .setValue(escapedLabelValue(sample.name))
+                .build()
+        );
+        for (int i = 0; i < sample.labelNames.size(); i++) {
+          timeSeriesBuilder.addLabels(
+              Label.newBuilder()
+                  .setName(sample.labelNames.get(i))
+                  .setValue(escapedLabelValue(sample.labelValues.get(i)))
+                  .build()
+          );
+        }
+        timeSeriesBuilder.addSamples(
+            Sample.newBuilder()
+                .setValue(sample.value)
+                .setTimestamp(currentTs)
+                .build()
+        );
+        timeSeriesList.add(timeSeriesBuilder.build());
+        metadatasList.add(
+            MetricMetadata.newBuilder()
+                .setType(Enums.getIfPresent(MetricType.class, mfs.type.toString()).or(MetricType.UNKNOWN))
+                .setHelp(mfs.help)
+                .setUnit(mfs.unit)
+                .build()
+        );
+      }
+    });
+    return Tuples.of(timeSeriesList, metadatasList);
+  }
+
+  private static String escapedLabelValue(String s) {
+    //TODO: refactor
+    StringWriter writer = new StringWriter(s.length());
+    for (int i = 0; i < s.length(); i++) {
+      char c = s.charAt(i);
+      switch (c) {
+        case '\\' -> writer.append("\\\\");
+        case '\"' -> writer.append("\\\"");
+        case '\n' -> writer.append("\\n");
+        default -> writer.append(c);
+      }
+    }
+    return writer.toString();
+  }
+
+}

+ 39 - 1
kafka-ui-contract/pom.xml

@@ -46,6 +46,11 @@
                     <artifactId>javax.annotation-api</artifactId>
                     <artifactId>javax.annotation-api</artifactId>
                     <version>1.3.2</version>
                     <version>1.3.2</version>
                 </dependency>
                 </dependency>
+                <dependency>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                    <version>3.22.4</version>
+                </dependency>
             </dependencies>
             </dependencies>
 
 
             <build>
             <build>
@@ -229,7 +234,40 @@
                                 </configuration>
                                 </configuration>
                             </execution>
                             </execution>
                         </executions>
                         </executions>
-
+                    </plugin>
+                    <plugin>
+                        <groupId>kr.motd.maven</groupId>
+                        <artifactId>os-maven-plugin</artifactId>
+                        <version>1.7.0</version>
+                        <executions>
+                            <execution>
+                                <phase>initialize</phase>
+                                <goals>
+                                    <goal>detect</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.xolstice.maven.plugins</groupId>
+                        <artifactId>protobuf-maven-plugin</artifactId>
+                        <version>0.6.1</version>
+                        <executions>
+                            <execution>
+                                <phase>generate-sources</phase>
+                                <goals>
+                                    <goal>compile</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <attachProtoSources>false</attachProtoSources>
+                            <protoSourceRoot>${project.basedir}/src/main/resources/proto</protoSourceRoot>
+                            <includes>
+                                <include>**/*.proto</include>
+                            </includes>
+                            <protocArtifact>com.google.protobuf:protoc:3.21.12:exe:${os.detected.classifier}</protocArtifact>
+                        </configuration>
                     </plugin>
                     </plugin>
                 </plugins>
                 </plugins>
             </build>
             </build>

+ 133 - 0
kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/gogoproto/gogo.proto

@@ -0,0 +1,133 @@
+// Protocol Buffers for Go with Gadgets
+//
+// Copyright (c) 2013, The GoGo Authors. All rights reserved.
+// http://github.com/gogo/protobuf
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto2";
+package gogoproto;
+
+import "google/protobuf/descriptor.proto";
+
+option java_package = "com.google.protobuf";
+option java_outer_classname = "GoGoProtos";
+option go_package = "github.com/gogo/protobuf/gogoproto";
+
+extend google.protobuf.EnumOptions {
+    optional bool goproto_enum_prefix = 62001;
+    optional bool goproto_enum_stringer = 62021;
+    optional bool enum_stringer = 62022;
+    optional string enum_customname = 62023;
+    optional bool enumdecl = 62024;
+}
+
+extend google.protobuf.EnumValueOptions {
+    optional string enumvalue_customname = 66001;
+}
+
+extend google.protobuf.FileOptions {
+    optional bool goproto_getters_all = 63001;
+    optional bool goproto_enum_prefix_all = 63002;
+    optional bool goproto_stringer_all = 63003;
+    optional bool verbose_equal_all = 63004;
+    optional bool face_all = 63005;
+    optional bool gostring_all = 63006;
+    optional bool populate_all = 63007;
+    optional bool stringer_all = 63008;
+    optional bool onlyone_all = 63009;
+
+    optional bool equal_all = 63013;
+    optional bool description_all = 63014;
+    optional bool testgen_all = 63015;
+    optional bool benchgen_all = 63016;
+    optional bool marshaler_all = 63017;
+    optional bool unmarshaler_all = 63018;
+    optional bool stable_marshaler_all = 63019;
+
+    optional bool sizer_all = 63020;
+
+    optional bool goproto_enum_stringer_all = 63021;
+    optional bool enum_stringer_all = 63022;
+
+    optional bool unsafe_marshaler_all = 63023;
+    optional bool unsafe_unmarshaler_all = 63024;
+
+    optional bool goproto_extensions_map_all = 63025;
+    optional bool goproto_unrecognized_all = 63026;
+    optional bool gogoproto_import = 63027;
+    optional bool protosizer_all = 63028;
+    optional bool compare_all = 63029;
+    optional bool typedecl_all = 63030;
+    optional bool enumdecl_all = 63031;
+
+    optional bool goproto_registration = 63032;
+}
+
+extend google.protobuf.MessageOptions {
+    optional bool goproto_getters = 64001;
+    optional bool goproto_stringer = 64003;
+    optional bool verbose_equal = 64004;
+    optional bool face = 64005;
+    optional bool gostring = 64006;
+    optional bool populate = 64007;
+    optional bool stringer = 67008;
+    optional bool onlyone = 64009;
+
+    optional bool equal = 64013;
+    optional bool description = 64014;
+    optional bool testgen = 64015;
+    optional bool benchgen = 64016;
+    optional bool marshaler = 64017;
+    optional bool unmarshaler = 64018;
+    optional bool stable_marshaler = 64019;
+
+    optional bool sizer = 64020;
+
+    optional bool unsafe_marshaler = 64023;
+    optional bool unsafe_unmarshaler = 64024;
+
+    optional bool goproto_extensions_map = 64025;
+    optional bool goproto_unrecognized = 64026;
+
+    optional bool protosizer = 64028;
+    optional bool compare = 64029;
+
+    optional bool typedecl = 64030;
+}
+
+extend google.protobuf.FieldOptions {
+    optional bool nullable = 65001;
+    optional bool embed = 65002;
+    optional string customtype = 65003;
+    optional string customname = 65004;
+    optional string jsontag = 65005;
+    optional string moretags = 65006;
+    optional string casttype = 65007;
+    optional string castkey = 65008;
+    optional string castvalue = 65009;
+
+    optional bool stdtime = 65010;
+    optional bool stdduration = 65011;
+}

+ 88 - 0
kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/remote.proto

@@ -0,0 +1,88 @@
+// Copyright 2016 Prometheus Team
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+package prometheus;
+
+option go_package = "prompb";
+
+import "types.proto";
+import "gogoproto/gogo.proto";
+
+message WriteRequest {
+    repeated prometheus.TimeSeries timeseries = 1 [(gogoproto.nullable) = false];
+    // Cortex uses this field to determine the source of the write request.
+    // We reserve it to avoid any compatibility issues.
+    reserved  2;
+    repeated prometheus.MetricMetadata metadata = 3 [(gogoproto.nullable) = false];
+}
+
+// ReadRequest represents a remote read request.
+message ReadRequest {
+    repeated Query queries = 1;
+
+    enum ResponseType {
+        // Server will return a single ReadResponse message with matched series that includes list of raw samples.
+        // It's recommended to use streamed response types instead.
+        //
+        // Response headers:
+        // Content-Type: "application/x-protobuf"
+        // Content-Encoding: "snappy"
+        SAMPLES = 0;
+        // Server will stream a delimited ChunkedReadResponse message that
+        // contains XOR or HISTOGRAM(!) encoded chunks for a single series.
+        // Each message is following varint size and fixed size bigendian
+        // uint32 for CRC32 Castagnoli checksum.
+        //
+        // Response headers:
+        // Content-Type: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"
+        // Content-Encoding: ""
+        STREAMED_XOR_CHUNKS = 1;
+    }
+
+    // accepted_response_types allows negotiating the content type of the response.
+    //
+    // Response types are taken from the list in the FIFO order. If no response type in `accepted_response_types` is
+    // implemented by server, error is returned.
+    // For request that do not contain `accepted_response_types` field the SAMPLES response type will be used.
+    repeated ResponseType accepted_response_types = 2;
+}
+
+// ReadResponse is a response when response_type equals SAMPLES.
+message ReadResponse {
+    // In same order as the request's queries.
+    repeated QueryResult results = 1;
+}
+
+message Query {
+    int64 start_timestamp_ms = 1;
+    int64 end_timestamp_ms = 2;
+    repeated prometheus.LabelMatcher matchers = 3;
+    prometheus.ReadHints hints = 4;
+}
+
+message QueryResult {
+    // Samples within a time series must be ordered by time.
+    repeated prometheus.TimeSeries timeseries = 1;
+}
+
+// ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS.
+// We strictly stream full series after series, optionally split by time. This means that a single frame can contain
+// partition of the single series, but once a new series is started to be streamed it means that no more chunks will
+// be sent for previous one. Series are returned sorted in the same way TSDB block are internally.
+message ChunkedReadResponse {
+    repeated prometheus.ChunkedSeries chunked_series = 1;
+
+    // query_index represents an index of the query from ReadRequest.queries these chunks relates to.
+    int64 query_index = 2;
+}

+ 187 - 0
kafka-ui-contract/src/main/resources/proto/prometheus-remote-write-api/types.proto

@@ -0,0 +1,187 @@
+// Copyright 2017 Prometheus Team
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+package prometheus;
+
+option go_package = "prompb";
+
+import "gogoproto/gogo.proto";
+
+message MetricMetadata {
+    enum MetricType {
+        UNKNOWN        = 0;
+        COUNTER        = 1;
+        GAUGE          = 2;
+        HISTOGRAM      = 3;
+        GAUGEHISTOGRAM = 4;
+        SUMMARY        = 5;
+        INFO           = 6;
+        STATESET       = 7;
+    }
+
+    // Represents the metric type, these match the set from Prometheus.
+    // Refer to model/textparse/interface.go for details.
+    MetricType type = 1;
+    string metric_family_name = 2;
+    string help = 4;
+    string unit = 5;
+}
+
+message Sample {
+    double value    = 1;
+    // timestamp is in ms format, see model/timestamp/timestamp.go for
+    // conversion from time.Time to Prometheus timestamp.
+    int64 timestamp = 2;
+}
+
+message Exemplar {
+    // Optional, can be empty.
+    repeated Label labels = 1 [(gogoproto.nullable) = false];
+    double value = 2;
+    // timestamp is in ms format, see model/timestamp/timestamp.go for
+    // conversion from time.Time to Prometheus timestamp.
+    int64 timestamp = 3;
+}
+
+// A native histogram, also known as a sparse histogram.
+// Original design doc:
+// https://docs.google.com/document/d/1cLNv3aufPZb3fNfaJgdaRBZsInZKKIHo9E6HinJVbpM/edit
+// The appendix of this design doc also explains the concept of float
+// histograms. This Histogram message can represent both, the usual
+// integer histogram as well as a float histogram.
+message Histogram {
+    enum ResetHint {
+        UNKNOWN = 0; // Need to test for a counter reset explicitly.
+        YES     = 1; // This is the 1st histogram after a counter reset.
+        NO      = 2; // There was no counter reset between this and the previous Histogram.
+        GAUGE   = 3; // This is a gauge histogram where counter resets don't happen.
+    }
+
+    oneof count { // Count of observations in the histogram.
+        uint64 count_int   = 1;
+        double count_float = 2;
+    }
+    double sum = 3; // Sum of observations in the histogram.
+    // The schema defines the bucket schema. Currently, valid numbers
+    // are -4 <= n <= 8. They are all for base-2 bucket schemas, where 1
+    // is a bucket boundary in each case, and then each power of two is
+    // divided into 2^n logarithmic buckets. Or in other words, each
+    // bucket boundary is the previous boundary times 2^(2^-n). In the
+    // future, more bucket schemas may be added using numbers < -4 or >
+    // 8.
+    sint32 schema             = 4;
+    double zero_threshold     = 5; // Breadth of the zero bucket.
+    oneof zero_count { // Count in zero bucket.
+        uint64 zero_count_int     = 6;
+        double zero_count_float   = 7;
+    }
+
+    // Negative Buckets.
+    repeated BucketSpan negative_spans =  8 [(gogoproto.nullable) = false];
+    // Use either "negative_deltas" or "negative_counts", the former for
+    // regular histograms with integer counts, the latter for float
+    // histograms.
+    repeated sint64 negative_deltas    =  9; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
+    repeated double negative_counts    = 10; // Absolute count of each bucket.
+
+    // Positive Buckets.
+    repeated BucketSpan positive_spans = 11 [(gogoproto.nullable) = false];
+    // Use either "positive_deltas" or "positive_counts", the former for
+    // regular histograms with integer counts, the latter for float
+    // histograms.
+    repeated sint64 positive_deltas    = 12; // Count delta of each bucket compared to previous one (or to zero for 1st bucket).
+    repeated double positive_counts    = 13; // Absolute count of each bucket.
+
+    ResetHint reset_hint               = 14;
+    // timestamp is in ms format, see model/timestamp/timestamp.go for
+    // conversion from time.Time to Prometheus timestamp.
+    int64 timestamp = 15;
+}
+
+// A BucketSpan defines a number of consecutive buckets with their
+// offset. Logically, it would be more straightforward to include the
+// bucket counts in the Span. However, the protobuf representation is
+// more compact in the way the data is structured here (with all the
+// buckets in a single array separate from the Spans).
+message BucketSpan {
+    sint32 offset = 1; // Gap to previous span, or starting point for 1st span (which can be negative).
+    uint32 length = 2; // Length of consecutive buckets.
+}
+
+// TimeSeries represents samples and labels for a single time series.
+message TimeSeries {
+    // For a timeseries to be valid, and for the samples and exemplars
+    // to be ingested by the remote system properly, the labels field is required.
+    repeated Label labels         = 1 [(gogoproto.nullable) = false];
+    repeated Sample samples       = 2 [(gogoproto.nullable) = false];
+    repeated Exemplar exemplars   = 3 [(gogoproto.nullable) = false];
+    repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
+}
+
+message Label {
+    string name  = 1;
+    string value = 2;
+}
+
+message Labels {
+    repeated Label labels = 1 [(gogoproto.nullable) = false];
+}
+
+// Matcher specifies a rule, which can match or set of labels or not.
+message LabelMatcher {
+    enum Type {
+        EQ  = 0;
+        NEQ = 1;
+        RE  = 2;
+        NRE = 3;
+    }
+    Type type    = 1;
+    string name  = 2;
+    string value = 3;
+}
+
+message ReadHints {
+    int64 step_ms = 1;  // Query step size in milliseconds.
+    string func = 2;    // String representation of surrounding function or aggregation.
+    int64 start_ms = 3; // Start time in milliseconds.
+    int64 end_ms = 4;   // End time in milliseconds.
+    repeated string grouping = 5; // List of label names used in aggregation.
+    bool by = 6; // Indicate whether it is without or by.
+    int64 range_ms = 7; // Range vector selector range in milliseconds.
+}
+
+// Chunk represents a TSDB chunk.
+// Time range [min, max] is inclusive.
+message Chunk {
+    int64 min_time_ms = 1;
+    int64 max_time_ms = 2;
+
+    // We require this to match chunkenc.Encoding.
+    enum Encoding {
+        UNKNOWN         = 0;
+        XOR             = 1;
+        HISTOGRAM       = 2;
+        FLOAT_HISTOGRAM = 3;
+    }
+    Encoding type  = 3;
+    bytes data     = 4;
+}
+
+// ChunkedSeries represents single, encoded time series.
+message ChunkedSeries {
+    // Labels should be sorted.
+    repeated Label labels = 1 [(gogoproto.nullable) = false];
+    // Chunks will be in start time order and may overlap.
+    repeated Chunk chunks = 2 [(gogoproto.nullable) = false];
+}