iliax 2 năm trước cách đây
mục cha
commit
dbab4e367d
35 tập tin đã thay đổi với 526 bổ sung322 xóa
  1. 68 1
      documentation/compose/jmx-exporter/kafka-broker.yml
  2. 5 0
      kafka-ui-api/pom.xml
  3. 28 3
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
  4. 1 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
  5. 2 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
  6. 22 12
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java
  7. 0 22
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsConfig.java
  8. 29 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsScrapeProperties.java
  9. 6 22
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java
  10. 28 28
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java
  11. 0 69
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsCollector.java
  12. 0 9
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsRetriever.java
  13. 24 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java
  14. 87 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/MetricsScrapping.java
  15. 25 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/PerBrokerScrapedMetrics.java
  16. 41 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedClusterState.java
  17. 0 21
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedMetrics.java
  18. 0 10
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scraper.java
  19. 0 8
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scrapping.java
  20. 26 15
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/WellKnownMetrics.java
  21. 8 8
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetrics.java
  22. 5 9
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScraper.java
  23. 3 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsFormatter.java
  24. 26 26
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsRetriever.java
  25. 29 6
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsScraper.java
  26. 1 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxSslSocketFactory.java
  27. 0 13
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prom/PrometheusScraper.java
  28. 2 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusEndpointMetricsParser.java
  29. 15 17
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusMetricsRetriever.java
  30. 31 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusScraper.java
  31. 0 6
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ReactiveFailover.java
  32. 2 1
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatterTest.java
  33. 1 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParserTest.java
  34. 7 6
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetrieverTest.java
  35. 4 2
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/WellKnownMetricsTest.java

+ 68 - 1
documentation/compose/jmx-exporter/kafka-broker.yml

@@ -1,2 +1,69 @@
+lowercaseOutputName: true
 rules:
-  - pattern: ".*"
+  # Special cases and very specific rules
+  - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
+    name: kafka_server_$1_$2
+    type: GAUGE
+    labels:
+      clientId: '$3'
+      topic: '$4'
+      partition: '$5'
+  - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
+    name: kafka_server_$1_$2
+    type: GAUGE
+    labels:
+      clientId: '$3'
+      broker: '$4:$5'
+
+  - pattern: kafka.server<type=KafkaRequestHandlerPool, name=RequestHandlerAvgIdlePercent><>OneMinuteRate
+    name: kafka_server_kafkarequesthandlerpool_requesthandleravgidlepercent_total
+    type: GAUGE
+
+  - pattern: kafka.server<type=socket-server-metrics, clientSoftwareName=(.+), clientSoftwareVersion=(.+), listener=(.+), networkProcessor=(.+)><>connections
+    name: kafka_server_socketservermetrics_connections
+    type: GAUGE
+    labels:
+      client_software_name: '$1'
+      client_software_version: '$2'
+      listener: '$3'
+      network_processor: '$4'
+
+  - pattern: 'kafka.server<type=socket-server-metrics, listener=(.+), networkProcessor=(.+)><>(.+):'
+    name: kafka_server_socketservermetrics_$3
+    type: GAUGE
+    labels:
+      listener: '$1'
+      network_processor: '$2'
+
+  # Count and Value
+  - pattern: kafka.(.*)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>(Count|Value)
+    name: kafka_$1_$2_$3
+    labels:
+      '$4': '$5'
+      '$6': '$7'
+  - pattern: kafka.(.*)<type=(.+), name=(.+), (.+)=(.+)><>(Count|Value)
+    name: kafka_$1_$2_$3
+    labels:
+      '$4': '$5'
+  - pattern: kafka.(.*)<type=(.+), name=(.+)><>(Count|Value)
+    name: kafka_$1_$2_$3
+
+  # Percentile
+  - pattern: kafka.(.*)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
+    name: kafka_$1_$2_$3
+    type: GAUGE
+    labels:
+      '$4': '$5'
+      '$6': '$7'
+      quantile: '0.$8'
+  - pattern: kafka.(.*)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
+    name: kafka_$1_$2_$3
+    type: GAUGE
+    labels:
+      '$4': '$5'
+      quantile: '0.$6'
+  - pattern: kafka.(.*)<type=(.+), name=(.+)><>(\d+)thPercentile
+    name: kafka_$1_$2_$3
+    type: GAUGE
+    labels:
+      quantile: '0.$4'

+ 5 - 0
kafka-ui-api/pom.xml

@@ -244,6 +244,11 @@
             <artifactId>simpleclient_common</artifactId>
             <version>0.16.0</version>
         </dependency>
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient_pushgateway</artifactId>
+            <version>0.16.0</version>
+        </dependency>
 
 
         <dependency>

+ 28 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -1,6 +1,6 @@
 package com.provectus.kafka.ui.config;
 
-import com.provectus.kafka.ui.model.MetricsConfig;
+import com.provectus.kafka.ui.model.MetricsScrapeProperties;
 import jakarta.annotation.PostConstruct;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -63,7 +63,7 @@ public class ClustersProperties {
   }
 
   @Data
