iliax 2 年之前
父节点
当前提交
98b1aa5ec0
共有 23 个文件被更改,包括 819 次插入397 次删除
  1. 3 1
      documentation/compose/kafka-ui-arm64.yaml
  2. 16 78
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/PrometheusExposeController.java
  3. 7 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java
  4. 42 24
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java
  5. 3 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java
  6. 5 19
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java
  7. 30 22
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/IoRatesMetricsScanner.java
  8. 4 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java
  9. 4 9
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/PerBrokerScrapedMetrics.java
  10. 11 13
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/ScrapedClusterState.java
  11. 0 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetrics.java
  12. 5 12
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetricsScraper.java
  13. 17 16
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/jmx/JmxMetricsRetriever.java
  14. 6 7
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/jmx/JmxSslSocketFactory.java
  15. 0 47
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusEndpointMetricsParser.java
  16. 289 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusEndpointParser.java
  17. 26 40
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusMetricsRetriever.java
  18. 9 10
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusScraper.java
  19. 102 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/PrometheusEndpointUtil.java
  20. 6 16
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/IoRatesMetricsScannerTest.java
  21. 0 31
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusEndpointMetricsParserTest.java
  22. 175 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusEndpointParserTest.java
  23. 59 38
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusMetricsRetrieverTest.java

+ 3 - 1
documentation/compose/kafka-ui-arm64.yaml

@@ -19,7 +19,9 @@ services:
       KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry0:8085
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
-      DYNAMIC_CONFIG_ENABLED: 'true'  # not necessary, added for tests
+      KAFKA_CLUSTERS_1_NAME: local2
+      KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka0:29092
+#      KAFKA_CLUSTERS_1_METRICS_PORT: 9997
 
   kafka0:
     image: confluentinc/cp-kafka:7.2.1.arm64

+ 16 - 78
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/PrometheusExposeController.java

@@ -1,19 +1,11 @@
 package com.provectus.kafka.ui.controller;
 
-import static io.prometheus.client.Collector.MetricFamilySamples;
-
-import com.google.common.collect.Iterators;
 import com.provectus.kafka.ui.api.PrometheusExposeApi;
