iliax 1 år sedan
förälder
incheckning
725c823947

+ 3 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/GraphsController.java

@@ -7,14 +7,13 @@ import com.provectus.kafka.ui.model.GraphDescriptionsDTO;
 import com.provectus.kafka.ui.model.GraphParameterDTO;
 import com.provectus.kafka.ui.model.PrometheusApiQueryResponseDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
+import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.graphs.GraphDescription;
 import com.provectus.kafka.ui.service.graphs.GraphsService;
-import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.time.Duration;
 import java.time.OffsetDateTime;
 import java.util.Optional;
-import java.util.stream.Stream;
 import lombok.RequiredArgsConstructor;
 import org.mapstruct.Mapper;
 import org.mapstruct.factory.Mappers;
@@ -41,7 +40,7 @@ public class GraphsController extends AbstractController implements GraphsApi {
 
   @Override
   public Mono<ResponseEntity<PrometheusApiQueryResponseDTO>> getGraphData(String clusterName,
-                                                                          Mono<GraphDataRequestDTO> graphDataRequestDTO,
+                                                                          Mono<GraphDataRequestDTO> graphDataRequestDto,
                                                                           ServerWebExchange exchange) {
     var context = AccessContext.builder()
         .cluster(clusterName)
@@ -50,7 +49,7 @@ public class GraphsController extends AbstractController implements GraphsApi {
 
     return accessControlService.validateAccess(context)
         .then(
-            graphDataRequestDTO.flatMap(req ->
+            graphDataRequestDto.flatMap(req ->
                     graphsService.getGraphData(
                         getCluster(clusterName),
                         req.getId(),

+ 6 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -32,11 +32,11 @@ import com.provectus.kafka.ui.model.ReplicaDTO;
 import com.provectus.kafka.ui.model.TopicConfigDTO;
 import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.model.TopicDetailsDTO;
+import com.provectus.kafka.ui.service.metrics.SummarizedMetrics;
 import java.math.BigDecimal;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.kafka.clients.admin.ConfigEntry;
@@ -60,11 +60,11 @@ public interface ClusterMapper {
   @Deprecated
   default ClusterMetricsDTO toClusterMetrics(Metrics metrics) {
     return new ClusterMetricsDTO()
-        .items(convert(metrics.getSummarizedMetrics().toList()));
+        .items(convert(new SummarizedMetrics(metrics).asStream()).toList());
   }
 
-  private List<MetricDTO> convert(List<MetricFamilySamples> metrics) {
-    return metrics.stream()
+  private Stream<MetricDTO> convert(Stream<MetricFamilySamples> metrics) {
+    return metrics
         .flatMap(m -> m.samples.stream())
         .map(s ->
             new MetricDTO()
@@ -74,11 +74,11 @@ public interface ClusterMapper {
                     //collecting to map, keeping order
                     .collect(toMap(s.labelNames::get, s.labelValues::get, (m1, m2) -> null, LinkedHashMap::new)))
                 .value(BigDecimal.valueOf(s.value))
-        ).toList();
+        );
   }
 
   default BrokerMetricsDTO toBrokerMetrics(List<MetricFamilySamples> metrics) {
-    return new BrokerMetricsDTO().metrics(convert(metrics));
+    return new BrokerMetricsDTO().metrics(convert(metrics.stream()).toList());
   }
 
   @Mapping(target = "isSensitive", source = "sensitive")

+ 0 - 59
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java

@@ -1,18 +1,11 @@
 package com.provectus.kafka.ui.model;
 
 import static io.prometheus.client.Collector.MetricFamilySamples;
-import static java.util.stream.Collectors.toMap;
 
-import com.google.common.collect.Streams;
 import com.provectus.kafka.ui.service.metrics.scrape.inferred.InferredMetrics;
-import groovy.lang.Tuple;
 import java.math.BigDecimal;
-import java.util.Collection;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Stream;
 import lombok.Builder;
 import lombok.Value;
 
@@ -49,56 +42,4 @@ public class Metrics {
     }
   }
 
-  public Stream<MetricFamilySamples> getSummarizedMetrics() {
-    return Streams.concat(
-        inferredMetrics.asStream(),
-        perBrokerScrapedMetrics
-            .values()
-            .stream()
-            .flatMap(Collection::stream)
-            .collect(toMap(mfs -> mfs.name, Optional::of, Metrics::summarizeMfs, LinkedHashMap::new))
-            .values()
-            .stream()
-            .filter(Optional::isPresent)
-            .map(Optional::get)
-    );
-  }
-
-  //returns Optional.empty if merging not supported for metric type
-  private static Optional<MetricFamilySamples> summarizeMfs(Optional<MetricFamilySamples> mfs1opt,
-                                                            Optional<MetricFamilySamples> mfs2opt) {
-    if ((mfs1opt.isEmpty() || mfs2opt.isEmpty()) || (mfs1opt.get().type != mfs2opt.get().type)) {
-      return Optional.empty();
-    }
-    var mfs1 = mfs1opt.get();
-    return switch (mfs1.type) {
-      case GAUGE, COUNTER -> Optional.of(
-          new MetricFamilySamples(
-              mfs1.name,
-              mfs1.type,
-              mfs1.help,
-              Stream.concat(mfs1.samples.stream(), mfs2opt.get().samples.stream())
-                  .collect(
-                      toMap(
-                          // merging samples with same labels
-                          s -> Tuple.tuple(s.name, s.labelNames, s.labelValues),
-                          s -> s,
-                          (s1, s2) -> new MetricFamilySamples.Sample(
-                              s1.name,
-                              s1.labelNames,
-                              s1.labelValues,
-                              s1.value + s2.value
-                          ),
-                          LinkedHashMap::new
-                      )
-                  )
-                  .values()
-                  .stream()
-                  .toList()
-          )
-      );
-      default -> Optional.empty();
-    };
-  }
-
 }

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsScrapeProperties.java

@@ -27,7 +27,7 @@ public class MetricsScrapeProperties {
   @Nullable
   TruststoreConfig truststoreConfig;
 
-  public static MetricsScrapeProperties create(ClustersProperties.Cluster cluster){
+  public static MetricsScrapeProperties create(ClustersProperties.Cluster cluster) {
     var metrics = Objects.requireNonNull(cluster.getMetrics());
     return MetricsScrapeProperties.builder()
         .port(metrics.getPort())

+ 8 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphDescription.java

@@ -11,6 +11,14 @@ public record GraphDescription(String id,
                                String prometheusQuery,
                                Set<String> params) {
 
+  public static GraphDescriptionBuilder instant() {
+    return builder();
+  }
+
+  public static GraphDescriptionBuilder range(Duration defaultInterval) {
+    return builder().defaultInterval(defaultInterval);
+  }
+
   public boolean isRange() {
     return defaultInterval != null;
   }

+ 10 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphDescriptions.java

@@ -1,5 +1,7 @@
 package com.provectus.kafka.ui.service.graphs;
 
+import static java.util.stream.Collectors.toMap;
+
 import com.provectus.kafka.ui.exception.ValidationException;
 import java.time.Duration;
 import java.util.HashMap;
@@ -7,7 +9,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.springframework.stereotype.Component;
 
@@ -19,9 +20,8 @@ class GraphDescriptions {
   private final Map<String, GraphDescription> graphsById;
 
   GraphDescriptions() {
-    validateGraphDescr(PREDEFINED_GRAPHS);
-    this.graphsById = PREDEFINED_GRAPHS.stream()
-        .collect(Collectors.toMap(GraphDescription::id, d -> d));
+    validate();
+    this.graphsById = PREDEFINED_GRAPHS.stream().collect(toMap(GraphDescription::id, d -> d));
   }
 
   Optional<GraphDescription> getById(String id) {
@@ -32,9 +32,9 @@ class GraphDescriptions {
     return graphsById.values().stream();
   }
 
-  private void validateGraphDescr(List<GraphDescription> descriptions) {
+  private void validate() {
     Map<String, String> errors = new HashMap<>();
-    for (GraphDescription description : descriptions) {
+    for (GraphDescription description : PREDEFINED_GRAPHS) {
       new PromQueryTemplate(description)
           .validateSyntax()
           .ifPresent(err -> errors.put(description.id(), err));
@@ -46,33 +46,29 @@ class GraphDescriptions {
 
   private static final List<GraphDescription> PREDEFINED_GRAPHS = List.of(
 
-      GraphDescription.builder()
+      GraphDescription.range(DEFAULT_RANGE_DURATION)
           .id("broker_bytes_disk_ts")
-          .defaultInterval(DEFAULT_RANGE_DURATION)
           .prometheusQuery("broker_bytes_disk{cluster=\"${cluster}\"}")
           .params(Set.of())
           .build(),
 
-      GraphDescription.builder()
+      GraphDescription.instant()
           .id("broker_bytes_disk")
           .prometheusQuery("broker_bytes_disk{cluster=\"${cluster}\"}")
           .params(Set.of())
           .build(),
 
-      GraphDescription.builder()
+      GraphDescription.instant()
           .id("kafka_topic_partition_current_offset")
           .prometheusQuery("kafka_topic_partition_current_offset{cluster=\"${cluster}\"}")
           .params(Set.of())
           .build(),
 
-      GraphDescription.builder()
+      GraphDescription.range(DEFAULT_RANGE_DURATION)
           .id("kafka_topic_partition_current_offset_per_topic_ts")
-          .defaultInterval(DEFAULT_RANGE_DURATION)
           .prometheusQuery("kafka_topic_partition_current_offset{cluster=\"${cluster}\",topic = \"${topic}\"}")
           .params(Set.of("topic"))
           .build()
-
-      //TODO: add
   );
 
 }

+ 73 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/SummarizedMetrics.java

@@ -0,0 +1,73 @@
+package com.provectus.kafka.ui.service.metrics;
+
+import static java.util.stream.Collectors.toMap;
+
+import com.google.common.collect.Streams;
+import com.provectus.kafka.ui.model.Metrics;
+import groovy.lang.Tuple;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Optional;
+import java.util.stream.Stream;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public class SummarizedMetrics {
+
+  private final Metrics metrics;
+
+  public Stream<MetricFamilySamples> asStream() {
+    return Streams.concat(
+        metrics.getInferredMetrics().asStream(),
+        metrics.getPerBrokerScrapedMetrics()
+            .values()
+            .stream()
+            .flatMap(Collection::stream)
+            .collect(toMap(mfs -> mfs.name, Optional::of, SummarizedMetrics::summarizeMfs, LinkedHashMap::new))
+            .values()
+            .stream()
+            .filter(Optional::isPresent)
+            .map(Optional::get)
+    );
+  }
+
+  //returns Optional.empty if merging not supported for metric type
+  private static Optional<MetricFamilySamples> summarizeMfs(Optional<MetricFamilySamples> mfs1opt,
+                                                            Optional<MetricFamilySamples> mfs2opt) {
+    if ((mfs1opt.isEmpty() || mfs2opt.isEmpty()) || (mfs1opt.get().type != mfs2opt.get().type)) {
+      return Optional.empty();
+    }
+    var mfs1 = mfs1opt.get();
+    return switch (mfs1.type) {
+      case GAUGE, COUNTER -> Optional.of(
+          new MetricFamilySamples(
+              mfs1.name,
+              mfs1.type,
+              mfs1.help,
+              Stream.concat(mfs1.samples.stream(), mfs2opt.get().samples.stream())
+                  .collect(
+                      toMap(
+                          // merging samples with same labels
+                          s -> Tuple.tuple(s.name, s.labelNames, s.labelValues),
+                          s -> s,
+                          (s1, s2) -> new MetricFamilySamples.Sample(
+                              s1.name,
+                              s1.labelNames,
+                              s1.labelValues,
+                              s1.value + s2.value
+                          ),
+                          LinkedHashMap::new
+                      )
+                  )
+                  .values()
+                  .stream()
+                  .toList()
+          )
+      );
+      default -> Optional.empty();
+    };
+  }
+
+
+}

+ 21 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/prometheus/PrometheusExpose.java

@@ -22,6 +22,7 @@ import org.springframework.http.ResponseEntity;
 public final class PrometheusExpose {
 
   private static final String CLUSTER_EXPOSE_LBL_NAME = "cluster";
+  private static final String BROKER_EXPOSE_LBL_NAME = "broker_id";
 
   private static final HttpHeaders PROMETHEUS_EXPOSE_ENDPOINT_HEADERS;
 
@@ -49,9 +50,19 @@ public final class PrometheusExpose {
   }
 
   public static Stream<MetricFamilySamples> prepareMetricsForGlobalExpose(String clusterName, Metrics metrics) {
-    return metrics
-        .getSummarizedMetrics()
-        .map(mfs -> addLbl(mfs, CLUSTER_EXPOSE_LBL_NAME, clusterName));
+    return Stream.concat(
+            metrics.getInferredMetrics().asStream(),
+            extractBrokerMetricsWithLabel(metrics)
+        )
+        .map(mfs -> appendLabel(mfs, CLUSTER_EXPOSE_LBL_NAME, clusterName));
+  }
+
+  private static Stream<MetricFamilySamples> extractBrokerMetricsWithLabel(Metrics metrics) {
+    return metrics.getPerBrokerScrapedMetrics().entrySet().stream()
+        .flatMap(e -> {
+          String brokerId = String.valueOf(e.getKey());
+          return e.getValue().stream().map(mfs -> appendLabel(mfs, BROKER_EXPOSE_LBL_NAME, brokerId));
+        });
   }
 
   private static MetricFamilySamples concatSamples(MetricFamilySamples mfs1,
@@ -62,7 +73,7 @@ public final class PrometheusExpose {
     );
   }
 
-  private static MetricFamilySamples addLbl(MetricFamilySamples mfs, String lblName, String lblVal) {
+  private static MetricFamilySamples appendLabel(MetricFamilySamples mfs, String lblName, String lblVal) {
     return new MetricFamilySamples(
         mfs.name, mfs.unit, mfs.type, mfs.help,
         mfs.samples.stream()
@@ -96,17 +107,17 @@ public final class PrometheusExpose {
 
   // copied from io.prometheus.client.exporter.common.TextFormat:writeEscapedLabelValue
   public static String escapedLabelValue(String s) {
-    StringWriter writer = new StringWriter(s.length());
+    StringBuilder sb = new StringBuilder(s.length());
     for (int i = 0; i < s.length(); i++) {
       char c = s.charAt(i);
       switch (c) {
-        case '\\' -> writer.append("\\\\");
-        case '\"' -> writer.append("\\\"");
-        case '\n' -> writer.append("\\n");
-        default -> writer.append(c);
+        case '\\' -> sb.append("\\\\");
+        case '\"' -> sb.append("\\\"");
+        case '\n' -> sb.append("\\n");
+        default -> sb.append(c);
       }
     }
-    return writer.toString();
+    return sb.toString();
   }
 
 }

+ 11 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/KafkaSink.java

@@ -1,11 +1,13 @@
 package com.provectus.kafka.ui.service.metrics.sink;
 
+import static com.google.common.base.Charsets.UTF_8;
 import static com.provectus.kafka.ui.service.MessagesService.createProducer;
 import static com.provectus.kafka.ui.service.metrics.prometheus.PrometheusExpose.escapedLabelValue;
-import static io.prometheus.client.Collector.*;
+import static io.prometheus.client.Collector.MetricFamilySamples;
+import static io.prometheus.client.Collector.doubleToGoString;
+import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
 
 import com.fasterxml.jackson.databind.json.JsonMapper;
-import com.google.common.base.Charsets;
 import com.provectus.kafka.ui.config.ClustersProperties;
 import java.time.Instant;
 import java.time.ZoneOffset;
@@ -21,7 +23,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import reactor.core.publisher.Mono;
 
 /*
- * Format and implementation are the same as in https://github.com/Telefonica/prometheus-kafka-adapter
+ * Format of records copied from https://github.com/Telefonica/prometheus-kafka-adapter
  */
 @RequiredArgsConstructor
 class KafkaSink implements MetricsSink {
@@ -30,11 +32,13 @@ class KafkaSink implements MetricsSink {
 
   private static final JsonMapper JSON_MAPPER = new JsonMapper();
 
+  private static final Map<String, Object> PRODUCER_ADDITIONAL_CONFIGS = Map.of(COMPRESSION_TYPE_CONFIG, "gzip");
+
   private final String topic;
   private final Producer<byte[], byte[]> producer;
 
   static KafkaSink create(ClustersProperties.Cluster cluster, String targetTopic) {
-    return new KafkaSink(targetTopic, createProducer(cluster, Map.of()));
+    return new KafkaSink(targetTopic, createProducer(cluster, PRODUCER_ADDITIONAL_CONFIGS));
   }
 
   @Override
@@ -58,13 +62,13 @@ class KafkaSink implements MetricsSink {
             lbls.put(sample.labelNames.get(i), escapedLabelValue(sample.labelValues.get(i)));
           }
           var km = new KafkaMetric(ts, doubleToGoString(sample.value), sample.name, lbls);
-          return new ProducerRecord<>(topic, toJson(km));
+          return new ProducerRecord<>(topic, toJsonBytes(km));
         });
   }
 
   @SneakyThrows
-  private static byte[] toJson(KafkaMetric m) {
-    return JSON_MAPPER.writeValueAsString(m).getBytes(Charsets.UTF_8);
+  private static byte[] toJsonBytes(KafkaMetric m) {
+    return JSON_MAPPER.writeValueAsString(m).getBytes(UTF_8);
   }
 
 }

+ 13 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java

@@ -19,20 +19,19 @@ public interface MetricsSink {
         .flatMap(metrics -> Optional.ofNullable(metrics.getStore()))
         .flatMap(store -> Optional.ofNullable(store.getPrometheus()))
         .ifPresent(prometheusConf -> {
-              if (hasText(prometheusConf.getUrl()) && Boolean.TRUE.equals(prometheusConf.getRemoteWrite())) {
-                sinks.add(new PrometheusRemoteWriteSink(prometheusConf.getUrl()));
-              }
-              if (hasText(prometheusConf.getPushGatewayUrl())) {
-                sinks.add(
-                    PrometheusPushGatewaySink.create(
-                        prometheusConf.getPushGatewayUrl(),
-                        prometheusConf.getPushGatewayJobName(),
-                        prometheusConf.getPushGatewayUsername(),
-                        prometheusConf.getPushGatewayPassword()
-                    ));
-              }
-            }
-        );
+          if (hasText(prometheusConf.getUrl()) && Boolean.TRUE.equals(prometheusConf.getRemoteWrite())) {
+            sinks.add(new PrometheusRemoteWriteSink(prometheusConf.getUrl()));
+          }
+          if (hasText(prometheusConf.getPushGatewayUrl())) {
+            sinks.add(
+                PrometheusPushGatewaySink.create(
+                    prometheusConf.getPushGatewayUrl(),
+                    prometheusConf.getPushGatewayJobName(),
+                    prometheusConf.getPushGatewayUsername(),
+                    prometheusConf.getPushGatewayPassword()
+                ));
+          }
+        });
 
     Optional.ofNullable(cluster.getMetrics())
         .flatMap(metrics -> Optional.ofNullable(metrics.getStore()))

+ 53 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/prometheus/PrometheusExposeTest.java

@@ -0,0 +1,53 @@
+package com.provectus.kafka.ui.service.metrics.prometheus;
+
+import static com.provectus.kafka.ui.service.metrics.prometheus.PrometheusExpose.prepareMetricsForGlobalExpose;
+import static io.prometheus.client.Collector.Type.GAUGE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.model.Metrics;
+import com.provectus.kafka.ui.service.metrics.scrape.inferred.InferredMetrics;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+class PrometheusExposeTest {
+
+  @Test
+  void prepareMetricsForGlobalExposeAppendsClusterAndBrokerIdLabelsToMetrics() {
+
+    var inferredMfs = new MetricFamilySamples("infer", GAUGE, "help", List.of(
+        new Sample("infer1", List.of("lbl1"), List.of("lblVal1"), 100)));
+
+    var broker1Mfs = new MetricFamilySamples("brok", GAUGE, "help", List.of(
+        new Sample("brok", List.of("broklbl1"), List.of("broklblVal1"), 101)));
+
+    var broker2Mfs = new MetricFamilySamples("brok", GAUGE, "help", List.of(
+        new Sample("brok", List.of("broklbl1"), List.of("broklblVal1"), 102)));
+
+    List<MetricFamilySamples> prepared = prepareMetricsForGlobalExpose(
+        "testCluster",
+        Metrics.builder()
+            .inferredMetrics(new InferredMetrics(List.of(inferredMfs)))
+            .perBrokerScrapedMetrics(Map.of(1, List.of(broker1Mfs), 2, List.of(broker2Mfs)))
+            .build()
+    ).toList();
+
+    assertThat(prepared)
+        .hasSize(3)
+        .contains(new MetricFamilySamples("infer", GAUGE, "help", List.of(
+            new Sample("infer1", List.of("cluster", "lbl1"), List.of("testCluster", "lblVal1"), 100))))
+        .contains(
+            new MetricFamilySamples("brok", GAUGE, "help", List.of(
+                new Sample("brok", List.of("cluster", "broker_id", "broklbl1"),
+                    List.of("testCluster", "1", "broklblVal1"), 101)))
+        )
+        .contains(
+            new MetricFamilySamples("brok", GAUGE, "help", List.of(
+                new Sample("brok", List.of("cluster", "broker_id", "broklbl1"),
+                    List.of("testCluster", "2", "broklblVal1"), 102)))
+        );
+  }
+
+}