-  @ToString(exclude = "password")
+  @ToString(exclude = {"password", "keystorePassword"})
   public static class MetricsConfigData {
     String type;
     Integer port;
@@ -72,6 +72,31 @@ public class ClustersProperties {
     String password;
     String keystoreLocation;
     String keystorePassword;
+
+//    JmxScraper jmxScraper;
+//    PrometheusScraper prometheusScraper;
+//
+//    @Data
+//    @ToString(exclude = "password")
+//    public static class JmxScraper {
+//      Integer port;
+//      Boolean ssl;
+//      String username;
+//      String password;
+//      String keystoreLocation;
+//      String keystorePassword;
+//    }
+//
+//    @Data
+//    @ToString(exclude = "password")
+//    public static class PrometheusScraper {
+//      Integer port;
+//      Boolean ssl;
+//      String username;
+//      String password;
+//      String keystoreLocation;
+//      String keystorePassword;
+//    }
   }
 
   @Data
@@ -155,7 +180,7 @@ public class ClustersProperties {
   private void setMetricsDefaults() {
     for (Cluster cluster : clusters) {
       if (cluster.getMetrics() != null && !StringUtils.hasText(cluster.getMetrics().getType())) {
-        cluster.getMetrics().setType(MetricsConfig.JMX_METRICS_TYPE);
+        cluster.getMetrics().setType(MetricsScrapeProperties.JMX_METRICS_TYPE);
       }
     }
   }

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -52,6 +52,7 @@ public interface ClusterMapper {
 
   ClusterStatsDTO toClusterStats(InternalClusterState clusterState);
 
+  @Deprecated
   default ClusterMetricsDTO toClusterMetrics(Metrics metrics) {
     return new ClusterMetricsDTO()
         .items(metrics.getSummarizedMetrics().map(this::convert).collect(Collectors.toList()));

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -5,6 +5,7 @@ import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
 import com.provectus.kafka.ui.emitter.PollingSettings;
 import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
 import com.provectus.kafka.ui.service.masking.DataMasking;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.MetricsScrapping;
 import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
 import com.provectus.kafka.ui.util.ReactiveFailover;
 import java.util.Map;
@@ -25,10 +26,10 @@ public class KafkaCluster {
   private final String bootstrapServers;
   private final Properties properties;
   private final boolean readOnly;
-  private final MetricsConfig metricsConfig;
   private final DataMasking masking;
   private final PollingSettings pollingSettings;
   private final ReactiveFailover<KafkaSrClientApi> schemaRegistryClient;
   private final Map<String, ReactiveFailover<KafkaConnectClientApi>> connectsClients;
   private final ReactiveFailover<KsqlApiClient> ksqlClient;
+  private final MetricsScrapping metricsScrapping;
 }

+ 22 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java

@@ -1,8 +1,11 @@
 package com.provectus.kafka.ui.model;
 
+import static io.prometheus.client.Collector.*;
 import static java.util.stream.Collectors.toMap;
 
 import com.provectus.kafka.ui.service.metrics.RawMetric;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.InferredMetrics;
+import io.prometheus.client.Collector;
 import java.math.BigDecimal;
 import java.util.Collection;
 import java.util.List;
@@ -16,25 +19,32 @@ import lombok.Value;
 @Value
 public class Metrics {
 
-  Map<Integer, BigDecimal> brokerBytesInPerSec;
-  Map<Integer, BigDecimal> brokerBytesOutPerSec;
-  Map<String, BigDecimal> topicBytesInPerSec;
-  Map<String, BigDecimal> topicBytesOutPerSec;
-  Map<Integer, List<RawMetric>> perBrokerMetrics;
-
   public static Metrics empty() {
     return Metrics.builder()
-        .brokerBytesInPerSec(Map.of())
-        .brokerBytesOutPerSec(Map.of())
-        .topicBytesInPerSec(Map.of())
-        .topicBytesOutPerSec(Map.of())
-        .perBrokerMetrics(Map.of())
+        .ioRates(null) //TODO: empty
+        .perBrokerScrapedMetrics(Map.of())
+        .inferredMetrics(InferredMetrics.empty())
         .build();
   }
 
+  @Builder
+  public record IoRates(Map<Integer, BigDecimal> brokerBytesInPerSec,
+                        Map<Integer, BigDecimal> brokerBytesOutPerSec,
+                        Map<String, BigDecimal> topicBytesInPerSec,
+                        Map<String, BigDecimal> topicBytesOutPerSec) {
+  }
+
+  IoRates ioRates;
+  InferredMetrics inferredMetrics;
+  Map<Integer, List<MetricFamilySamples>> perBrokerScrapedMetrics;
+
+  @Deprecated
   public Stream<RawMetric> getSummarizedMetrics() {
-    return perBrokerMetrics.values().stream()
+    return perBrokerScrapedMetrics
+        .values()
+        .stream()
         .flatMap(Collection::stream)
+        .flatMap(RawMetric::create)
         .collect(toMap(RawMetric::identityKey, m -> m, (m1, m2) -> m1.copyWithValue(m1.value().add(m2.value()))))
         .values()
         .stream();

+ 0 - 22
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsConfig.java

@@ -1,22 +0,0 @@
-package com.provectus.kafka.ui.model;
-
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder(toBuilder = true)
-@AllArgsConstructor(access = AccessLevel.PRIVATE)
-public class MetricsConfig {
-  public static final String JMX_METRICS_TYPE = "JMX";
-  public static final String PROMETHEUS_METRICS_TYPE = "PROMETHEUS";
-
-  private final String type;
-  private final Integer port;
-  private final boolean ssl;
-  private final String username;
-  private final String password;
-  private final String keystoreLocation;
-  private final String keystorePassword;
-}

+ 29 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsScrapeProperties.java

@@ -0,0 +1,29 @@
+package com.provectus.kafka.ui.model;
+
+import static com.provectus.kafka.ui.config.ClustersProperties.*;
+
+import com.provectus.kafka.ui.config.ClustersProperties;
+import jakarta.annotation.Nullable;
+import lombok.Builder;
+import lombok.Data;
+import lombok.Value;
+
+@Value
+@Builder
+public class MetricsScrapeProperties {
+  public static final String JMX_METRICS_TYPE = "JMX";
+  public static final String PROMETHEUS_METRICS_TYPE = "PROMETHEUS";
+
+  Integer port;
+  boolean ssl;
+  String username;
+  String password;
+
+  @Nullable
+  KeystoreConfig keystoreConfig;
+
+  @Nullable
+  TruststoreConfig truststoreConfig;
+
+
+}

+ 6 - 22
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java

@@ -8,9 +8,10 @@ import com.provectus.kafka.ui.emitter.PollingSettings;
 import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO;
 import com.provectus.kafka.ui.model.ClusterConfigValidationDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.MetricsConfig;
 import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
 import com.provectus.kafka.ui.service.masking.DataMasking;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.jmx.JmxMetricsRetriever;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.MetricsScrapping;
 import com.provectus.kafka.ui.sr.ApiClient;
 import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
 import com.provectus.kafka.ui.util.KafkaServicesValidation;
@@ -22,7 +23,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.stream.Stream;
-import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 import org.springframework.util.unit.DataSize;
@@ -39,11 +39,13 @@ public class KafkaClusterFactory {
   private static final DataSize DEFAULT_WEBCLIENT_BUFFER = DataSize.parse("20MB");
 
   private final DataSize webClientMaxBuffSize;
+  private final JmxMetricsRetriever jmxMetricsRetriever;
 
-  public KafkaClusterFactory(WebclientProperties webclientProperties) {
+  public KafkaClusterFactory(WebclientProperties webclientProperties, JmxMetricsRetriever jmxMetricsRetriever) {
     this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize())
         .map(DataSize::parse)
         .orElse(DEFAULT_WEBCLIENT_BUFFER);
+    this.jmxMetricsRetriever = jmxMetricsRetriever;
   }
 
   public KafkaCluster create(ClustersProperties properties,
@@ -56,6 +58,7 @@ public class KafkaClusterFactory {
     builder.readOnly(clusterProperties.isReadOnly());
     builder.masking(DataMasking.create(clusterProperties.getMasking()));
     builder.pollingSettings(PollingSettings.create(clusterProperties, properties));
+    builder.metricsScrapping(MetricsScrapping.create(clusterProperties, jmxMetricsRetriever));
 
     if (schemaRegistryConfigured(clusterProperties)) {
       builder.schemaRegistryClient(schemaRegistryClient(clusterProperties));
@@ -66,9 +69,6 @@ public class KafkaClusterFactory {
     if (ksqlConfigured(clusterProperties)) {
       builder.ksqlClient(ksqlClient(clusterProperties));
     }
-    if (metricsConfigured(clusterProperties)) {
-      builder.metricsConfig(metricsConfigDataToMetricsConfig(clusterProperties.getMetrics()));
-    }
     builder.originalProperties(clusterProperties);
     return builder.build();
   }
@@ -202,20 +202,4 @@ public class KafkaClusterFactory {
     return clusterProperties.getMetrics() != null;
   }
 
-  @Nullable
-  private MetricsConfig metricsConfigDataToMetricsConfig(ClustersProperties.MetricsConfigData metricsConfigData) {
-    if (metricsConfigData == null) {
-      return null;
-    }
-    MetricsConfig.MetricsConfigBuilder builder = MetricsConfig.builder();
-    builder.type(metricsConfigData.getType());
-    builder.port(metricsConfigData.getPort());
-    builder.ssl(Optional.ofNullable(metricsConfigData.getSsl()).orElse(false));
-    builder.username(metricsConfigData.getUsername());
-    builder.password(metricsConfigData.getPassword());
-    builder.keystoreLocation(metricsConfigData.getKeystoreLocation());
-    builder.keystorePassword(metricsConfigData.getKeystorePassword());
-    return builder.build();
-  }
-
 }

+ 28 - 28
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java

@@ -8,7 +8,6 @@ import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.Metrics;
 import com.provectus.kafka.ui.model.ServerStatusDTO;
 import com.provectus.kafka.ui.model.Statistics;
-import com.provectus.kafka.ui.service.metrics.MetricsCollector;
 import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedClusterState;
 import java.util.List;
 import java.util.Map;
@@ -20,13 +19,13 @@ import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.Node;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
 
 @Service
 @RequiredArgsConstructor
 @Slf4j
 public class StatisticsService {
 
-  private final MetricsCollector metricsCollector;
   private final AdminClientService adminClientService;
   private final FeatureService featureService;
   private final StatisticsCache cache;
@@ -37,30 +36,24 @@ public class StatisticsService {
 
   private Mono<Statistics> getStatistics(KafkaCluster cluster) {
     return adminClientService.get(cluster).flatMap(ac ->
-            ac.describeCluster().flatMap(description ->
-                ac.updateInternalStats(description.getController()).then(
-                    Mono.zip(
-                        List.of(
-                            metricsCollector.getBrokerMetrics(cluster, description.getNodes()),
-                            getLogDirInfo(description, ac),
-                            featureService.getAvailableFeatures(ac, cluster, description),
-                            loadTopicConfigs(cluster),
-                            describeTopics(cluster),
-                            loadClusterState(ac)
-                        ),
-                        results ->
-                            Statistics.builder()
-                                .status(ServerStatusDTO.ONLINE)
-                                .clusterDescription(description)
-                                .version(ac.getVersion())
-                                .metrics((Metrics) results[0])
-                                .logDirInfo((InternalLogDirStats) results[1])
-                                .features((List<ClusterFeature>) results[2])
-                                .topicConfigs((Map<String, List<ConfigEntry>>) results[3])
-                                .topicDescriptions((Map<String, TopicDescription>) results[4])
-                                .clusterState((ScrapedClusterState) results[5])
-                                .build()
-                    ))))
+            ac.describeCluster()
+                .flatMap(description ->
+                    ac.updateInternalStats(description.getController())
+                        .then(
+                            Mono.zip(
+                                featureService.getAvailableFeatures(ac, cluster, description),
+                                loadClusterState(description, ac)
+                            ).flatMap(featuresAndState ->
+                                scrapeMetrics(cluster, featuresAndState.getT2(), description)
+                                    .map(metrics ->
+                                        Statistics.builder()
+                                            .status(ServerStatusDTO.ONLINE)
+                                            .clusterDescription(description)
+                                            .version(ac.getVersion())
+                                            .metrics(metrics)
+                                            .features(featuresAndState.getT1())
+                                            .clusterState(featuresAndState.getT2())
+                                            .build())))))
         .doOnError(e ->
             log.error("Failed to collect cluster {} info", cluster.getName(), e))
         .onErrorResume(
@@ -80,8 +73,15 @@ public class StatisticsService {
     return adminClientService.get(c).flatMap(ReactiveAdminClient::getTopicsConfig);
   }
 
-  private Mono<ScrapedClusterState> loadClusterState(ReactiveAdminClient ac){
-    return ScrapedClusterState.scrape(ac);
+  private Mono<ScrapedClusterState> loadClusterState(ClusterDescription clusterDescription,
+                                                     ReactiveAdminClient ac) {
+    return ScrapedClusterState.scrape(clusterDescription, ac);
+  }
+
+  private Mono<Metrics> scrapeMetrics(KafkaCluster c,
+                                      ScrapedClusterState clusterState,
+                                      ClusterDescription clusterDescription) {
+    return c.getMetricsScrapping().scrape(clusterState, clusterDescription.getNodes());
   }
 
 }

+ 0 - 69
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsCollector.java

@@ -1,69 +0,0 @@
-package com.provectus.kafka.ui.service.metrics;
-
-import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.Metrics;
-import com.provectus.kafka.ui.model.MetricsConfig;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.Node;
-import org.springframework.stereotype.Component;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
-
-@Component
-@Slf4j
-@RequiredArgsConstructor
-public class MetricsCollector {
-
-  private final JmxMetricsRetriever jmxMetricsRetriever;
-  private final PrometheusMetricsRetriever prometheusMetricsRetriever;
-
-  public Mono<Metrics> getBrokerMetrics(KafkaCluster cluster, Collection<Node> nodes) {
-    return Flux.fromIterable(nodes)
-        .flatMap(n -> getMetrics(cluster, n).map(lst -> Tuples.of(n, lst)))
-        .collectMap(Tuple2::getT1, Tuple2::getT2)
-        .map(nodeMetrics -> collectMetrics(cluster, nodeMetrics))
-        .defaultIfEmpty(Metrics.empty());
-  }
-
-  private Mono<List<RawMetric>> getMetrics(KafkaCluster kafkaCluster, Node node) {
-    Flux<RawMetric> metricFlux = Flux.empty();
-    if (kafkaCluster.getMetricsConfig() != null) {
-      String type = kafkaCluster.getMetricsConfig().getType();
-      if (type == null || type.equalsIgnoreCase(MetricsConfig.JMX_METRICS_TYPE)) {
-        metricFlux = jmxMetricsRetriever.retrieve(kafkaCluster, node);
-      } else if (type.equalsIgnoreCase(MetricsConfig.PROMETHEUS_METRICS_TYPE)) {
-        metricFlux = prometheusMetricsRetriever.retrieve(kafkaCluster, node);
-      }
-    }
-    return metricFlux.collectList();
-  }
-
-  public Metrics collectMetrics(KafkaCluster cluster, Map<Node, List<RawMetric>> perBrokerMetrics) {
-    Metrics.MetricsBuilder builder = Metrics.builder()
-        .perBrokerMetrics(
-            perBrokerMetrics.entrySet()
-                .stream()
-                .collect(Collectors.toMap(e -> e.getKey().id(), Map.Entry::getValue)));
-
-    populateWellknowMetrics(cluster, perBrokerMetrics)
-        .apply(builder);
-
-    return builder.build();
-  }
-
-  private WellKnownMetrics populateWellknowMetrics(KafkaCluster cluster, Map<Node, List<RawMetric>> perBrokerMetrics) {
-    WellKnownMetrics wellKnownMetrics = new WellKnownMetrics();
-    perBrokerMetrics.forEach((node, metrics) ->
-        metrics.forEach(metric ->
-            wellKnownMetrics.populate(node, metric)));
-    return wellKnownMetrics;
-  }
-
-}

+ 0 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsRetriever.java

@@ -1,9 +0,0 @@
-package com.provectus.kafka.ui.service.metrics;
-
-import com.provectus.kafka.ui.model.KafkaCluster;
-import org.apache.kafka.common.Node;
-import reactor.core.publisher.Flux;
-
-interface MetricsRetriever {
-  Flux<RawMetric> retrieve(KafkaCluster c, Node node);
-}

+ 24 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java

@@ -1,7 +1,14 @@
 package com.provectus.kafka.ui.service.metrics;
 
+import static io.prometheus.client.Collector.*;
+
+import io.prometheus.client.Collector;
 import java.math.BigDecimal;
+import java.util.Collection;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 import lombok.AllArgsConstructor;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
@@ -23,10 +30,27 @@ public interface RawMetric {
 
   //--------------------------------------------------
 
+  static Stream<MetricFamilySamples> groupIntoMFS(Collection<RawMetric> lst) {
+    //TODO: impl
+    return null;
+  }
+
   static RawMetric create(String name, Map<String, String> labels, BigDecimal value) {
     return new SimpleMetric(name, labels, value);
   }
 
+  static Stream<RawMetric> create(MetricFamilySamples samples) {
+    return samples.samples.stream()
+        .map(s -> create(
+                s.name,
+                IntStream.range(0, s.labelNames.size())
+                    .boxed()
+                    .collect(Collectors.<Integer, String, String>toMap(s.labelNames::get, s.labelValues::get)),
+                BigDecimal.valueOf(s.value)
+            )
+        );
+  }
+
   record SimpleMetric(String name,
                       Map<String, String> labels,
                       BigDecimal value) implements RawMetric {

+ 87 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/MetricsScrapping.java

@@ -0,0 +1,87 @@
+package com.provectus.kafka.ui.service.metrics.v2.scrape;
+
+import static com.provectus.kafka.ui.config.ClustersProperties.*;
+import static com.provectus.kafka.ui.model.MetricsScrapeProperties.*;
+
+import com.provectus.kafka.ui.model.Metrics;
+import com.provectus.kafka.ui.model.MetricsScrapeProperties;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.InferredMetrics;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.InferredMetricsScraper;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.jmx.JmxMetricsRetriever;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.jmx.JmxMetricsScraper;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus.PrometheusScraper;
+import jakarta.annotation.Nullable;
+import java.util.Collection;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.Node;
+import reactor.core.publisher.Mono;
+
+@RequiredArgsConstructor
+public class MetricsScrapping {
+
+  private final InferredMetricsScraper inferredMetricsScraper;
+
+  @Nullable
+  private final JmxMetricsScraper jmxMetricsScraper;
+
+  @Nullable
+  private final PrometheusScraper prometheusScraper;
+
+  public static MetricsScrapping create(Cluster cluster,
+                                        JmxMetricsRetriever jmxMetricsRetriever) {
+    InferredMetricsScraper inferredMetricsScraper = new InferredMetricsScraper();
+    JmxMetricsScraper jmxMetricsScraper = null;
+    PrometheusScraper prometheusScraper = null;
+
+    var metrics = cluster.getMetrics();
+    if (cluster.getMetrics() != null) {
+      var scrapeProperties = createScrapeProps(cluster);
+      if (metrics.getType() == null || metrics.getType().equalsIgnoreCase(JMX_METRICS_TYPE)) {
+        jmxMetricsScraper = new JmxMetricsScraper(scrapeProperties, jmxMetricsRetriever);
+      } else if (metrics.getType().equalsIgnoreCase(PROMETHEUS_METRICS_TYPE)) {
+        prometheusScraper = new PrometheusScraper(scrapeProperties);
+      }
+    }
+    return new MetricsScrapping(inferredMetricsScraper, jmxMetricsScraper, prometheusScraper);
+  }
+
+  private static MetricsScrapeProperties createScrapeProps(Cluster cluster) {
+    var metrics = cluster.getMetrics();
+    return MetricsScrapeProperties.builder()
+        .port(metrics.getPort())
+        .ssl(metrics.getSsl())
+        .username(metrics.getUsername())
+        .password(metrics.getPassword())
+        .truststoreConfig(cluster.getSsl())
+        .keystoreConfig(
+            metrics.getKeystoreLocation() != null
+                ? new KeystoreConfig(metrics.getKeystoreLocation(), metrics.getKeystorePassword())
+                : null
+        )
+        .build();
+  }
+
+  public Mono<Metrics> scrape(ScrapedClusterState clusterState, Collection<Node> nodes) {
+    Mono<InferredMetrics> inferred = inferredMetricsScraper.scrape(clusterState);
+    Mono<? extends PerBrokerScrapedMetrics> external = scrapeExternal(nodes);
+    return inferred.zipWith(
+        external,
+        (inf, ext) -> Metrics.builder()
+            .ioRates(ext.ioRates())
+            .perBrokerScrapedMetrics(ext.getPerBrokerMetrics())
+            .inferredMetrics(inf)
+            .build()
+    );
+  }
+
+  private Mono<? extends PerBrokerScrapedMetrics> scrapeExternal(Collection<Node> nodes) {
+    if (jmxMetricsScraper != null) {
+      return jmxMetricsScraper.scrape(nodes);
+    }
+    if (prometheusScraper != null) {
+      return prometheusScraper.scrape(nodes);
+    }
+    return Mono.just(PerBrokerScrapedMetrics.empty());
+  }
+
+}

+ 25 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/PerBrokerScrapedMetrics.java

@@ -0,0 +1,25 @@
+package com.provectus.kafka.ui.service.metrics.v2.scrape;
+
+import com.provectus.kafka.ui.model.Metrics;
+import io.prometheus.client.Collector;
+import java.util.List;
+import java.util.Map;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public class PerBrokerScrapedMetrics {
+
+  @Getter
+  private final Map<Integer, List<Collector.MetricFamilySamples>> perBrokerMetrics;
+
+  public static PerBrokerScrapedMetrics empty() {
+    return new PerBrokerScrapedMetrics(Map.of());
+  }
+
+  Metrics.IoRates ioRates() {
+    //TODO: rename WKMetrics
+    return new WellKnownMetrics(perBrokerMetrics).ioRates();
+  }
+
+}

+ 41 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedClusterState.java

@@ -1,16 +1,27 @@
 package com.provectus.kafka.ui.service.metrics.v2.scrape;
 
+import static com.provectus.kafka.ui.service.ReactiveAdminClient.*;
+
 import com.google.common.collect.Table;
+import com.provectus.kafka.ui.model.InternalLogDirStats;
 import com.provectus.kafka.ui.service.ReactiveAdminClient;
 import java.time.Instant;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import lombok.Builder;
 import lombok.Value;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
 import reactor.core.publisher.Mono;
 
+@Builder
 @Value
 public class ScrapedClusterState {
 
@@ -22,7 +33,7 @@ public class ScrapedClusterState {
       String name,
       List<ConfigEntry> configs,
       TopicDescription description,
-      Map<Integer, Long> offsets,
+      Map<Integer, Long> endOffsets,
       SegmentStats segmentStats,
       Map<Integer, SegmentStats> partitionsSegmentStats) {
   }
@@ -30,6 +41,7 @@ public class ScrapedClusterState {
   record ConsumerGroupState(
       Instant scrapeTime,
       String group,
+      org.apache.kafka.common.ConsumerGroupState state,
       ConsumerGroupDescription description,
       Table<String, Integer, Long> committedOffsets,
       Map<String, Instant> lastTopicActivity) {
@@ -45,11 +57,36 @@ public class ScrapedClusterState {
   Map<String, ConsumerGroupState> consumerGroupsStates;
 
   public static ScrapedClusterState empty() {
-    //TODO impl
-    return null;
+    return ScrapedClusterState.builder()
+        .scrapeStartTime(Instant.now())
+        .nodesStates(Map.of())
+        .topicStates(Map.of())
+        .consumerGroupsStates(Map.of())
+        .build();
   }
 
-  public static Mono<ScrapedClusterState> scrape(ReactiveAdminClient ac) {
+  public static Mono<ScrapedClusterState> scrape(ClusterDescription clusterDescription,
+                                                 ReactiveAdminClient ac) {
+
+    Mono<InternalLogDirStats> segmentStatsMono = ac.describeLogDirs().map(InternalLogDirStats::new);
+    Mono<List<String>> cgListingsMono = ac.listConsumerGroups().map(l -> l.stream().map(ConsumerGroupListing::groupId).toList());
+    Mono<Map<String, TopicDescription>> topicDescriptionsMono = ac.describeTopics();
+    Mono<Map<String, List<ConfigEntry>>> topicConfigsMono = ac.getTopicsConfig();
+
+    Mono.zip(
+        segmentStatsMono,
+        cgListingsMono,
+        topicDescriptionsMono,
+        topicConfigsMono
+    ).flatMap(tuple -> {
+      InternalLogDirStats segmentStats = tuple.getT1();
+      List<String> consumerGroups = tuple.getT2();
+      Map<String, TopicDescription> topicDescriptions = tuple.getT3();
+      Map<String, List<ConfigEntry>> topicConfigs = tuple.getT4();
+
+      Mono<>
+    })
+
     return null;//TODO impl
   }
 

+ 0 - 21
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedMetrics.java

@@ -1,21 +0,0 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape;
-
-import io.prometheus.client.Collector.MetricFamilySamples;
-import java.util.Collection;
-
-import java.util.List;
-import java.util.stream.Stream;
-
-public interface ScrapedMetrics {
-
-  static ScrapedMetrics create(Collection<MetricFamilySamples> lst) {
-    return lst::stream;
-  }
-
-  static ScrapedMetrics empty() {
-    return create(List.of());
-  }
-
-  Stream<MetricFamilySamples> asStream();
-
-}

+ 0 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scraper.java

@@ -1,10 +0,0 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape;
-
-
-import reactor.core.publisher.Mono;
-
-public interface Scraper<T extends  ScrapedMetrics> {
-
-  Mono<T> scrape();
-
-}

+ 0 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scrapping.java

@@ -1,8 +0,0 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape;
-
-public class Scrapping {
-
-  
-
-
-}

+ 26 - 15
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/WellKnownMetrics.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/WellKnownMetrics.java

@@ -1,15 +1,18 @@
-package com.provectus.kafka.ui.service.metrics;
+package com.provectus.kafka.ui.service.metrics.v2.scrape;
 
 import static org.apache.commons.lang3.StringUtils.containsIgnoreCase;
 import static org.apache.commons.lang3.StringUtils.endsWithIgnoreCase;
 
 import com.provectus.kafka.ui.model.Metrics;
+import com.provectus.kafka.ui.service.metrics.RawMetric;
+import io.prometheus.client.Collector;
 import java.math.BigDecimal;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.kafka.common.Node;
 
-class WellKnownMetrics {
+public class WellKnownMetrics {
 
   // per broker
   final Map<Integer, BigDecimal> brokerBytesInFifteenMinuteRate = new HashMap<>();
@@ -19,33 +22,41 @@ class WellKnownMetrics {
   final Map<String, BigDecimal> bytesInFifteenMinuteRate = new HashMap<>();
   final Map<String, BigDecimal> bytesOutFifteenMinuteRate = new HashMap<>();
 
-  void populate(Node node, RawMetric rawMetric) {
-    updateBrokerIOrates(node, rawMetric);
-    updateTopicsIOrates(rawMetric);
+  public WellKnownMetrics(Map<Integer, List<Collector.MetricFamilySamples>> perBrokerMetrics) {
+    perBrokerMetrics.forEach((nodeId, metrics) -> {
+      metrics.forEach(m -> {
+        RawMetric.create(m).forEach(rawMetric -> {
+          updateBrokerIOrates(nodeId, rawMetric);
+          updateTopicsIOrates(rawMetric);
+        });
+      });
+    });
   }
 
-  void apply(Metrics.MetricsBuilder metricsBuilder) {
-    metricsBuilder.topicBytesInPerSec(bytesInFifteenMinuteRate);
-    metricsBuilder.topicBytesOutPerSec(bytesOutFifteenMinuteRate);
-    metricsBuilder.brokerBytesInPerSec(brokerBytesInFifteenMinuteRate);
-    metricsBuilder.brokerBytesOutPerSec(brokerBytesOutFifteenMinuteRate);
+  public Metrics.IoRates ioRates() {
+    return Metrics.IoRates.builder()
+        .topicBytesInPerSec(bytesInFifteenMinuteRate)
+        .topicBytesOutPerSec(bytesOutFifteenMinuteRate)
+        .brokerBytesInPerSec(brokerBytesInFifteenMinuteRate)
+        .brokerBytesOutPerSec(brokerBytesOutFifteenMinuteRate)
+        .build();
   }
 
-  private void updateBrokerIOrates(Node node, RawMetric rawMetric) {
+  private void updateBrokerIOrates(int nodeId, RawMetric rawMetric) {
     String name = rawMetric.name();
-    if (!brokerBytesInFifteenMinuteRate.containsKey(node.id())
+    if (!brokerBytesInFifteenMinuteRate.containsKey(nodeId)
         && rawMetric.labels().size() == 1
         && "BytesInPerSec".equalsIgnoreCase(rawMetric.labels().get("name"))
         && containsIgnoreCase(name, "BrokerTopicMetrics")
         && endsWithIgnoreCase(name, "FifteenMinuteRate")) {
-      brokerBytesInFifteenMinuteRate.put(node.id(),  rawMetric.value());
+      brokerBytesInFifteenMinuteRate.put(nodeId, rawMetric.value());
     }
-    if (!brokerBytesOutFifteenMinuteRate.containsKey(node.id())
+    if (!brokerBytesOutFifteenMinuteRate.containsKey(nodeId)
         && rawMetric.labels().size() == 1
         && "BytesOutPerSec".equalsIgnoreCase(rawMetric.labels().get("name"))
         && containsIgnoreCase(name, "BrokerTopicMetrics")
         && endsWithIgnoreCase(name, "FifteenMinuteRate")) {
-      brokerBytesOutFifteenMinuteRate.put(node.id(), rawMetric.value());
+      brokerBytesOutFifteenMinuteRate.put(nodeId, rawMetric.value());
     }
   }
 

+ 8 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetrics.java

@@ -1,23 +1,23 @@
 package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred;
 
-import static io.prometheus.client.Collector.*;
+import static io.prometheus.client.Collector.MetricFamilySamples;
 
-import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedClusterState;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedMetrics;
 import java.util.List;
-import java.util.stream.Stream;
 
-public class InferredMetrics implements ScrapedMetrics {
+public class InferredMetrics {
 
   private final List<MetricFamilySamples> metrics;
 
+  public static InferredMetrics empty() {
+    return new InferredMetrics(List.of());
+  }
+
   public InferredMetrics(List<MetricFamilySamples> metrics) {
     this.metrics = metrics;
   }
 
-  @Override
-  public Stream<MetricFamilySamples> asStream() {
-    return metrics.stream();
+  public List<MetricFamilySamples> asList() {
+    return metrics;
   }
 
 }

+ 5 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScraper.java

@@ -1,25 +1,20 @@
 package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred;
 
 import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedClusterState;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.Scraper;
 import java.util.List;
-import java.util.function.Supplier;
 import lombok.RequiredArgsConstructor;
 import reactor.core.publisher.Mono;
 
 @RequiredArgsConstructor
-public class InferredMetricsScraper implements Scraper<InferredMetrics> {
+public class InferredMetricsScraper {
 
-  private final Supplier<ScrapedClusterState> currentStateSupplier;
   private ScrapedClusterState prevState = null;
 
-  @Override
-  public synchronized Mono<InferredMetrics> scrape() {
+  public synchronized Mono<InferredMetrics> scrape(ScrapedClusterState newState) {
     if (prevState == null) {
-      prevState = currentStateSupplier.get();
-      return Mono.empty();
+      prevState = newState;
+      return Mono.just(InferredMetrics.empty());
     }
-    var newState = currentStateSupplier.get();
     var inferred = infer(prevState, newState);
     prevState = newState;
     return Mono.just(inferred);
@@ -27,6 +22,7 @@ public class InferredMetricsScraper implements Scraper<InferredMetrics> {
 
   private static InferredMetrics infer(ScrapedClusterState prevState,
                                        ScrapedClusterState newState) {
+
     //TODO: impl
     return new InferredMetrics(List.of());
   }

+ 3 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatter.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsFormatter.java

@@ -1,5 +1,7 @@
-package com.provectus.kafka.ui.service.metrics;
+package com.provectus.kafka.ui.service.metrics.v2.scrape.jmx;
 
+import com.provectus.kafka.ui.service.metrics.RawMetric;
+import io.prometheus.client.Collector;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;

+ 26 - 26
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsRetriever.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsRetriever.java

@@ -1,6 +1,8 @@
-package com.provectus.kafka.ui.service.metrics;
+package com.provectus.kafka.ui.service.metrics.v2.scrape.jmx;
 
 import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.MetricsScrapeProperties;
+import com.provectus.kafka.ui.service.metrics.RawMetric;
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -18,14 +20,13 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.common.Node;
 import org.springframework.stereotype.Service;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 
 @Service
 @Slf4j
-class JmxMetricsRetriever implements MetricsRetriever, Closeable {
+public class JmxMetricsRetriever implements Closeable {
 
   private static final boolean SSL_JMX_SUPPORTED;
 
@@ -43,35 +44,34 @@ class JmxMetricsRetriever implements MetricsRetriever, Closeable {
     JmxSslSocketFactory.clearFactoriesCache();
   }
 
-  @Override
-  public Flux<RawMetric> retrieve(KafkaCluster c, Node node) {
-    if (isSslJmxEndpoint(c) && !SSL_JMX_SUPPORTED) {
-      log.warn("Cluster {} has jmx ssl configured, but it is not supported", c.getName());
-      return Flux.empty();
+  public Mono<List<RawMetric>> retrieveFromNode(MetricsScrapeProperties metricsConfig, Node node) {
+    if (isSslJmxEndpoint(metricsConfig) && !SSL_JMX_SUPPORTED) {
+      log.warn("Cluster has jmx ssl configured, but it is not supported by app");
+      return Mono.just(List.of());
     }
-    return Mono.fromSupplier(() -> retrieveSync(c, node))
-        .subscribeOn(Schedulers.boundedElastic())
-        .flatMapMany(Flux::fromIterable);
+    return Mono.fromSupplier(() -> retrieveSync(metricsConfig, node))
+        .subscribeOn(Schedulers.boundedElastic());
   }
 
-  private boolean isSslJmxEndpoint(KafkaCluster cluster) {
-    return cluster.getMetricsConfig().getKeystoreLocation() != null;
+  private boolean isSslJmxEndpoint(MetricsScrapeProperties metricsScrapeProperties) {
+    return metricsScrapeProperties.getKeystoreConfig() != null
+        && metricsScrapeProperties.getKeystoreConfig().getKeystoreLocation() != null;
   }
 
   @SneakyThrows
-  private List<RawMetric> retrieveSync(KafkaCluster c, Node node) {
-    String jmxUrl = JMX_URL + node.host() + ":" + c.getMetricsConfig().getPort() + "/" + JMX_SERVICE_TYPE;
+  private List<RawMetric> retrieveSync(MetricsScrapeProperties metricsConfig, Node node) {
+    String jmxUrl = JMX_URL + node.host() + ":" + metricsConfig.getPort() + "/" + JMX_SERVICE_TYPE;
     log.debug("Collection JMX metrics for {}", jmxUrl);
     List<RawMetric> result = new ArrayList<>();
-    withJmxConnector(jmxUrl, c, jmxConnector -> getMetricsFromJmx(jmxConnector, result));
+    withJmxConnector(jmxUrl, metricsConfig, jmxConnector -> getMetricsFromJmx(jmxConnector, result));
     log.debug("{} metrics collected for {}", result.size(), jmxUrl);
     return result;
   }
 
   private void withJmxConnector(String jmxUrl,
-                                KafkaCluster c,
+                                MetricsScrapeProperties metricsConfig,
                                 Consumer<JMXConnector> consumer) {
-    var env = prepareJmxEnvAndSetThreadLocal(c);
+    var env = prepareJmxEnvAndSetThreadLocal(metricsConfig);
     try (JMXConnector connector = JMXConnectorFactory.newJMXConnector(new JMXServiceURL(jmxUrl), env)) {
       try {
         connector.connect(env);
@@ -87,16 +87,16 @@ class JmxMetricsRetriever implements MetricsRetriever, Closeable {
     }
   }
 
-  private Map<String, Object> prepareJmxEnvAndSetThreadLocal(KafkaCluster cluster) {
-    var metricsConfig = cluster.getMetricsConfig();
+  private Map<String, Object> prepareJmxEnvAndSetThreadLocal(MetricsScrapeProperties metricsConfig) {
     Map<String, Object> env = new HashMap<>();
-    if (isSslJmxEndpoint(cluster)) {
-      var clusterSsl = cluster.getOriginalProperties().getSsl();
+    if (isSslJmxEndpoint(metricsConfig)) {
+      var truststoreConfig = metricsConfig.getTruststoreConfig();
+      var keystoreConfig = metricsConfig.getKeystoreConfig();
       JmxSslSocketFactory.setSslContextThreadLocal(
-          clusterSsl != null ? clusterSsl.getTruststoreLocation() : null,
-          clusterSsl != null ? clusterSsl.getTruststorePassword() : null,
-          metricsConfig.getKeystoreLocation(),
-          metricsConfig.getKeystorePassword()
+          truststoreConfig != null ? truststoreConfig.getTruststoreLocation() : null,
+          truststoreConfig != null ? truststoreConfig.getTruststorePassword() : null,
+          keystoreConfig != null ? keystoreConfig.getKeystoreLocation() : null,
+          keystoreConfig != null ? keystoreConfig.getKeystorePassword() : null
       );
       JmxSslSocketFactory.editJmxConnectorEnv(env);
     }

+ 29 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsScraper.java

@@ -1,13 +1,36 @@
 package com.provectus.kafka.ui.service.metrics.v2.scrape.jmx;
 
-import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedMetrics;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.Scraper;
+import static io.prometheus.client.Collector.*;
+
+import com.provectus.kafka.ui.model.MetricsScrapeProperties;
+import com.provectus.kafka.ui.service.metrics.RawMetric;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.PerBrokerScrapedMetrics;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.Node;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuples;
+
+public class JmxMetricsScraper  {
 
-public class JmxMetricsScraper implements Scraper<ScrapedMetrics> {
+  private final JmxMetricsRetriever jmxMetricsRetriever;
+  private final MetricsScrapeProperties scrapeProperties;
+
+  public JmxMetricsScraper(MetricsScrapeProperties scrapeProperties,
+                           JmxMetricsRetriever jmxMetricsRetriever) {
+    this.scrapeProperties = scrapeProperties;
+    this.jmxMetricsRetriever = jmxMetricsRetriever;
+  }
 
-  @Override
-  public Mono<ScrapedMetrics> scrape() {
-    return null;
+  public Mono<PerBrokerScrapedMetrics> scrape(Collection<Node> nodes) {
+    Mono<Map<Integer, List<MetricFamilySamples>>> collected = Flux.fromIterable(nodes)
+        .flatMap(n -> jmxMetricsRetriever.retrieveFromNode(scrapeProperties, n).map(metrics -> Tuples.of(n, metrics)))
+        .collectMap(
+            t -> t.getT1().id(),
+            t -> RawMetric.groupIntoMFS(t.getT2()).toList()
+        );
+    return collected.map(PerBrokerScrapedMetrics::new);
   }
 }

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxSslSocketFactory.java

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.service.metrics;
+package com.provectus.kafka.ui.service.metrics.v2.scrape.jmx;
 
 import com.google.common.base.Preconditions;
 import java.io.FileInputStream;

+ 0 - 13
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prom/PrometheusScraper.java

@@ -1,13 +0,0 @@
-package com.provectus.kafka.ui.service.metrics.v2.scrape.prom;
-
-import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedMetrics;
-import com.provectus.kafka.ui.service.metrics.v2.scrape.Scraper;
-import reactor.core.publisher.Mono;
-
-public class PrometheusScraper implements Scraper<ScrapedMetrics> {
-
-  @Override
-  public Mono<ScrapedMetrics> scrape() {
-    return null;
-  }
-}

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParser.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusEndpointMetricsParser.java

@@ -1,5 +1,6 @@
-package com.provectus.kafka.ui.service.metrics;
+package com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus;
 
+import com.provectus.kafka.ui.service.metrics.RawMetric;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Optional;

+ 15 - 17
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetriever.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusMetricsRetriever.java

@@ -1,12 +1,14 @@
-package com.provectus.kafka.ui.service.metrics;
+package com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus;
+
+import static io.prometheus.client.Collector.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
-import com.provectus.kafka.ui.config.ClustersProperties;
-import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.MetricsConfig;
+import com.provectus.kafka.ui.model.MetricsScrapeProperties;
+import com.provectus.kafka.ui.service.metrics.RawMetric;
 import com.provectus.kafka.ui.util.WebClientConfigurator;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.Node;
@@ -19,33 +21,29 @@ import reactor.core.publisher.Mono;
 
 @Service
 @Slf4j
-class PrometheusMetricsRetriever implements MetricsRetriever {
+class PrometheusMetricsRetriever {
 
   private static final String METRICS_ENDPOINT_PATH = "/metrics";
   private static final int DEFAULT_EXPORTER_PORT = 11001;
 
-  @Override
-  public Flux<RawMetric> retrieve(KafkaCluster c, Node node) {
-    log.debug("Retrieving metrics from prometheus exporter: {}:{}", node.host(), c.getMetricsConfig().getPort());
+  public Mono<List<MetricFamilySamples>> retrieve(MetricsScrapeProperties metricsConfig, Node node) {
+    log.debug("Retrieving metrics from prometheus exporter: {}:{}", node.host(), metricsConfig.getPort());
 
-    MetricsConfig metricsConfig = c.getMetricsConfig();
     var webClient = new WebClientConfigurator()
         .configureBufferSize(DataSize.ofMegabytes(20))
         .configureBasicAuth(metricsConfig.getUsername(), metricsConfig.getPassword())
-        .configureSsl(
-            c.getOriginalProperties().getSsl(),
-            new ClustersProperties.KeystoreConfig(
-                metricsConfig.getKeystoreLocation(),
-                metricsConfig.getKeystorePassword()))
+        .configureSsl(metricsConfig.getTruststoreConfig(), metricsConfig.getKeystoreConfig())
         .build();
 
-    return retrieve(webClient, node.host(), c.getMetricsConfig());
+    return retrieve(webClient, node.host(), metricsConfig)
+        .collectList()
+        .map(metrics -> RawMetric.groupIntoMFS(metrics).toList());
   }
 
   @VisibleForTesting
-  Flux<RawMetric> retrieve(WebClient webClient, String host, MetricsConfig metricsConfig) {
+  Flux<RawMetric> retrieve(WebClient webClient, String host, MetricsScrapeProperties metricsConfig) {
     int port = Optional.ofNullable(metricsConfig.getPort()).orElse(DEFAULT_EXPORTER_PORT);
-    boolean sslEnabled = metricsConfig.isSsl() || metricsConfig.getKeystoreLocation() != null;
+    boolean sslEnabled = metricsConfig.isSsl() || metricsConfig.getKeystoreConfig() != null;
     var request = webClient.get()
         .uri(UriComponentsBuilder.newInstance()
             .scheme(sslEnabled ? "https" : "http")

+ 31 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusScraper.java

@@ -0,0 +1,31 @@
+package com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus;
+
+import com.provectus.kafka.ui.model.MetricsScrapeProperties;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.PerBrokerScrapedMetrics;
+import io.prometheus.client.Collector;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.Node;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuples;
+
+public class PrometheusScraper {
+
+  private final static PrometheusMetricsRetriever RETRIEVER = new PrometheusMetricsRetriever();
+
+  private final MetricsScrapeProperties metricsConfig;
+
+  public PrometheusScraper(MetricsScrapeProperties metricsConfig) {
+    this.metricsConfig = metricsConfig;
+  }
+
+  public Mono<PerBrokerScrapedMetrics> scrape(Collection<Node> clusterNodes) {
+    Mono<Map<Integer, List<Collector.MetricFamilySamples>>> collected = Flux.fromIterable(clusterNodes)
+        .flatMap(n -> RETRIEVER.retrieve(metricsConfig, n).map(metrics -> Tuples.of(n, metrics)))
+        .collectMap(t -> t.getT1().id(), t -> t.getT2());
+
+    return collected.map(PerBrokerScrapedMetrics::new);
+  }
+}

+ 0 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ReactiveFailover.java

@@ -81,9 +81,6 @@ public class ReactiveFailover<T> {
         .flatMap(f)
         .onErrorResume(failoverExceptionsPredicate, th -> {
           publisher.markFailed();
-          if (candidates.size() == 1) {
-            return Mono.error(th);
-          }
           var newCandidates = candidates.stream().skip(1).filter(PublisherHolder::isActive).toList();
           if (newCandidates.isEmpty()) {
             return Mono.error(th);
@@ -106,9 +103,6 @@ public class ReactiveFailover<T> {
         .flatMapMany(f)
         .onErrorResume(failoverExceptionsPredicate, th -> {
           publisher.markFailed();
-          if (candidates.size() == 1) {
-            return Flux.error(th);
-          }
           var newCandidates = candidates.stream().skip(1).filter(PublisherHolder::isActive).toList();
           if (newCandidates.isEmpty()) {
             return Flux.error(th);

+ 2 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatterTest.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service.metrics;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import com.provectus.kafka.ui.service.metrics.v2.scrape.jmx.JmxMetricsFormatter;
 import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
@@ -74,4 +75,4 @@ class JmxMetricsFormatterTest {
     assertThat(actual.value()).isCloseTo(expected.value(), Offset.offset(new BigDecimal("0.001")));
   }
 
-}
+}

+ 1 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParserTest.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service.metrics;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus.PrometheusEndpointMetricsParser;
 import java.util.Map;
 import java.util.Optional;
 import org.junit.jupiter.api.Test;

+ 7 - 6
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetrieverTest.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.service.metrics;
 
-import com.provectus.kafka.ui.model.MetricsConfig;
+import com.provectus.kafka.ui.model.MetricsScrapeProperties;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus.PrometheusMetricsRetriever;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.List;
@@ -34,7 +35,7 @@ class PrometheusMetricsRetrieverTest {
     var url = mockWebServer.url("/metrics");
     mockWebServer.enqueue(prepareResponse());
 
-    MetricsConfig metricsConfig = prepareMetricsConfig(url.port(), null, null);
+    MetricsScrapeProperties metricsConfig = prepareMetricsConfig(url.port(), null, null);
 
     StepVerifier.create(retriever.retrieve(WebClient.create(), url.host(), metricsConfig))
         .expectNextSequence(expectedRawMetrics())
@@ -48,7 +49,7 @@ class PrometheusMetricsRetrieverTest {
     mockWebServer.enqueue(prepareResponse());
 
 
-    MetricsConfig metricsConfig = prepareMetricsConfig(url.port(), "username", "password");
+    MetricsScrapeProperties metricsConfig = prepareMetricsConfig(url.port(), "username", "password");
 
     StepVerifier.create(retriever.retrieve(WebClient.create(), url.host(), metricsConfig))
         .expectNextSequence(expectedRawMetrics())
@@ -69,11 +70,11 @@ class PrometheusMetricsRetrieverTest {
     );
   }
 
-  MetricsConfig prepareMetricsConfig(Integer port, String username, String password) {
-    return MetricsConfig.builder()
+  MetricsScrapeProperties prepareMetricsConfig(Integer port, String username, String password) {
+    return MetricsScrapeProperties.builder()
         .ssl(false)
         .port(port)
-        .type(MetricsConfig.PROMETHEUS_METRICS_TYPE)
+        .type(MetricsScrapeProperties.PROMETHEUS_METRICS_TYPE)
         .username(username)
         .password(password)
         .build();

+ 4 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/WellKnownMetricsTest.java

@@ -3,6 +3,8 @@ package com.provectus.kafka.ui.service.metrics;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.model.Metrics;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.WellKnownMetrics;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus.PrometheusEndpointMetricsParser;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Map;
@@ -68,7 +70,7 @@ class WellKnownMetricsTest {
     wellKnownMetrics.brokerBytesOutFifteenMinuteRate.put(2, new BigDecimal(20));
 
     Metrics.MetricsBuilder builder = Metrics.builder();
-    wellKnownMetrics.apply(builder);
+    wellKnownMetrics.ioRates(builder);
     var metrics = builder.build();
 
     // checking per topic io rates
@@ -90,4 +92,4 @@ class WellKnownMetricsTest {
         .forEach(m -> wellKnownMetrics.populate(n, m));
   }
 
-}
+}