+import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.StatisticsCache;
-import io.prometheus.client.exporter.common.TextFormat;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
+import com.provectus.kafka.ui.util.PrometheusEndpointUtil;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
-import org.springframework.http.HttpHeaders;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
@@ -27,15 +19,21 @@ public class PrometheusExposeController extends AbstractController implements Pr
 
   @Override
   public Mono<ResponseEntity<String>> getAllMetrics(ServerWebExchange exchange) {
-    return constructResponse(getSummarizedMetricsWithClusterLbl());
+    return Mono.just(
+        PrometheusEndpointUtil.exposeAllMetrics(
+            clustersStorage.getKafkaClusters()
+                .stream()
+                .collect(Collectors.toMap(KafkaCluster::getName, c -> statisticsCache.get(c).getMetrics()))
+        )
+    );
   }
 
   @Override
   public Mono<ResponseEntity<String>> getAllClusterMetrics(String clusterName, ServerWebExchange exchange) {
-    return constructResponse(
-        statisticsCache.get(getCluster(clusterName))
-            .getMetrics()
-            .getSummarizedMetrics()
+    return Mono.just(
+        PrometheusEndpointUtil.exposeClusterMetrics(
+            statisticsCache.get(getCluster(clusterName)).getMetrics()
+        )
     );
   }
 
@@ -43,70 +41,10 @@ public class PrometheusExposeController extends AbstractController implements Pr
   public Mono<ResponseEntity<String>> getBrokerMetrics(String clusterName,
                                                        Long brokerId,
                                                        ServerWebExchange exchange) {
-    //TODO: discuss - do we need to append broker_id lbl ?
-    return constructResponse(
-        statisticsCache.get(getCluster(clusterName))
-            .getMetrics()
-            .getPerBrokerScrapedMetrics()
-            .getOrDefault(brokerId.intValue(), List.of())
-            .stream()
-    );
-  }
-
-  private Stream<MetricFamilySamples> getSummarizedMetricsWithClusterLbl() {
-    return clustersStorage.getKafkaClusters()
-        .stream()
-        .flatMap(c -> statisticsCache.get(c)
-            .getMetrics()
-            .getSummarizedMetrics()
-            .map(mfs -> appendLbl(mfs, "cluster", c.getName())))
-        // merging MFS with same name
-        .collect(Collectors.toMap(mfs -> mfs.name, mfs -> mfs, PrometheusExposeController::merge))
-        .values()
-        .stream();
-  }
-
-  private static MetricFamilySamples merge(MetricFamilySamples mfs1, MetricFamilySamples mfs2) {
-    return new MetricFamilySamples(
-        mfs1.name, mfs1.unit, mfs1.type, mfs1.help,
-        Stream.concat(mfs1.samples.stream(), mfs2.samples.stream()).toList()
-    );
-  }
-
-  private static MetricFamilySamples appendLbl(MetricFamilySamples mfs, String lblName, String lblVal) {
-    return new MetricFamilySamples(
-        mfs.name, mfs.unit, mfs.type, mfs.help,
-        mfs.samples.stream()
-            .map(sample ->
-                new MetricFamilySamples.Sample(
-                    sample.name,
-                    prependToList(sample.labelNames, lblName),
-                    prependToList(sample.labelValues, lblVal),
-                    sample.value
-                )).toList()
-    );
-  }
-
-  private static <T> List<T> prependToList(List<T> lst, T toPrepend) {
-    var result = new ArrayList<T>(lst.size() + 1);
-    result.add(toPrepend);
-    result.addAll(lst);
-    return result;
-  }
-
-  @SneakyThrows
-  private static Mono<ResponseEntity<String>> constructResponse(Stream<MetricFamilySamples> metrics) {
-    StringWriter writer = new StringWriter();
-    TextFormat.writeOpenMetrics100(writer, Iterators.asEnumeration(metrics.iterator()));
-
-    HttpHeaders responseHeaders = new HttpHeaders();
-    responseHeaders.set(HttpHeaders.CONTENT_TYPE, TextFormat.CONTENT_TYPE_OPENMETRICS_100);
-
     return Mono.just(
-        ResponseEntity
-            .ok()
-            .headers(responseHeaders)
-            .body(writer.toString())
+        PrometheusEndpointUtil.exposeBrokerMetrics(
+            statisticsCache.get(getCluster(clusterName)).getMetrics(), brokerId.intValue()
+        )
     );
   }
 

+ 7 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java

@@ -46,9 +46,9 @@ public class InternalLogDirStats {
     return new InternalLogDirStats(Map.of());
   }
 
-  public InternalLogDirStats(Map<Integer, Map<String, LogDirDescription>> log) {
+  public InternalLogDirStats(Map<Integer, Map<String, LogDirDescription>> logsInfo) {
     final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
-        log.entrySet().stream().flatMap(b ->
+        logsInfo.entrySet().stream().flatMap(b ->
             b.getValue().entrySet().stream().flatMap(topicMap ->
                 topicMap.getValue().replicaInfos().entrySet().stream()
                     .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size()))
@@ -74,12 +74,14 @@ public class InternalLogDirStats {
             collectingAndThen(
                 summarizingLong(Tuple3::getT3), SegmentStats::new)));
 
-    brokerDirsStats = calculateSpaceStats(log);
+    brokerDirsStats = calculateSpaceStats(logsInfo);
   }
 
-  private static Map<Integer, LogDirSpaceStats> calculateSpaceStats(Map<Integer, Map<String, LogDirDescription>> log) {
+  private static Map<Integer, LogDirSpaceStats> calculateSpaceStats(
+      Map<Integer, Map<String, LogDirDescription>> logsInfo) {
+
     var stats = new HashMap<Integer, LogDirSpaceStats>();
-    log.forEach((brokerId, logDirStats) -> {
+    logsInfo.forEach((brokerId, logDirStats) -> {
       Map<String, Long> totalBytes = new HashMap<>();
       Map<String, Long> usableBytes = new HashMap<>();
       logDirStats.forEach((logDir, descr) -> {

+ 42 - 24
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java

@@ -6,10 +6,14 @@ import static java.util.stream.Collectors.toMap;
 import com.google.common.collect.Streams;
 import com.provectus.kafka.ui.service.metrics.scrape.inferred.InferredMetrics;
 import groovy.lang.Tuple;
+import jakarta.annotation.Nullable;
 import java.math.BigDecimal;
 import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.stream.Stream;
 import lombok.Builder;
 import lombok.Value;
@@ -37,7 +41,7 @@ public class Metrics {
                         Map<String, BigDecimal> topicBytesInPerSec,
                         Map<String, BigDecimal> topicBytesOutPerSec) {
 
-    public static IoRates empty() {
+    static IoRates empty() {
       return IoRates.builder()
           .brokerBytesOutPerSec(Map.of())
           .brokerBytesInPerSec(Map.of())
@@ -54,34 +58,48 @@ public class Metrics {
             .values()
             .stream()
             .flatMap(Collection::stream)
-            .collect(toMap(mfs -> mfs.name, mfs -> mfs, Metrics::summarizeMfs))
+            .collect(toMap(mfs -> mfs.name, Optional::of, Metrics::summarizeMfs, LinkedHashMap::new))
             .values()
             .stream()
+            .filter(Optional::isPresent)
+            .map(Optional::get)
     );
   }
 
-  private static MetricFamilySamples summarizeMfs(MetricFamilySamples mfs1, MetricFamilySamples mfs2) {
-    return new MetricFamilySamples(
-        mfs1.name,
-        mfs1.type,
-        mfs1.help,
-        Stream.concat(mfs1.samples.stream(), mfs2.samples.stream())
-            .collect(
-                toMap(
-                    s -> Tuple.tuple(s.labelNames, s.labelValues),
-                    s -> s,
-                    (s1, s2) -> new MetricFamilySamples.Sample(
-                        s1.name,
-                        s1.labelNames,
-                        s1.labelValues,
-                        s1.value + s2.value
-                    )
-                )
-            )
-            .values()
-            .stream()
-            .toList()
-    );
+  //returns Optional.empty if merging not supported for metric type
+  private static Optional<MetricFamilySamples> summarizeMfs(Optional<MetricFamilySamples> mfs1opt,
+                                                            Optional<MetricFamilySamples> mfs2opt) {
+    if ((mfs1opt.isEmpty() || mfs2opt.isEmpty()) || (mfs1opt.get().type != mfs2opt.get().type)) {
+      return Optional.empty();
+    }
+    var mfs1 = mfs1opt.get();
+    return switch (mfs1.type) {
+      case GAUGE, COUNTER -> Optional.of(
+          new MetricFamilySamples(
+              mfs1.name,
+              mfs1.type,
+              mfs1.help,
+              Stream.concat(mfs1.samples.stream(), mfs2opt.get().samples.stream())
+                  .collect(
+                      toMap(
+                          // merging samples with same labels
+                          s -> Tuple.tuple(s.name, s.labelNames, s.labelValues),
+                          s -> s,
+                          (s1, s2) -> new MetricFamilySamples.Sample(
+                              s1.name,
+                              s1.labelNames,
+                              s1.labelValues,
+                              s1.value + s2.value
+                          )
+                      )
+                  )
+                  .values()
+                  .stream()
+                  .toList()
+          )
+      );
+      default -> Optional.empty();
+    };
   }
 
 }

+ 3 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java

@@ -54,15 +54,14 @@ public class StatisticsService {
             e -> Mono.just(Statistics.empty().toBuilder().lastKafkaException(e).build()));
   }
 
-  private Mono<ScrapedClusterState> loadClusterState(ClusterDescription clusterDescription,
-                                                     ReactiveAdminClient ac) {
+  private Mono<ScrapedClusterState> loadClusterState(ClusterDescription clusterDescription, ReactiveAdminClient ac) {
     return ScrapedClusterState.scrape(clusterDescription, ac);
   }
 
-  private Mono<Metrics> scrapeMetrics(KafkaCluster c,
+  private Mono<Metrics> scrapeMetrics(KafkaCluster cluster,
                                       ScrapedClusterState clusterState,
                                       ClusterDescription clusterDescription) {
-    return c.getMetricsScrapping().scrape(clusterState, clusterDescription.getNodes());
+    return cluster.getMetricsScrapping().scrape(clusterState, clusterDescription.getNodes());
   }
 
 }

+ 5 - 19
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.service.metrics;
 
-import static io.prometheus.client.Collector.*;
+import static io.prometheus.client.Collector.MetricFamilySamples;
+import static io.prometheus.client.Collector.Type;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
@@ -8,8 +9,6 @@ import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 public interface RawMetric {
@@ -26,9 +25,9 @@ public interface RawMetric {
     return new SimpleMetric(name, labels, value);
   }
 
-  static Stream<MetricFamilySamples> groupIntoMFS(Collection<RawMetric> lst) {
+  static Stream<MetricFamilySamples> groupIntoMFS(Collection<RawMetric> rawMetrics) {
     Map<String, MetricFamilySamples> map = new LinkedHashMap<>();
-    for (RawMetric m : lst) {
+    for (RawMetric m : rawMetrics) {
       var mfs = map.get(m.name());
       if (mfs == null) {
         mfs = new MetricFamilySamples(m.name(), Type.GAUGE, m.name(), new ArrayList<>());
@@ -41,19 +40,6 @@ public interface RawMetric {
     return map.values().stream();
   }
 
-  static Stream<RawMetric> create(MetricFamilySamples samples) {
-    return samples.samples.stream()
-        .map(s -> create(
-                s.name,
-                IntStream.range(0, s.labelNames.size())
-                    .boxed()
-                    .collect(Collectors.<Integer, String, String>toMap(s.labelNames::get, s.labelValues::get)),
-                BigDecimal.valueOf(s.value)
-            )
-        );
-  }
-
-  record SimpleMetric(String name, Map<String, String> labels, BigDecimal value) implements RawMetric {
-  }
+  record SimpleMetric(String name, Map<String, String> labels, BigDecimal value) implements RawMetric { }
 
 }

+ 30 - 22
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/IoRatesMetricsScanner.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.service.metrics.scrape;
 
+import static io.prometheus.client.Collector.*;
 import static org.apache.commons.lang3.StringUtils.containsIgnoreCase;
 import static org.apache.commons.lang3.StringUtils.endsWithIgnoreCase;
 
@@ -11,7 +12,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class IoRatesMetricsScanner {
+// Scans external jmx/prometheus metric and tries to infer io rates
+class IoRatesMetricsScanner {
 
   // per broker
   final Map<Integer, BigDecimal> brokerBytesInFifteenMinuteRate = new HashMap<>();
@@ -21,12 +23,12 @@ public class IoRatesMetricsScanner {
   final Map<String, BigDecimal> bytesInFifteenMinuteRate = new HashMap<>();
   final Map<String, BigDecimal> bytesOutFifteenMinuteRate = new HashMap<>();
 
-  public IoRatesMetricsScanner(Map<Integer, List<Collector.MetricFamilySamples>> perBrokerMetrics) {
+  IoRatesMetricsScanner(Map<Integer, List<MetricFamilySamples>> perBrokerMetrics) {
     perBrokerMetrics.forEach((nodeId, metrics) -> {
       metrics.forEach(m -> {
-        RawMetric.create(m).forEach(rawMetric -> {
-          updateBrokerIOrates(nodeId, rawMetric);
-          updateTopicsIOrates(rawMetric);
+        m.samples.forEach(metricSample -> {
+          updateBrokerIOrates(nodeId, metricSample);
+          updateTopicsIOrates(metricSample);
         });
       });
     });
@@ -41,35 +43,41 @@ public class IoRatesMetricsScanner {
         .build();
   }
 
-  private void updateBrokerIOrates(int nodeId, RawMetric rawMetric) {
-    String name = rawMetric.name();
+  private void updateBrokerIOrates(int nodeId, MetricFamilySamples.Sample metric) {
+    String name = metric.name;
     if (!brokerBytesInFifteenMinuteRate.containsKey(nodeId)
-        && rawMetric.labels().size() == 1
-        && "BytesInPerSec".equalsIgnoreCase(rawMetric.labels().get("name"))
+        && metric.labelValues.size() == 1
+        && "BytesInPerSec".equalsIgnoreCase(metric.labelValues.get(0))
         && containsIgnoreCase(name, "BrokerTopicMetrics")
         && endsWithIgnoreCase(name, "FifteenMinuteRate")) {
-      brokerBytesInFifteenMinuteRate.put(nodeId, rawMetric.value());
+      brokerBytesInFifteenMinuteRate.put(nodeId, BigDecimal.valueOf(metric.value));
     }
     if (!brokerBytesOutFifteenMinuteRate.containsKey(nodeId)
-        && rawMetric.labels().size() == 1
-        && "BytesOutPerSec".equalsIgnoreCase(rawMetric.labels().get("name"))
+        && metric.labelValues.size() == 1
+        && "BytesOutPerSec".equalsIgnoreCase(metric.labelValues.get(0))
         && containsIgnoreCase(name, "BrokerTopicMetrics")
         && endsWithIgnoreCase(name, "FifteenMinuteRate")) {
-      brokerBytesOutFifteenMinuteRate.put(nodeId, rawMetric.value());
+      brokerBytesOutFifteenMinuteRate.put(nodeId, BigDecimal.valueOf(metric.value));
     }
   }
 
-  private void updateTopicsIOrates(RawMetric rawMetric) {
-    String name = rawMetric.name();
-    String topic = rawMetric.labels().get("topic");
-    if (topic != null
+  private void updateTopicsIOrates(MetricFamilySamples.Sample metric) {
+    String name = metric.name;
+    int topicLblIdx = metric.labelNames.indexOf("topic");
+    if (topicLblIdx >= 0
         && containsIgnoreCase(name, "BrokerTopicMetrics")
         && endsWithIgnoreCase(name, "FifteenMinuteRate")) {
-      String nameProperty = rawMetric.labels().get("name");
-      if ("BytesInPerSec".equalsIgnoreCase(nameProperty)) {
-        bytesInFifteenMinuteRate.compute(topic, (k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
-      } else if ("BytesOutPerSec".equalsIgnoreCase(nameProperty)) {
-        bytesOutFifteenMinuteRate.compute(topic, (k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
+      String topic = metric.labelValues.get(topicLblIdx);
+      int nameLblIdx = metric.labelNames.indexOf("name");
+      if (nameLblIdx >= 0) {
+        var nameLblVal = metric.labelValues.get(nameLblIdx);
+        if ("BytesInPerSec".equalsIgnoreCase(nameLblVal)) {
+          BigDecimal val = BigDecimal.valueOf(metric.value);
+          bytesInFifteenMinuteRate.merge(topic, val, BigDecimal::add);
+        } else if ("BytesOutPerSec".equalsIgnoreCase(nameLblVal)) {
+          BigDecimal val = BigDecimal.valueOf(metric.value);
+          bytesOutFifteenMinuteRate.merge(topic, val, BigDecimal::add);
+        }
       }
     }
   }

+ 4 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java

@@ -63,18 +63,18 @@ public class MetricsScrapping {
 
   public Mono<Metrics> scrape(ScrapedClusterState clusterState, Collection<Node> nodes) {
     Mono<InferredMetrics> inferred = inferredMetricsScraper.scrape(clusterState);
-    Mono<? extends PerBrokerScrapedMetrics> external = scrapeExternal(nodes);
+    Mono<PerBrokerScrapedMetrics> external = scrapeExternal(nodes);
     return inferred.zipWith(
         external,
         (inf, ext) -> Metrics.builder()
-            .ioRates(ext.ioRates())
-            .perBrokerScrapedMetrics(ext.getPerBrokerMetrics())
             .inferredMetrics(inf)
+            .ioRates(ext.ioRates())
+            .perBrokerScrapedMetrics(ext.perBrokerMetrics())
             .build()
     );
   }
 
-  private Mono<? extends PerBrokerScrapedMetrics> scrapeExternal(Collection<Node> nodes) {
+  private Mono<PerBrokerScrapedMetrics> scrapeExternal(Collection<Node> nodes) {
     if (jmxMetricsScraper != null) {
       return jmxMetricsScraper.scrape(nodes);
     }

+ 4 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/PerBrokerScrapedMetrics.java

@@ -1,19 +1,14 @@
 package com.provectus.kafka.ui.service.metrics.scrape;
 
+import static io.prometheus.client.Collector.MetricFamilySamples;
+
 import com.provectus.kafka.ui.model.Metrics;
-import io.prometheus.client.Collector;
 import java.util.List;
 import java.util.Map;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-@RequiredArgsConstructor
-public class PerBrokerScrapedMetrics {
 
-  @Getter
-  private final Map<Integer, List<Collector.MetricFamilySamples>> perBrokerMetrics;
+public record PerBrokerScrapedMetrics(Map<Integer, List<MetricFamilySamples>> perBrokerMetrics) {
 
-  public static PerBrokerScrapedMetrics empty() {
+  static PerBrokerScrapedMetrics empty() {
     return new PerBrokerScrapedMetrics(Map.of());
   }
 

+ 11 - 13
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/ScrapedClusterState.java

@@ -3,7 +3,6 @@ package com.provectus.kafka.ui.service.metrics.scrape;
 import static com.provectus.kafka.ui.model.InternalLogDirStats.*;
 import static com.provectus.kafka.ui.service.ReactiveAdminClient.*;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Table;
 import com.provectus.kafka.ui.model.InternalLogDirStats;
 import com.provectus.kafka.ui.model.InternalPartitionsOffsets;
@@ -30,6 +29,11 @@ import reactor.core.publisher.Mono;
 @Value
 public class ScrapedClusterState {
 
+  Instant scrapeFinishedAt;
+  Map<Integer, NodeState> nodesStates;
+  Map<String, TopicState> topicStates;
+  Map<String, ConsumerGroupState> consumerGroupsStates;
+
   public record NodeState(int id,
                           Node node,
                           @Nullable SegmentStats segmentStats,
@@ -52,21 +56,15 @@ public class ScrapedClusterState {
       Map<TopicPartition, Long> committedOffsets) {
   }
 
-  Instant scrapeStartTime;
-  Map<Integer, NodeState> nodesStates;
-  Map<String, TopicState> topicStates;
-  Map<String, ConsumerGroupState> consumerGroupsStates;
-
   public static ScrapedClusterState empty() {
     return ScrapedClusterState.builder()
-        .scrapeStartTime(Instant.now())
+        .scrapeFinishedAt(Instant.now())
         .nodesStates(Map.of())
         .topicStates(Map.of())
         .consumerGroupsStates(Map.of())
         .build();
   }
 
-
   public ScrapedClusterState updateTopics(Map<String, TopicDescription> descriptions,
                                           Map<String, List<ConfigEntry>> configs,
                                           InternalPartitionsOffsets partitionsOffsets) {
@@ -92,7 +90,7 @@ public class ScrapedClusterState {
       );
     });
     return toBuilder()
-        .topicStates(ImmutableMap.copyOf(updatedTopicStates))
+        .topicStates(updatedTopicStates)
         .build();
   }
 
@@ -100,7 +98,7 @@ public class ScrapedClusterState {
     var newTopicStates = new HashMap<>(topicStates);
     newTopicStates.remove(topic);
     return toBuilder()
-        .topicStates(ImmutableMap.copyOf(newTopicStates))
+        .topicStates(newTopicStates)
         .build();
   }
 
@@ -180,9 +178,9 @@ public class ScrapedClusterState {
 
     return new ScrapedClusterState(
         Instant.now(),
-        Map.copyOf(nodesStates),
-        Map.copyOf(topicStates),
-        Map.copyOf(consumerGroupsStates)
+        nodesStates,
+        topicStates,
+        consumerGroupsStates
     );
   }
 

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetrics.java

@@ -5,7 +5,6 @@ import static io.prometheus.client.Collector.MetricFamilySamples;
 import java.util.List;
 import java.util.stream.Stream;
 
-//TODO: maybe rename to state-based metrics?
 public class InferredMetrics {
 
   private final List<MetricFamilySamples> metrics;

+ 5 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetricsScraper.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.service.metrics.scrape.inferred;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState;
 import io.prometheus.client.Collector.MetricFamilySamples;
 import io.prometheus.client.GaugeMetricFamily;
@@ -7,6 +8,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.MemberDescription;
@@ -20,19 +22,13 @@ public class InferredMetricsScraper {
   private ScrapedClusterState prevState = null;
 
   public synchronized Mono<InferredMetrics> scrape(ScrapedClusterState newState) {
-    if (prevState == null) {
-      prevState = newState;
-      return Mono.just(InferredMetrics.empty());
-    }
     var inferred = infer(prevState, newState);
     prevState = newState;
     return Mono.just(inferred);
   }
 
-  private static InferredMetrics infer(ScrapedClusterState prevState,
-                                       ScrapedClusterState newState) {
-
-    log.debug("Scraped cluster state: {}", newState); //TODO: rm
+  @VisibleForTesting
+  static InferredMetrics infer(@Nullable ScrapedClusterState prevState, ScrapedClusterState newState) {
     var registry = new MetricsRegistry();
     fillNodesMetrics(registry, newState);
     fillTopicMetrics(registry, newState);
@@ -51,11 +47,8 @@ public class InferredMetricsScraper {
                List<String> lbls,
                List<String> lblVals,
                Number value) {
-      var found = metrics.get(name);
       GaugeMetricFamily gauge;
-      if (found != null) {
-        gauge = (GaugeMetricFamily) found;
-      } else {
+      if ((gauge = (GaugeMetricFamily) metrics.get(name)) == null) {
         gauge = new GaugeMetricFamily(name, help, lbls);
         metrics.put(name, gauge);
       }

+ 17 - 16
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/jmx/JmxMetricsRetriever.java

@@ -18,12 +18,13 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.common.Node;
+import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 
-@Service
+@Component //need to be a component, since
 @Slf4j
 public class JmxMetricsRetriever implements Closeable {
 
@@ -52,25 +53,25 @@ public class JmxMetricsRetriever implements Closeable {
         .subscribeOn(Schedulers.boundedElastic());
   }
 
-  private boolean isSslJmxEndpoint(MetricsScrapeProperties metricsScrapeProperties) {
-    return metricsScrapeProperties.getKeystoreConfig() != null
-        && metricsScrapeProperties.getKeystoreConfig().getKeystoreLocation() != null;
+  private boolean isSslJmxEndpoint(MetricsScrapeProperties scrapeProperties) {
+    return scrapeProperties.getKeystoreConfig() != null
+        && scrapeProperties.getKeystoreConfig().getKeystoreLocation() != null;
   }
 
   @SneakyThrows
-  private List<RawMetric> retrieveSync(MetricsScrapeProperties metricsConfig, Node node) {
-    String jmxUrl = JMX_URL + node.host() + ":" + metricsConfig.getPort() + "/" + JMX_SERVICE_TYPE;
+  private List<RawMetric> retrieveSync(MetricsScrapeProperties scrapeProperties, Node node) {
+    String jmxUrl = JMX_URL + node.host() + ":" + scrapeProperties.getPort() + "/" + JMX_SERVICE_TYPE;
     log.debug("Collection JMX metrics for {}", jmxUrl);
     List<RawMetric> result = new ArrayList<>();
-    withJmxConnector(jmxUrl, metricsConfig, jmxConnector -> getMetricsFromJmx(jmxConnector, result));
+    withJmxConnector(jmxUrl, scrapeProperties, jmxConnector -> getMetricsFromJmx(jmxConnector, result));
     log.debug("{} metrics collected for {}", result.size(), jmxUrl);
     return result;
   }
 
   private void withJmxConnector(String jmxUrl,
-                                MetricsScrapeProperties metricsConfig,
+                                MetricsScrapeProperties scrapeProperties,
                                 Consumer<JMXConnector> consumer) {
-    var env = prepareJmxEnvAndSetThreadLocal(metricsConfig);
+    var env = prepareJmxEnvAndSetThreadLocal(scrapeProperties);
     try (JMXConnector connector = JMXConnectorFactory.newJMXConnector(new JMXServiceURL(jmxUrl), env)) {
       try {
         connector.connect(env);
@@ -86,11 +87,11 @@ public class JmxMetricsRetriever implements Closeable {
     }
   }
 
-  private Map<String, Object> prepareJmxEnvAndSetThreadLocal(MetricsScrapeProperties metricsConfig) {
+  private Map<String, Object> prepareJmxEnvAndSetThreadLocal(MetricsScrapeProperties scrapeProperties) {
     Map<String, Object> env = new HashMap<>();
-    if (isSslJmxEndpoint(metricsConfig)) {
-      var truststoreConfig = metricsConfig.getTruststoreConfig();
-      var keystoreConfig = metricsConfig.getKeystoreConfig();
+    if (isSslJmxEndpoint(scrapeProperties)) {
+      var truststoreConfig = scrapeProperties.getTruststoreConfig();
+      var keystoreConfig = scrapeProperties.getKeystoreConfig();
       JmxSslSocketFactory.setSslContextThreadLocal(
           truststoreConfig != null ? truststoreConfig.getTruststoreLocation() : null,
           truststoreConfig != null ? truststoreConfig.getTruststorePassword() : null,
@@ -100,11 +101,11 @@ public class JmxMetricsRetriever implements Closeable {
       JmxSslSocketFactory.editJmxConnectorEnv(env);
     }
 
-    if (StringUtils.isNotEmpty(metricsConfig.getUsername())
-        && StringUtils.isNotEmpty(metricsConfig.getPassword())) {
+    if (StringUtils.isNotEmpty(scrapeProperties.getUsername())
+        && StringUtils.isNotEmpty(scrapeProperties.getPassword())) {
       env.put(
           JMXConnector.CREDENTIALS,
-          new String[] {metricsConfig.getUsername(), metricsConfig.getPassword()}
+          new String[] {scrapeProperties.getUsername(), scrapeProperties.getPassword()}
       );
     }
     return env;

+ 6 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/jmx/JmxSslSocketFactory.java

@@ -61,9 +61,8 @@ class JmxSslSocketFactory extends javax.net.ssl.SSLSocketFactory {
     } catch (Exception e) {
       log.error("----------------------------------");
       log.error("SSL can't be enabled for JMX retrieval. "
-              + "Make sure your java app run with '--add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED' arg. Err: {}",
+              + "Make sure your java app is running with '--add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED' arg. Err: {}",
           e.getMessage());
-      log.trace("SSL can't be enabled for JMX retrieval", e);
       log.error("----------------------------------");
     }
     SSL_JMX_SUPPORTED = sslJmxSupported;
@@ -75,7 +74,7 @@ class JmxSslSocketFactory extends javax.net.ssl.SSLSocketFactory {
 
   private static final ThreadLocal<Ssl> SSL_CONTEXT_THREAD_LOCAL = new ThreadLocal<>();
 
-  private static final Map<HostAndPort, javax.net.ssl.SSLSocketFactory> CACHED_FACTORIES = new ConcurrentHashMap<>();
+  private static final Map<HostAndPort, javax.net.ssl.SSLSocketFactory> CACHED_SSL_FACTORIES = new ConcurrentHashMap<>();
 
   private record HostAndPort(String host, int port) {
   }
@@ -96,7 +95,7 @@ class JmxSslSocketFactory extends javax.net.ssl.SSLSocketFactory {
 
   // should be called when (host:port) -> factory cache should be invalidated (ex. on app config reload)
   public static void clearFactoriesCache() {
-    CACHED_FACTORIES.clear();
+    CACHED_SSL_FACTORIES.clear();
   }
 
   public static void clearThreadLocalContext() {
@@ -156,11 +155,11 @@ class JmxSslSocketFactory extends javax.net.ssl.SSLSocketFactory {
   @Override
   public Socket createSocket(String host, int port) throws IOException {
     var hostAndPort = new HostAndPort(host, port);
-    if (CACHED_FACTORIES.containsKey(hostAndPort)) {
-      return CACHED_FACTORIES.get(hostAndPort).createSocket(host, port);
+    if (CACHED_SSL_FACTORIES.containsKey(hostAndPort)) {
+      return CACHED_SSL_FACTORIES.get(hostAndPort).createSocket(host, port);
     } else if (threadLocalContextSet()) {
       var factory = createFactoryFromThreadLocalCtx();
-      CACHED_FACTORIES.put(hostAndPort, factory);
+      CACHED_SSL_FACTORIES.put(hostAndPort, factory);
       return factory.createSocket(host, port);
     }
     return defaultSocketFactory.createSocket(host, port);

+ 0 - 47
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusEndpointMetricsParser.java

@@ -1,47 +0,0 @@
-package com.provectus.kafka.ui.service.metrics.scrape.prometheus;
-
-import com.provectus.kafka.ui.service.metrics.RawMetric;
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.Optional;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.math.NumberUtils;
-
-@Slf4j
-public class PrometheusEndpointMetricsParser {
-
-  /**
-   * Matches openmetrics format. For example, string:
-   * kafka_server_BrokerTopicMetrics_FiveMinuteRate{name="BytesInPerSec",topic="__consumer_offsets",} 16.94886650744339
-   * will produce:
-   * name=kafka_server_BrokerTopicMetrics_FiveMinuteRate
-   * value=16.94886650744339
-   * labels={name="BytesInPerSec", topic="__consumer_offsets"}",
-   */
-  private static final Pattern PATTERN = Pattern.compile(
-      "(?<metricName>^\\w+)([ \t]*\\{*(?<properties>.*)}*)[ \\t]+(?<value>[\\d]+\\.?[\\d]+)?");
-
-  public static Optional<RawMetric> parse(String s) {
-    Matcher matcher = PATTERN.matcher(s);
-    if (matcher.matches()) {
-      String value = matcher.group("value");
-      String metricName = matcher.group("metricName");
-      if (metricName == null || !NumberUtils.isCreatable(value)) {
-        return Optional.empty();
-      }
-      var labels = Arrays.stream(matcher.group("properties").split(","))
-          .filter(str -> !"".equals(str))
-          .map(str -> str.split("="))
-          .filter(spit -> spit.length == 2)
-          .collect(Collectors.toUnmodifiableMap(
-              str -> str[0].trim(),
-              str -> str[1].trim().replace("\"", "")));
-
-      return Optional.of(RawMetric.create(metricName, labels, new BigDecimal(value)));
-    }
-    return Optional.empty();
-  }
-}

+ 289 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusEndpointParser.java

@@ -0,0 +1,289 @@
+package com.provectus.kafka.ui.service.metrics.scrape.prometheus;
+
+import static io.prometheus.client.Collector.MetricFamilySamples.*;
+
+import com.google.common.base.Enums;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.Type;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+
+public class PrometheusEndpointParser {
+
+  // will be set if no TYPE provided (or it is unsupported)
+  private static final Type DEFAULT_TYPE = Type.GAUGE;
+
+  private PrometheusEndpointParser() {
+  }
+
+  private static class ParserContext {
+    final List<MetricFamilySamples> registered = new ArrayList<>();
+
+    String name;
+    String help;
+    Type type = null;
+    Set<String> allowedNames = new HashSet<>();
+    List<Sample> samples = new ArrayList<>();
+
+    void registerAndReset() {
+      if (!samples.isEmpty()) {
+        registered.add(new MetricFamilySamples(name, type, Optional.ofNullable(help).orElse(name), List.copyOf(samples)));
+      }
+      //resetting state:
+      name = null;
+      help = null;
+      type = null;
+      allowedNames.clear();
+      samples.clear();
+    }
+  }
+
+  public static List<MetricFamilySamples> parse(Stream<String> lines) {
+    ParserContext context = new ParserContext();
+    lines.map(String::trim)
+        .filter(s -> !s.isBlank())
+        .forEach(line -> {
+          if (line.charAt(0) == '#') {
+            String[] parts = line.split("[ \t]+", 4);
+            if (parts.length >= 3) {
+              switch (parts[1]) {
+                case "HELP" -> processHelp(context, parts);
+                case "TYPE" -> processType(context, parts);
+                default -> { /* probably a comment */ }
+              }
+            }
+          } else {
+            processSample(context, line);
+          }
+        });
+    context.registerAndReset();
+    return context.registered;
+  }
+
+  private static void processHelp(ParserContext context, String[] parts) {
+    if (!parts[2].equals(context.name)) {
+      // starting new metric family - need to register (if possible) prev one
+      context.registerAndReset();
+      context.name = parts[2];
+      context.type = DEFAULT_TYPE;
+      context.allowedNames.add(context.name);
+    }
+    if (parts.length == 4) {
+      context.help = unescapeHelp(parts[3]);
+    }
+  }
+
+  private static void processType(ParserContext context, String[] parts) {
+    if (!parts[2].equals(context.name)) {
+      // starting new metric family - need to register (if possible) prev one
+      context.registerAndReset();
+      context.name = parts[2];
+    }
+
+    context.type = Enums.getIfPresent(Type.class, parts[3].toUpperCase()).or(DEFAULT_TYPE);
+    switch (context.type) {
+      case SUMMARY -> {
+        context.allowedNames.add(context.name);
+        context.allowedNames.add(context.name + "_count");
+        context.allowedNames.add(context.name + "_sum");
+        context.allowedNames.add(context.name + "_created");
+      }
+      case HISTOGRAM -> {
+        context.allowedNames.add(context.name + "_count");
+        context.allowedNames.add(context.name + "_sum");
+        context.allowedNames.add(context.name + "_bucket");
+        context.allowedNames.add(context.name + "_created");
+      }
+      case COUNTER -> {
+        context.allowedNames.add(context.name);
+        context.allowedNames.add(context.name + "_total");
+        context.allowedNames.add(context.name + "_created");
+      }
+      case INFO -> {
+        context.allowedNames.add(context.name);
+        context.allowedNames.add(context.name + "_info");
+      }
+      default -> context.allowedNames.add(context.name);
+    }
+  }
+
+  private static void processSample(ParserContext context, String line) {
+    parseSampleLine(line).ifPresent(sample -> {
+      if (!context.allowedNames.contains(sample.name)) {
+        // starting new metric family - need to register (if possible) prev one
+        context.registerAndReset();
+        context.name = sample.name;
+        context.type = DEFAULT_TYPE;
+        context.allowedNames.add(sample.name);
+      }
+      context.samples.add(sample);
+    });
+  }
+
+  private static String unescapeHelp(String text) {
+    // algorithm from https://github.com/prometheus/client_python/blob/a2dae6caeaf3c300db416ba10a2a3271693addd4/prometheus_client/parser.py
+    if (text == null || !text.contains("\\")) {
+      return text;
+    }
+    StringBuilder result = new StringBuilder();
+    boolean slash = false;
+    for (int c = 0; c < text.length(); c++) {
+      char charAt = text.charAt(c);
+      if (slash) {
+        if (charAt == '\\') {
+          result.append('\\');
+        } else if (charAt == 'n') {
+          result.append('\n');
+        } else {
+          result.append('\\').append(charAt);
+        }
+        slash = false;
+      } else {
+        if (charAt == '\\') {
+          slash = true;
+        } else {
+          result.append(charAt);
+        }
+      }
+    }
+    if (slash) {
+      result.append("\\");
+    }
+    return result.toString();
+  }
+
+  //returns empty if line is not valid sample string
+  private static Optional<Sample> parseSampleLine(String line) {
+    // algorithm copied from https://github.com/prometheus/client_python/blob/a2dae6caeaf3c300db416ba10a2a3271693addd4/prometheus_client/parser.py
+    StringBuilder name = new StringBuilder();
+    StringBuilder labelname = new StringBuilder();
+    StringBuilder labelvalue = new StringBuilder();
+    StringBuilder value = new StringBuilder();
+    List<String> lblNames = new ArrayList<>();
+    List<String> lblVals = new ArrayList<>();
+
+    String state = "name";
+
+    for (int c = 0; c < line.length(); c++) {
+      char charAt = line.charAt(c);
+      if (state.equals("name")) {
+        if (charAt == '{') {
+          state = "startoflabelname";
+        } else if (charAt == ' ' || charAt == '\t') {
+          state = "endofname";
+        } else {
+          name.append(charAt);
+        }
+      } else if (state.equals("endofname")) {
+        if (charAt == ' ' || charAt == '\t') {
+          // do nothing
+        } else if (charAt == '{') {
+          state = "startoflabelname";
+        } else {
+          value.append(charAt);
+          state = "value";
+        }
+      } else if (state.equals("startoflabelname")) {
+        if (charAt == ' ' || charAt == '\t') {
+          // do nothing
+        } else if (charAt == '}') {
+          state = "endoflabels";
+        } else {
+          labelname.append(charAt);
+          state = "labelname";
+        }
+      } else if (state.equals("labelname")) {
+        if (charAt == '=') {
+          state = "labelvaluequote";
+        } else if (charAt == '}') {
+          state = "endoflabels";
+        } else if (charAt == ' ' || charAt == '\t') {
+          state = "labelvalueequals";
+        } else {
+          labelname.append(charAt);
+        }
+      } else if (state.equals("labelvalueequals")) {
+        if (charAt == '=') {
+          state = "labelvaluequote";
+        } else if (charAt == ' ' || charAt == '\t') {
+          // do nothing
+        } else {
+          return Optional.empty();
+        }
+      } else if (state.equals("labelvaluequote")) {
+        if (charAt == '"') {
+          state = "labelvalue";
+        } else if (charAt == ' ' || charAt == '\t') {
+          // do nothing
+        } else {
+          return Optional.empty();
+        }
+      } else if (state.equals("labelvalue")) {
+        if (charAt == '\\') {
+          state = "labelvalueslash";
+        } else if (charAt == '"') {
+          lblNames.add(labelname.toString());
+          lblVals.add(labelvalue.toString());
+          labelname.setLength(0);
+          labelvalue.setLength(0);
+          state = "nextlabel";
+        } else {
+          labelvalue.append(charAt);
+        }
+      } else if (state.equals("labelvalueslash")) {
+        state = "labelvalue";
+        if (charAt == '\\') {
+          labelvalue.append('\\');
+        } else if (charAt == 'n') {
+          labelvalue.append('\n');
+        } else if (charAt == '"') {
+          labelvalue.append('"');
+        } else {
+          labelvalue.append('\\').append(charAt);
+        }
+      } else if (state.equals("nextlabel")) {
+        if (charAt == ',') {
+          state = "labelname";
+        } else if (charAt == '}') {
+          state = "endoflabels";
+        } else if (charAt == ' ' || charAt == '\t') {
+          // do nothing
+        } else {
+          return Optional.empty();
+        }
+      } else if (state.equals("endoflabels")) {
+        if (charAt == ' ' || charAt == '\t') {
+          // do nothing
+        } else {
+          value.append(charAt);
+          state = "value";
+        }
+      } else if (state.equals("value")) {
+        if (charAt == ' ' || charAt == '\t') {
+          break; // timestamps are NOT supported - ignoring
+        } else {
+          value.append(charAt);
+        }
+      }
+    }
+    return Optional.of(new Sample(name.toString(), lblNames, lblVals, parseDouble(value.toString())));
+  }
+
+  private static double parseDouble(String valueString) {
+    if (valueString.equalsIgnoreCase("NaN")) {
+      return Double.NaN;
+    } else if (valueString.equalsIgnoreCase("+Inf")) {
+      return Double.POSITIVE_INFINITY;
+    } else if (valueString.equalsIgnoreCase("-Inf")) {
+      return Double.NEGATIVE_INFINITY;
+    }
+    return Double.parseDouble(valueString);
+  }
+
+
+}
+

+ 26 - 40
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusMetricsRetriever.java

@@ -2,67 +2,53 @@ package com.provectus.kafka.ui.service.metrics.scrape.prometheus;
 
 import static io.prometheus.client.Collector.*;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
 import com.provectus.kafka.ui.model.MetricsScrapeProperties;
-import com.provectus.kafka.ui.service.metrics.RawMetric;
 import com.provectus.kafka.ui.util.WebClientConfigurator;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.Node;
-import org.springframework.stereotype.Service;
 import org.springframework.util.unit.DataSize;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.util.UriComponentsBuilder;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-@Service
 @Slf4j
 class PrometheusMetricsRetriever {
 
   private static final String METRICS_ENDPOINT_PATH = "/metrics";
   private static final int DEFAULT_EXPORTER_PORT = 11001;
 
-  Mono<List<MetricFamilySamples>> retrieve(MetricsScrapeProperties metricsConfig, Node node) {
-    log.debug("Retrieving metrics from prometheus exporter: {}:{}", node.host(), metricsConfig.getPort());
+  private final int port;
+  private final boolean sslEnabled;
+  private final WebClient webClient;
 
-    var webClient = new WebClientConfigurator()
+  PrometheusMetricsRetriever(MetricsScrapeProperties scrapeProperties) {
+    this.port = Optional.ofNullable(scrapeProperties.getPort()).orElse(DEFAULT_EXPORTER_PORT);
+    this.sslEnabled = scrapeProperties.isSsl() || scrapeProperties.getKeystoreConfig() != null;
+    this.webClient = new WebClientConfigurator()
         .configureBufferSize(DataSize.ofMegabytes(20))
-        .configureBasicAuth(metricsConfig.getUsername(), metricsConfig.getPassword())
-        .configureSsl(metricsConfig.getTruststoreConfig(), metricsConfig.getKeystoreConfig())
+        .configureBasicAuth(scrapeProperties.getUsername(), scrapeProperties.getPassword())
+        .configureSsl(scrapeProperties.getTruststoreConfig(), scrapeProperties.getKeystoreConfig())
         .build();
-
-    return retrieve(webClient, node.host(), metricsConfig)
-        .collectList()
-        .map(metrics -> RawMetric.groupIntoMFS(metrics).toList());
   }
 
-  @VisibleForTesting
-  Flux<RawMetric> retrieve(WebClient webClient, String host, MetricsScrapeProperties metricsConfig) {
-    int port = Optional.ofNullable(metricsConfig.getPort()).orElse(DEFAULT_EXPORTER_PORT);
-    boolean sslEnabled = metricsConfig.isSsl() || metricsConfig.getKeystoreConfig() != null;
-    var request = webClient.get()
-        .uri(UriComponentsBuilder.newInstance()
-            .scheme(sslEnabled ? "https" : "http")
-            .host(host)
-            .port(port)
-            .path(METRICS_ENDPOINT_PATH).build().toUri());
-
-    WebClient.ResponseSpec responseSpec = request.retrieve();
-    return responseSpec.bodyToMono(String.class)
+  Mono<List<MetricFamilySamples>> retrieve(String host) {
+    log.debug("Retrieving metrics from prometheus endpoint: {}:{}", host, port);
+
+    var uri = UriComponentsBuilder.newInstance()
+        .scheme(sslEnabled ? "https" : "http")
+        .host(host)
+        .port(port)
+        .path(METRICS_ENDPOINT_PATH)
+        .build()
+        .toUri();
+
+    return webClient.get()
+        .uri(uri)
+        .retrieve()
+        .bodyToMono(String.class)
         .doOnError(e -> log.error("Error while getting metrics from {}", host, e))
-        .onErrorResume(th -> Mono.empty())
-        .flatMapMany(body ->
-            Flux.fromStream(
-                Arrays.stream(body.split("\\n"))
-                    .filter(str -> !Strings.isNullOrEmpty(str) && !str.startsWith("#")) // skipping comments strings
-                    .map(PrometheusEndpointMetricsParser::parse)
-                    .filter(Optional::isPresent)
-                    .map(Optional::get)
-            )
-        );
+        .map(body -> PrometheusEndpointParser.parse(body.lines()))
+        .onErrorResume(th -> Mono.just(List.of()));
   }
 }

+ 9 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusScraper.java

@@ -1,31 +1,30 @@
 package com.provectus.kafka.ui.service.metrics.scrape.prometheus;
 
+import static io.prometheus.client.Collector.MetricFamilySamples;
+
 import com.provectus.kafka.ui.model.MetricsScrapeProperties;
 import com.provectus.kafka.ui.service.metrics.scrape.PerBrokerScrapedMetrics;
-import io.prometheus.client.Collector;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import org.apache.kafka.common.Node;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
 
 public class PrometheusScraper {
 
-  private final static PrometheusMetricsRetriever RETRIEVER = new PrometheusMetricsRetriever();
-
-  private final MetricsScrapeProperties metricsConfig;
+  private final PrometheusMetricsRetriever retriever;
 
-  public PrometheusScraper(MetricsScrapeProperties metricsConfig) {
-    this.metricsConfig = metricsConfig;
+  public PrometheusScraper(MetricsScrapeProperties scrapeProperties) {
+    this.retriever = new PrometheusMetricsRetriever(scrapeProperties);
   }
 
   public Mono<PerBrokerScrapedMetrics> scrape(Collection<Node> clusterNodes) {
-    Mono<Map<Integer, List<Collector.MetricFamilySamples>>> collected = Flux.fromIterable(clusterNodes)
-        .flatMap(n -> RETRIEVER.retrieve(metricsConfig, n).map(metrics -> Tuples.of(n, metrics)))
-        .collectMap(t -> t.getT1().id(), t -> t.getT2());
-
+    Mono<Map<Integer, List<MetricFamilySamples>>> collected = Flux.fromIterable(clusterNodes)
+        .flatMap(n -> retriever.retrieve(n.host()).map(metrics -> Tuples.of(n, metrics)))
+        .collectMap(t -> t.getT1().id(), Tuple2::getT2);
     return collected.map(PerBrokerScrapedMetrics::new);
   }
 }

+ 102 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/PrometheusEndpointUtil.java

@@ -0,0 +1,102 @@
+package com.provectus.kafka.ui.util;
+
+import static io.prometheus.client.Collector.*;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;
+import com.provectus.kafka.ui.model.Metrics;
+import io.prometheus.client.exporter.common.TextFormat;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import lombok.SneakyThrows;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.ResponseEntity;
+
+public final class PrometheusEndpointUtil {
+
+  private PrometheusEndpointUtil() {
+  }
+
+  public static ResponseEntity<String> exposeAllMetrics(Map<String, Metrics> clustersMetrics) {
+    return constructResponse(getSummarizedMetricsWithClusterLbl(clustersMetrics));
+  }
+
+  public static ResponseEntity<String> exposeClusterMetrics(Metrics clusterMetrics) {
+    return constructResponse(clusterMetrics.getSummarizedMetrics());
+  }
+
+  public static ResponseEntity<String> exposeBrokerMetrics(Metrics clusterMetrics, int brokerId) {
+    //TODO: discuss - do we need to append broker_id lbl ?
+    return constructResponse(
+        clusterMetrics
+            .getPerBrokerScrapedMetrics()
+            .getOrDefault(brokerId, List.of())
+            .stream()
+    );
+  }
+
+  private static Stream<MetricFamilySamples> getSummarizedMetricsWithClusterLbl(Map<String, Metrics> clustersMetrics) {
+    return clustersMetrics.entrySet()
+        .stream()
+        .flatMap(e -> e.getValue()
+            .getSummarizedMetrics()
+            .map(mfs -> addLbl(mfs, "cluster", e.getKey())))
+        // merging MFS with same name, keeping order
+        .collect(Collectors.toMap(mfs -> mfs.name, mfs -> mfs, PrometheusEndpointUtil::concatSamples, LinkedHashMap::new))
+        .values()
+        .stream();
+  }
+
+  private static MetricFamilySamples concatSamples(MetricFamilySamples mfs1,
+                                                   MetricFamilySamples mfs2) {
+    return new MetricFamilySamples(
+        mfs1.name,
+        mfs1.unit,
+        mfs1.type,
+        mfs1.help,
+        Stream.concat(mfs1.samples.stream(), mfs2.samples.stream()).toList()
+    );
+  }
+
+  private static MetricFamilySamples addLbl(MetricFamilySamples mfs, String lblName, String lblVal) {
+    return new MetricFamilySamples(
+        mfs.name, mfs.unit, mfs.type, mfs.help,
+        mfs.samples.stream()
+            .map(sample ->
+                new MetricFamilySamples.Sample(
+                    sample.name,
+                    prependToList(sample.labelNames, lblName),
+                    prependToList(sample.labelValues, lblVal),
+                    sample.value
+                )).toList()
+    );
+  }
+
+  private static <T> List<T> prependToList(List<T> lst, T toPrepend) {
+    var result = new ArrayList<T>(lst.size() + 1);
+    result.add(toPrepend);
+    result.addAll(lst);
+    return result;
+  }
+
+  @VisibleForTesting
+  @SneakyThrows
+  public static ResponseEntity<String> constructResponse(Stream<MetricFamilySamples> metrics) {
+    StringWriter writer = new StringWriter();
+    TextFormat.writeOpenMetrics100(writer, Iterators.asEnumeration(metrics.iterator()));
+
+    HttpHeaders responseHeaders = new HttpHeaders();
+    responseHeaders.set(HttpHeaders.CONTENT_TYPE, TextFormat.CONTENT_TYPE_OPENMETRICS_100);
+
+    return ResponseEntity
+        .ok()
+        .headers(responseHeaders)
+        .body(writer.toString());
+  }
+
+}

+ 6 - 16
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/IoRatesMetricsScannerTest.java

@@ -1,16 +1,14 @@
 package com.provectus.kafka.ui.service.metrics.scrape;
 
 import static io.prometheus.client.Collector.MetricFamilySamples;
+import static java.util.Arrays.stream;
+import static java.util.stream.Collectors.*;
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.service.metrics.RawMetric;
-import com.provectus.kafka.ui.service.metrics.scrape.prometheus.PrometheusEndpointMetricsParser;
+import com.provectus.kafka.ui.service.metrics.scrape.prometheus.PrometheusEndpointParser;
 import java.math.BigDecimal;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
 import org.apache.kafka.common.Node;
 import org.junit.jupiter.api.Test;
 
@@ -63,23 +61,15 @@ class IoRatesMetricsScannerTest {
         .containsEntry(2, new BigDecimal("20.0"));
   }
 
+  @SafeVarargs
   private void populateWith(Map.Entry<Integer, List<MetricFamilySamples>>... entries) {
     ioRatesMetricsScanner = new IoRatesMetricsScanner(
-        Arrays.stream(entries).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
+        stream(entries).collect(toMap(Map.Entry::getKey, Map.Entry::getValue))
     );
   }
 
   private Map.Entry<Integer, List<MetricFamilySamples>> nodeMetrics(Node n, String... prometheusMetrics) {
-    return Map.entry(
-        n.id(),
-        RawMetric.groupIntoMFS(
-            Arrays.stream(prometheusMetrics)
-                .map(PrometheusEndpointMetricsParser::parse)
-                .filter(Optional::isPresent)
-                .map(Optional::get)
-                .toList()
-        ).toList()
-    );
+    return Map.entry(n.id(), PrometheusEndpointParser.parse(stream(prometheusMetrics)));
   }
 
 }

+ 0 - 31
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusEndpointMetricsParserTest.java

@@ -1,31 +0,0 @@
-package com.provectus.kafka.ui.service.metrics.scrape.prometheus;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import com.provectus.kafka.ui.service.metrics.RawMetric;
-import java.util.Map;
-import java.util.Optional;
-import org.junit.jupiter.api.Test;
-
-class PrometheusEndpointMetricsParserTest {
-
-  @Test
-  void test() {
-    String metricsString =
-        "kafka_server_BrokerTopicMetrics_FifteenMinuteRate"
-            + "{name=\"BytesOutPerSec\",topic=\"__confluent.support.metrics\",} 123.1234";
-
-    Optional<RawMetric> parsedOpt = PrometheusEndpointMetricsParser.parse(metricsString);
-
-    assertThat(parsedOpt).hasValueSatisfying(metric -> {
-      assertThat(metric.name()).isEqualTo("kafka_server_BrokerTopicMetrics_FifteenMinuteRate");
-      assertThat(metric.value()).isEqualTo("123.1234");
-      assertThat(metric.labels()).containsExactlyEntriesOf(
-          Map.of(
-              "name", "BytesOutPerSec",
-              "topic", "__confluent.support.metrics"
-          ));
-    });
-  }
-
-}

+ 175 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusEndpointParserTest.java

@@ -0,0 +1,175 @@
+package com.provectus.kafka.ui.service.metrics.scrape.prometheus;
+
+import static com.provectus.kafka.ui.service.metrics.scrape.prometheus.PrometheusEndpointParser.parse;
+import static io.prometheus.client.Collector.MetricFamilySamples;
+import static io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import static io.prometheus.client.Collector.Type;
+import static java.lang.Double.NaN;
+import static java.lang.Double.POSITIVE_INFINITY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.provectus.kafka.ui.util.PrometheusEndpointUtil;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Gauge;
+import io.prometheus.client.Histogram;
+import io.prometheus.client.Info;
+import io.prometheus.client.Summary;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.junit.jupiter.api.Test;
+
+class PrometheusEndpointParserTest {
+
+  @Test
+  void parsesAllGeneratedMetricTypes() {
+    List<MetricFamilySamples> original = generateMfs();
+    String exposed = PrometheusEndpointUtil.constructResponse(original.stream()).getBody();
+    List<MetricFamilySamples> parsed = parse(exposed.lines());
+    assertThat(parsed).containsExactlyElementsOf(original);
+  }
+
+  @Test
+  void parsesMetricsFromPrometheusEndpointOutput() {
+    String expose = """
+            # HELP http_requests_total The total number of HTTP requests.
+            # TYPE http_requests_total counter
+            http_requests_total{method="post",code="200"} 1027 1395066363000
+            http_requests_total{method="post",code="400"}    3 1395066363000
+            # Minimalistic line:
+            metric_without_timestamp_and_labels 12.47
+
+            # A weird metric from before the epoch:
+            something_weird{problem="division by zero"} +Inf -3982045
+
+            # TYPE something_untyped untyped
+            something_untyped{} -123123
+
+            # HELP http_request_duration_seconds A histogram of the request duration.
+            # TYPE http_request_duration_seconds histogram
+            http_request_duration_seconds_bucket{le="0.05"} 24054
+            http_request_duration_seconds_bucket{le="0.1"} 33444
+            http_request_duration_seconds_bucket{le="0.2"} 100392
+            http_request_duration_seconds_bucket{le="0.5"} 129389
+            http_request_duration_seconds_bucket{le="1"} 133988
+            http_request_duration_seconds_bucket{le="+Inf"} 144320
+            http_request_duration_seconds_sum 53423
+            http_request_duration_seconds_count 144320
+        """;
+    var parsed = parse(expose.lines());
+    assertThat(parsed).contains(
+        new MetricFamilySamples(
+            "http_requests_total",
+            Type.COUNTER,
+            "The total number of HTTP requests.",
+            List.of(
+                new Sample("http_requests_total", List.of("method", "code"), List.of("post", "200"), 1027),
+                new Sample("http_requests_total", List.of("method", "code"), List.of("post", "400"), 3)
+            )
+        ),
+        new MetricFamilySamples(
+            "metric_without_timestamp_and_labels",
+            Type.GAUGE,
+            "metric_without_timestamp_and_labels",
+            List.of(new Sample("metric_without_timestamp_and_labels", List.of(), List.of(), 12.47))
+        ),
+        new MetricFamilySamples(
+            "something_weird",
+            Type.GAUGE,
+            "something_weird",
+            List.of(new Sample("something_weird", List.of("problem"), List.of("division by zero"), POSITIVE_INFINITY))
+        ),
+        new MetricFamilySamples(
+            "something_untyped",
+            Type.GAUGE,
+            "something_untyped",
+            List.of(new Sample("something_untyped", List.of(), List.of(), -123123))
+        ),
+        new MetricFamilySamples(
+            "http_request_duration_seconds",
+            Type.HISTOGRAM,
+            "A histogram of the request duration.",
+            List.of(
+                new Sample("http_request_duration_seconds_bucket", List.of("le"), List.of("0.05"), 24054),
+                new Sample("http_request_duration_seconds_bucket", List.of("le"), List.of("0.1"), 33444),
+                new Sample("http_request_duration_seconds_bucket", List.of("le"), List.of("0.2"), 100392),
+                new Sample("http_request_duration_seconds_bucket", List.of("le"), List.of("0.5"), 129389),
+                new Sample("http_request_duration_seconds_bucket", List.of("le"), List.of("1"), 133988),
+                new Sample("http_request_duration_seconds_bucket", List.of("le"), List.of("+Inf"), 144320),
+                new Sample("http_request_duration_seconds_sum", List.of(), List.of(), 53423),
+                new Sample("http_request_duration_seconds_count", List.of(), List.of(), 144320)
+            )
+        )
+    );
+  }
+
+  private List<MetricFamilySamples> generateMfs() {
+    CollectorRegistry collectorRegistry = new CollectorRegistry();
+
+    Gauge.build()
+        .name("test_gauge")
+        .help("help for gauge")
+        .register(collectorRegistry)
+        .set(42);
+
+    Info.build()
+        .name("test_info")
+        .help("help for info")
+        .register(collectorRegistry)
+        .info("branch", "HEAD", "version", "1.2.3", "revision", "e0704b");
+
+    Counter.build()
+        .name("counter_no_labels")
+        .help("help for counter no lbls")
+        .register(collectorRegistry)
+        .inc(111);
+
+    var counterWithLbls = Counter.build()
+        .name("counter_with_labels")
+        .help("help for counter with lbls")
+        .labelNames("lbl1", "lbl2")
+        .register(collectorRegistry);
+
+    counterWithLbls.labels("v1", "v2").inc(234);
+    counterWithLbls.labels("v11", "v22").inc(345);
+
+    var histogram = Histogram.build()
+        .name("test_hist")
+        .help("help for hist")
+        .linearBuckets(0.0, 1.0, 10)
+        .labelNames("lbl1", "lbl2")
+        .register(collectorRegistry);
+
+    var summary = Summary.build()
+        .name("test_summary")
+        .help("help for hist")
+        .labelNames("lbl1", "lbl2")
+        .register(collectorRegistry);
+
+    for (int i = 0; i < 30; i++) {
+      var val = ThreadLocalRandom.current().nextDouble(10.0);
+      histogram.labels("v1", "v2").observe(val);
+      summary.labels("v1", "v2").observe(val);
+    }
+
+    //emulating unknown type
+    collectorRegistry.register(new Collector() {
+      @Override
+      public List<MetricFamilySamples> collect() {
+        return List.of(
+            new MetricFamilySamples(
+                "test_unknown",
+                Type.UNKNOWN,
+                "help for unknown",
+                List.of(new Sample("test_unknown", List.of("l1"), List.of("v1"), 23432.0))
+            )
+        );
+      }
+    });
+    return Lists.newArrayList(Iterators.forEnumeration(collectorRegistry.metricFamilySamples()));
+  }
+
+}

+ 59 - 38
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/prometheus/PrometheusMetricsRetrieverTest.java

@@ -1,11 +1,13 @@
 package com.provectus.kafka.ui.service.metrics.scrape.prometheus;
 
+import static io.prometheus.client.Collector.MetricFamilySamples;
+import static io.prometheus.client.Collector.Type;
+import static org.assertj.core.api.Assertions.assertThat;
+
 import com.provectus.kafka.ui.model.MetricsScrapeProperties;
-import com.provectus.kafka.ui.service.metrics.RawMetric;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.util.List;
-import java.util.Map;
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import org.junit.jupiter.api.AfterEach;
@@ -16,8 +18,6 @@ import reactor.test.StepVerifier;
 
 class PrometheusMetricsRetrieverTest {
 
-  private final PrometheusMetricsRetriever retriever = new PrometheusMetricsRetriever();
-
   private final MockWebServer mockWebServer = new MockWebServer();
 
   @BeforeEach
@@ -35,11 +35,11 @@ class PrometheusMetricsRetrieverTest {
     var url = mockWebServer.url("/metrics");
     mockWebServer.enqueue(prepareResponse());
 
-    MetricsScrapeProperties metricsConfig = prepareMetricsConfig(url.port(), null, null);
+    MetricsScrapeProperties scrapeProperties = prepareMetricsConfig(url.port(), null, null);
+    var retriever = new PrometheusMetricsRetriever(scrapeProperties);
 
-    StepVerifier.create(retriever.retrieve(WebClient.create(), url.host(), metricsConfig))
-        .expectNextSequence(expectedRawMetrics())
-        // third metric should not be present, since it has "NaN" value
+    StepVerifier.create(retriever.retrieve(url.host()))
+        .assertNext(metrics -> assertThat(metrics).containsExactlyElementsOf(expectedMetrics()))
         .verifyComplete();
   }
 
@@ -48,51 +48,72 @@ class PrometheusMetricsRetrieverTest {
     var url = mockWebServer.url("/metrics");
     mockWebServer.enqueue(prepareResponse());
 
+    MetricsScrapeProperties scrapeProperties = prepareMetricsConfig(url.port(), "username", "password");
+    var retriever = new PrometheusMetricsRetriever(scrapeProperties);
 
-    MetricsScrapeProperties metricsConfig = prepareMetricsConfig(url.port(), "username", "password");
-
-    StepVerifier.create(retriever.retrieve(WebClient.create(), url.host(), metricsConfig))
-        .expectNextSequence(expectedRawMetrics())
-        // third metric should not be present, since it has "NaN" value
+    StepVerifier.create(retriever.retrieve(url.host()))
+        .assertNext(metrics -> assertThat(metrics).containsExactlyElementsOf(expectedMetrics()))
         .verifyComplete();
   }
 
-  MockResponse prepareResponse() {
-    // body copied from real jmx exporter
+  private MockResponse prepareResponse() {
+    // body copied from jmx exporter output
     return new MockResponse().setBody(
-        "# HELP kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate Attribute exposed for management \n"
-            + "# TYPE kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate untyped\n"
-            + "kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate{name=\"RequestHandlerAvgIdlePercent\",} 0.898\n"
-            + "# HELP kafka_server_socket_server_metrics_request_size_avg The average size of requests sent. \n"
-            + "# TYPE kafka_server_socket_server_metrics_request_size_avg untyped\n"
-            + "kafka_server_socket_server_metrics_request_size_avg{listener=\"PLAIN\",networkProcessor=\"1\",} 101.1\n"
-            + "kafka_server_socket_server_metrics_request_size_avg{listener=\"PLAIN2\",networkProcessor=\"5\",} NaN"
+        """
+            # HELP kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate Attribute exposed for management
+            # TYPE kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate untyped
+            kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate{name="RequestHandlerAvgIdlePercent",} 0.898
+            # HELP kafka_server_socket_server_metrics_request_size_avg The average size of requests sent.
+            # TYPE kafka_server_socket_server_metrics_request_size_avg untyped
+            kafka_server_socket_server_metrics_request_size_avg{listener="PLAIN",networkProcessor="1",} 101.1
+            kafka_server_socket_server_metrics_request_size_avg{listener="PLAIN2",networkProcessor="5",} 202.2
+            """
     );
   }
 
-  MetricsScrapeProperties prepareMetricsConfig(Integer port, String username, String password) {
+  private MetricsScrapeProperties prepareMetricsConfig(Integer port, String username, String password) {
     return MetricsScrapeProperties.builder()
         .ssl(false)
         .port(port)
-        //.type(MetricsScrapeProperties.PROMETHEUS_METRICS_TYPE)
         .username(username)
         .password(password)
         .build();
   }
 
-  List<RawMetric> expectedRawMetrics() {
-
-    var firstMetric = RawMetric.create(
-        "kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate",
-        Map.of("name", "RequestHandlerAvgIdlePercent"),
-        new BigDecimal("0.898")
-    );
-
-    var secondMetric = RawMetric.create(
-        "kafka_server_socket_server_metrics_request_size_avg",
-        Map.of("listener", "PLAIN", "networkProcessor", "1"),
-        new BigDecimal("101.1")
+  private List<MetricFamilySamples> expectedMetrics() {
+    return List.of(
+        new MetricFamilySamples(
+            "kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate",
+            Type.GAUGE,
+            "Attribute exposed for management",
+            List.of(
+                new Sample(
+                    "kafka_server_KafkaRequestHandlerPool_FifteenMinuteRate",
+                    List.of("name"),
+                    List.of("RequestHandlerAvgIdlePercent"),
+                    0.898
+                )
+            )
+        ),
+        new MetricFamilySamples(
+            "kafka_server_socket_server_metrics_request_size_avg",
+            Type.GAUGE,
+            "The average size of requests sent.",
+            List.of(
+                new Sample(
+                    "kafka_server_socket_server_metrics_request_size_avg",
+                    List.of("listener", "networkProcessor"),
+                    List.of("PLAIN", "1"),
+                    101.1
+                ),
+                new Sample(
+                    "kafka_server_socket_server_metrics_request_size_avg",
+                    List.of("listener", "networkProcessor"),
+                    List.of("PLAIN2", "5"),
+                    202.2
+                )
+            )
+        )
     );
-    return List.of(firstMetric, secondMetric);
   }
 }