From dbab4e367dd4888959ebd770d45693f08718d546 Mon Sep 17 00:00:00 2001 From: iliax Date: Tue, 27 Jun 2023 15:41:23 +0400 Subject: [PATCH] wip --- .../compose/jmx-exporter/kafka-broker.yml | 69 ++++++++++++++- kafka-ui-api/pom.xml | 5 ++ .../kafka/ui/config/ClustersProperties.java | 31 ++++++- .../kafka/ui/mapper/ClusterMapper.java | 1 + .../kafka/ui/model/KafkaCluster.java | 3 +- .../com/provectus/kafka/ui/model/Metrics.java | 34 +++++--- .../kafka/ui/model/MetricsConfig.java | 22 ----- .../ui/model/MetricsScrapeProperties.java | 29 +++++++ .../kafka/ui/service/KafkaClusterFactory.java | 28 ++---- .../kafka/ui/service/StatisticsService.java | 56 ++++++------ .../ui/service/metrics/MetricsCollector.java | 69 --------------- .../ui/service/metrics/MetricsRetriever.java | 9 -- .../kafka/ui/service/metrics/RawMetric.java | 24 +++++ .../metrics/v2/scrape/MetricsScrapping.java | 87 +++++++++++++++++++ .../v2/scrape/PerBrokerScrapedMetrics.java | 25 ++++++ .../v2/scrape/ScrapedClusterState.java | 45 +++++++++- .../metrics/v2/scrape/ScrapedMetrics.java | 21 ----- .../ui/service/metrics/v2/scrape/Scraper.java | 10 --- .../service/metrics/v2/scrape/Scrapping.java | 8 -- .../{ => v2/scrape}/WellKnownMetrics.java | 41 +++++---- .../v2/scrape/inferred/InferredMetrics.java | 16 ++-- .../inferred/InferredMetricsScraper.java | 14 ++- .../scrape/jmx}/JmxMetricsFormatter.java | 4 +- .../scrape/jmx}/JmxMetricsRetriever.java | 52 +++++------ .../v2/scrape/jmx/JmxMetricsScraper.java | 35 ++++++-- .../scrape/jmx}/JmxSslSocketFactory.java | 2 +- .../v2/scrape/prom/PrometheusScraper.java | 13 --- .../PrometheusEndpointMetricsParser.java | 3 +- .../PrometheusMetricsRetriever.java | 32 ++++--- .../scrape/prometheus/PrometheusScraper.java | 31 +++++++ .../kafka/ui/util/ReactiveFailover.java | 6 -- .../metrics/JmxMetricsFormatterTest.java | 3 +- .../PrometheusEndpointMetricsParserTest.java | 1 + .../PrometheusMetricsRetrieverTest.java | 13 +-- .../service/metrics/WellKnownMetricsTest.java | 6 +- 35 files changed, 526 insertions(+), 322 deletions(-) delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsConfig.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsScrapeProperties.java delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsCollector.java delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsRetriever.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/MetricsScrapping.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/PerBrokerScrapedMetrics.java delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedMetrics.java delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scraper.java delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scrapping.java rename kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/{ => v2/scrape}/WellKnownMetrics.java (61%) rename kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/{ => v2/scrape/jmx}/JmxMetricsFormatter.java (95%) rename kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/{ => v2/scrape/jmx}/JmxMetricsRetriever.java (63%) rename kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/{ => v2/scrape/jmx}/JmxSslSocketFactory.java (99%) delete mode 100644 
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prom/PrometheusScraper.java rename kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/{ => v2/scrape/prometheus}/PrometheusEndpointMetricsParser.java (92%) rename kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/{ => v2/scrape/prometheus}/PrometheusMetricsRetriever.java (72%) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusScraper.java diff --git a/documentation/compose/jmx-exporter/kafka-broker.yml b/documentation/compose/jmx-exporter/kafka-broker.yml index cb19fee9f8..8bc5057dcf 100644 --- a/documentation/compose/jmx-exporter/kafka-broker.yml +++ b/documentation/compose/jmx-exporter/kafka-broker.yml @@ -1,2 +1,69 @@ +lowercaseOutputName: true rules: - - pattern: ".*" \ No newline at end of file + # Special cases and very specific rules + - pattern: kafka.server<>Value + name: kafka_server_$1_$2 + type: GAUGE + labels: + clientId: '$3' + topic: '$4' + partition: '$5' + - pattern: kafka.server<>Value + name: kafka_server_$1_$2 + type: GAUGE + labels: + clientId: '$3' + broker: '$4:$5' + + - pattern: kafka.server<>OneMinuteRate + name: kafka_server_kafkarequesthandlerpool_requesthandleravgidlepercent_total + type: GAUGE + + - pattern: kafka.server<>connections + name: kafka_server_socketservermetrics_connections + type: GAUGE + labels: + client_software_name: '$1' + client_software_version: '$2' + listener: '$3' + network_processor: '$4' + + - pattern: 'kafka.server<>(.+):' + name: kafka_server_socketservermetrics_$3 + type: GAUGE + labels: + listener: '$1' + network_processor: '$2' + + # Count and Value + - pattern: kafka.(.*)<>(Count|Value) + name: kafka_$1_$2_$3 + labels: + '$4': '$5' + '$6': '$7' + - pattern: kafka.(.*)<>(Count|Value) + name: kafka_$1_$2_$3 + labels: + '$4': '$5' + - pattern: kafka.(.*)<>(Count|Value) + name: kafka_$1_$2_$3 + + # Percentile + - pattern: kafka.(.*)<>(\d+)thPercentile + name: kafka_$1_$2_$3 + type: GAUGE + labels: + '$4': '$5' + '$6': '$7' + quantile: '0.$8' + - pattern: kafka.(.*)<>(\d+)thPercentile + name: kafka_$1_$2_$3 + type: GAUGE + labels: + '$4': '$5' + quantile: '0.$6' + - pattern: kafka.(.*)<>(\d+)thPercentile + name: kafka_$1_$2_$3 + type: GAUGE + labels: + quantile: '0.$4' diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml index 4dbd0ae0f1..c8e5154c1c 100644 --- a/kafka-ui-api/pom.xml +++ b/kafka-ui-api/pom.xml @@ -244,6 +244,11 @@ simpleclient_common 0.16.0 + + io.prometheus + simpleclient_pushgateway + 0.16.0 + diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index 64ec894cd5..07d39bbb1f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -1,6 +1,6 @@ package com.provectus.kafka.ui.config; -import com.provectus.kafka.ui.model.MetricsConfig; +import com.provectus.kafka.ui.model.MetricsScrapeProperties; import jakarta.annotation.PostConstruct; import java.util.ArrayList; import java.util.HashMap; @@ -63,7 +63,7 @@ public class ClustersProperties { } @Data - @ToString(exclude = "password") + @ToString(exclude = {"password", "keystorePassword"}) public static class MetricsConfigData { String type; Integer port; @@ -72,6 +72,31 @@ public class ClustersProperties { String password; String keystoreLocation; String 
keystorePassword; + +// JmxScraper jmxScraper; +// PrometheusScraper prometheusScraper; +// +// @Data +// @ToString(exclude = "password") +// public static class JmxScraper { +// Integer port; +// Boolean ssl; +// String username; +// String password; +// String keystoreLocation; +// String keystorePassword; +// } +// +// @Data +// @ToString(exclude = "password") +// public static class PrometheusScraper { +// Integer port; +// Boolean ssl; +// String username; +// String password; +// String keystoreLocation; +// String keystorePassword; +// } } @Data @@ -155,7 +180,7 @@ public class ClustersProperties { private void setMetricsDefaults() { for (Cluster cluster : clusters) { if (cluster.getMetrics() != null && !StringUtils.hasText(cluster.getMetrics().getType())) { - cluster.getMetrics().setType(MetricsConfig.JMX_METRICS_TYPE); + cluster.getMetrics().setType(MetricsScrapeProperties.JMX_METRICS_TYPE); } } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java index a122a269a4..fe89dadd4d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java @@ -52,6 +52,7 @@ public interface ClusterMapper { ClusterStatsDTO toClusterStats(InternalClusterState clusterState); + @Deprecated default ClusterMetricsDTO toClusterMetrics(Metrics metrics) { return new ClusterMetricsDTO() .items(metrics.getSummarizedMetrics().map(this::convert).collect(Collectors.toList())); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java index 1e2903dbcc..3d3c0e747a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java @@ -5,6 +5,7 @@ import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi; import com.provectus.kafka.ui.emitter.PollingSettings; import com.provectus.kafka.ui.service.ksql.KsqlApiClient; import com.provectus.kafka.ui.service.masking.DataMasking; +import com.provectus.kafka.ui.service.metrics.v2.scrape.MetricsScrapping; import com.provectus.kafka.ui.sr.api.KafkaSrClientApi; import com.provectus.kafka.ui.util.ReactiveFailover; import java.util.Map; @@ -25,10 +26,10 @@ public class KafkaCluster { private final String bootstrapServers; private final Properties properties; private final boolean readOnly; - private final MetricsConfig metricsConfig; private final DataMasking masking; private final PollingSettings pollingSettings; private final ReactiveFailover schemaRegistryClient; private final Map> connectsClients; private final ReactiveFailover ksqlClient; + private final MetricsScrapping metricsScrapping; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java index 02bfe6dea1..d972cfdf6f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java @@ -1,8 +1,11 @@ package com.provectus.kafka.ui.model; +import static io.prometheus.client.Collector.*; import static java.util.stream.Collectors.toMap; import com.provectus.kafka.ui.service.metrics.RawMetric; +import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.InferredMetrics; +import 
io.prometheus.client.Collector; import java.math.BigDecimal; import java.util.Collection; import java.util.List; @@ -16,25 +19,32 @@ import lombok.Value; @Value public class Metrics { - Map brokerBytesInPerSec; - Map brokerBytesOutPerSec; - Map topicBytesInPerSec; - Map topicBytesOutPerSec; - Map> perBrokerMetrics; - public static Metrics empty() { return Metrics.builder() - .brokerBytesInPerSec(Map.of()) - .brokerBytesOutPerSec(Map.of()) - .topicBytesInPerSec(Map.of()) - .topicBytesOutPerSec(Map.of()) - .perBrokerMetrics(Map.of()) + .ioRates(null) //TODO: empty + .perBrokerScrapedMetrics(Map.of()) + .inferredMetrics(InferredMetrics.empty()) .build(); } + @Builder + public record IoRates(Map brokerBytesInPerSec, + Map brokerBytesOutPerSec, + Map topicBytesInPerSec, + Map topicBytesOutPerSec) { + } + + IoRates ioRates; + InferredMetrics inferredMetrics; + Map> perBrokerScrapedMetrics; + + @Deprecated public Stream getSummarizedMetrics() { - return perBrokerMetrics.values().stream() + return perBrokerScrapedMetrics + .values() + .stream() .flatMap(Collection::stream) + .flatMap(RawMetric::create) .collect(toMap(RawMetric::identityKey, m -> m, (m1, m2) -> m1.copyWithValue(m1.value().add(m2.value())))) .values() .stream(); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsConfig.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsConfig.java deleted file mode 100644 index d355144343..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsConfig.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.provectus.kafka.ui.model; - -import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; - -@Data -@Builder(toBuilder = true) -@AllArgsConstructor(access = AccessLevel.PRIVATE) -public class MetricsConfig { - public static final String JMX_METRICS_TYPE = "JMX"; - public static final String PROMETHEUS_METRICS_TYPE = "PROMETHEUS"; - - private final String type; - private final Integer port; - private final boolean ssl; - private final String username; - private final String password; - private final String keystoreLocation; - private final String keystorePassword; -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsScrapeProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsScrapeProperties.java new file mode 100644 index 0000000000..aedff66df6 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsScrapeProperties.java @@ -0,0 +1,29 @@ +package com.provectus.kafka.ui.model; + +import static com.provectus.kafka.ui.config.ClustersProperties.*; + +import com.provectus.kafka.ui.config.ClustersProperties; +import jakarta.annotation.Nullable; +import lombok.Builder; +import lombok.Data; +import lombok.Value; + +@Value +@Builder +public class MetricsScrapeProperties { + public static final String JMX_METRICS_TYPE = "JMX"; + public static final String PROMETHEUS_METRICS_TYPE = "PROMETHEUS"; + + Integer port; + boolean ssl; + String username; + String password; + + @Nullable + KeystoreConfig keystoreConfig; + + @Nullable + TruststoreConfig truststoreConfig; + + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java index 964b25473d..0594488a4a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java +++ 
b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java @@ -8,9 +8,10 @@ import com.provectus.kafka.ui.emitter.PollingSettings; import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO; import com.provectus.kafka.ui.model.ClusterConfigValidationDTO; import com.provectus.kafka.ui.model.KafkaCluster; -import com.provectus.kafka.ui.model.MetricsConfig; import com.provectus.kafka.ui.service.ksql.KsqlApiClient; import com.provectus.kafka.ui.service.masking.DataMasking; +import com.provectus.kafka.ui.service.metrics.v2.scrape.jmx.JmxMetricsRetriever; +import com.provectus.kafka.ui.service.metrics.v2.scrape.MetricsScrapping; import com.provectus.kafka.ui.sr.ApiClient; import com.provectus.kafka.ui.sr.api.KafkaSrClientApi; import com.provectus.kafka.ui.util.KafkaServicesValidation; @@ -22,7 +23,6 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.stream.Stream; -import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.unit.DataSize; @@ -39,11 +39,13 @@ public class KafkaClusterFactory { private static final DataSize DEFAULT_WEBCLIENT_BUFFER = DataSize.parse("20MB"); private final DataSize webClientMaxBuffSize; + private final JmxMetricsRetriever jmxMetricsRetriever; - public KafkaClusterFactory(WebclientProperties webclientProperties) { + public KafkaClusterFactory(WebclientProperties webclientProperties, JmxMetricsRetriever jmxMetricsRetriever) { this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize()) .map(DataSize::parse) .orElse(DEFAULT_WEBCLIENT_BUFFER); + this.jmxMetricsRetriever = jmxMetricsRetriever; } public KafkaCluster create(ClustersProperties properties, @@ -56,6 +58,7 @@ public class KafkaClusterFactory { builder.readOnly(clusterProperties.isReadOnly()); builder.masking(DataMasking.create(clusterProperties.getMasking())); builder.pollingSettings(PollingSettings.create(clusterProperties, properties)); + builder.metricsScrapping(MetricsScrapping.create(clusterProperties, jmxMetricsRetriever)); if (schemaRegistryConfigured(clusterProperties)) { builder.schemaRegistryClient(schemaRegistryClient(clusterProperties)); @@ -66,9 +69,6 @@ public class KafkaClusterFactory { if (ksqlConfigured(clusterProperties)) { builder.ksqlClient(ksqlClient(clusterProperties)); } - if (metricsConfigured(clusterProperties)) { - builder.metricsConfig(metricsConfigDataToMetricsConfig(clusterProperties.getMetrics())); - } builder.originalProperties(clusterProperties); return builder.build(); } @@ -202,20 +202,4 @@ public class KafkaClusterFactory { return clusterProperties.getMetrics() != null; } - @Nullable - private MetricsConfig metricsConfigDataToMetricsConfig(ClustersProperties.MetricsConfigData metricsConfigData) { - if (metricsConfigData == null) { - return null; - } - MetricsConfig.MetricsConfigBuilder builder = MetricsConfig.builder(); - builder.type(metricsConfigData.getType()); - builder.port(metricsConfigData.getPort()); - builder.ssl(Optional.ofNullable(metricsConfigData.getSsl()).orElse(false)); - builder.username(metricsConfigData.getUsername()); - builder.password(metricsConfigData.getPassword()); - builder.keystoreLocation(metricsConfigData.getKeystoreLocation()); - builder.keystorePassword(metricsConfigData.getKeystorePassword()); - return builder.build(); - } - } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java 
b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java index 0076ebbb88..274158a04b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/StatisticsService.java @@ -8,7 +8,6 @@ import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.Metrics; import com.provectus.kafka.ui.model.ServerStatusDTO; import com.provectus.kafka.ui.model.Statistics; -import com.provectus.kafka.ui.service.metrics.MetricsCollector; import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedClusterState; import java.util.List; import java.util.Map; @@ -20,13 +19,13 @@ import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.Node; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; @Service @RequiredArgsConstructor @Slf4j public class StatisticsService { - private final MetricsCollector metricsCollector; private final AdminClientService adminClientService; private final FeatureService featureService; private final StatisticsCache cache; @@ -37,30 +36,24 @@ public class StatisticsService { private Mono getStatistics(KafkaCluster cluster) { return adminClientService.get(cluster).flatMap(ac -> - ac.describeCluster().flatMap(description -> - ac.updateInternalStats(description.getController()).then( - Mono.zip( - List.of( - metricsCollector.getBrokerMetrics(cluster, description.getNodes()), - getLogDirInfo(description, ac), - featureService.getAvailableFeatures(ac, cluster, description), - loadTopicConfigs(cluster), - describeTopics(cluster), - loadClusterState(ac) - ), - results -> - Statistics.builder() - .status(ServerStatusDTO.ONLINE) - .clusterDescription(description) - .version(ac.getVersion()) - .metrics((Metrics) results[0]) - .logDirInfo((InternalLogDirStats) results[1]) - .features((List) results[2]) - .topicConfigs((Map>) results[3]) - .topicDescriptions((Map) results[4]) - .clusterState((ScrapedClusterState) results[5]) - .build() - )))) + ac.describeCluster() + .flatMap(description -> + ac.updateInternalStats(description.getController()) + .then( + Mono.zip( + featureService.getAvailableFeatures(ac, cluster, description), + loadClusterState(description, ac) + ).flatMap(featuresAndState -> + scrapeMetrics(cluster, featuresAndState.getT2(), description) + .map(metrics -> + Statistics.builder() + .status(ServerStatusDTO.ONLINE) + .clusterDescription(description) + .version(ac.getVersion()) + .metrics(metrics) + .features(featuresAndState.getT1()) + .clusterState(featuresAndState.getT2()) + .build()))))) .doOnError(e -> log.error("Failed to collect cluster {} info", cluster.getName(), e)) .onErrorResume( @@ -80,8 +73,15 @@ public class StatisticsService { return adminClientService.get(c).flatMap(ReactiveAdminClient::getTopicsConfig); } - private Mono loadClusterState(ReactiveAdminClient ac){ - return ScrapedClusterState.scrape(ac); + private Mono loadClusterState(ClusterDescription clusterDescription, + ReactiveAdminClient ac) { + return ScrapedClusterState.scrape(clusterDescription, ac); + } + + private Mono scrapeMetrics(KafkaCluster c, + ScrapedClusterState clusterState, + ClusterDescription clusterDescription) { + return c.getMetricsScrapping().scrape(clusterState, clusterDescription.getNodes()); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsCollector.java 
b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsCollector.java deleted file mode 100644 index fca7ab1fea..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsCollector.java +++ /dev/null @@ -1,69 +0,0 @@ -package com.provectus.kafka.ui.service.metrics; - -import com.provectus.kafka.ui.model.KafkaCluster; -import com.provectus.kafka.ui.model.Metrics; -import com.provectus.kafka.ui.model.MetricsConfig; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.Node; -import org.springframework.stereotype.Component; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; - -@Component -@Slf4j -@RequiredArgsConstructor -public class MetricsCollector { - - private final JmxMetricsRetriever jmxMetricsRetriever; - private final PrometheusMetricsRetriever prometheusMetricsRetriever; - - public Mono getBrokerMetrics(KafkaCluster cluster, Collection nodes) { - return Flux.fromIterable(nodes) - .flatMap(n -> getMetrics(cluster, n).map(lst -> Tuples.of(n, lst))) - .collectMap(Tuple2::getT1, Tuple2::getT2) - .map(nodeMetrics -> collectMetrics(cluster, nodeMetrics)) - .defaultIfEmpty(Metrics.empty()); - } - - private Mono> getMetrics(KafkaCluster kafkaCluster, Node node) { - Flux metricFlux = Flux.empty(); - if (kafkaCluster.getMetricsConfig() != null) { - String type = kafkaCluster.getMetricsConfig().getType(); - if (type == null || type.equalsIgnoreCase(MetricsConfig.JMX_METRICS_TYPE)) { - metricFlux = jmxMetricsRetriever.retrieve(kafkaCluster, node); - } else if (type.equalsIgnoreCase(MetricsConfig.PROMETHEUS_METRICS_TYPE)) { - metricFlux = prometheusMetricsRetriever.retrieve(kafkaCluster, node); - } - } - return metricFlux.collectList(); - } - - public Metrics collectMetrics(KafkaCluster cluster, Map> perBrokerMetrics) { - Metrics.MetricsBuilder builder = Metrics.builder() - .perBrokerMetrics( - perBrokerMetrics.entrySet() - .stream() - .collect(Collectors.toMap(e -> e.getKey().id(), Map.Entry::getValue))); - - populateWellknowMetrics(cluster, perBrokerMetrics) - .apply(builder); - - return builder.build(); - } - - private WellKnownMetrics populateWellknowMetrics(KafkaCluster cluster, Map> perBrokerMetrics) { - WellKnownMetrics wellKnownMetrics = new WellKnownMetrics(); - perBrokerMetrics.forEach((node, metrics) -> - metrics.forEach(metric -> - wellKnownMetrics.populate(node, metric))); - return wellKnownMetrics; - } - -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsRetriever.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsRetriever.java deleted file mode 100644 index 7e1e126fa0..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsRetriever.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.provectus.kafka.ui.service.metrics; - -import com.provectus.kafka.ui.model.KafkaCluster; -import org.apache.kafka.common.Node; -import reactor.core.publisher.Flux; - -interface MetricsRetriever { - Flux retrieve(KafkaCluster c, Node node); -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java index 32fe2fa2a6..47452d24f0 100644 --- 
a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java @@ -1,7 +1,14 @@ package com.provectus.kafka.ui.service.metrics; +import static io.prometheus.client.Collector.*; + +import io.prometheus.client.Collector; import java.math.BigDecimal; +import java.util.Collection; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.ToString; @@ -23,10 +30,27 @@ public interface RawMetric { //-------------------------------------------------- + static Stream groupIntoMFS(Collection lst) { + //TODO: impl + return null; + } + static RawMetric create(String name, Map labels, BigDecimal value) { return new SimpleMetric(name, labels, value); } + static Stream create(MetricFamilySamples samples) { + return samples.samples.stream() + .map(s -> create( + s.name, + IntStream.range(0, s.labelNames.size()) + .boxed() + .collect(Collectors.toMap(s.labelNames::get, s.labelValues::get)), + BigDecimal.valueOf(s.value) + ) + ); + } + record SimpleMetric(String name, Map labels, BigDecimal value) implements RawMetric { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/MetricsScrapping.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/MetricsScrapping.java new file mode 100644 index 0000000000..4d3a497c12 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/MetricsScrapping.java @@ -0,0 +1,87 @@ +package com.provectus.kafka.ui.service.metrics.v2.scrape; + +import static com.provectus.kafka.ui.config.ClustersProperties.*; +import static com.provectus.kafka.ui.model.MetricsScrapeProperties.*; + +import com.provectus.kafka.ui.model.Metrics; +import com.provectus.kafka.ui.model.MetricsScrapeProperties; +import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.InferredMetrics; +import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.InferredMetricsScraper; +import com.provectus.kafka.ui.service.metrics.v2.scrape.jmx.JmxMetricsRetriever; +import com.provectus.kafka.ui.service.metrics.v2.scrape.jmx.JmxMetricsScraper; +import com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus.PrometheusScraper; +import jakarta.annotation.Nullable; +import java.util.Collection; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.Node; +import reactor.core.publisher.Mono; + +@RequiredArgsConstructor +public class MetricsScrapping { + + private final InferredMetricsScraper inferredMetricsScraper; + + @Nullable + private final JmxMetricsScraper jmxMetricsScraper; + + @Nullable + private final PrometheusScraper prometheusScraper; + + public static MetricsScrapping create(Cluster cluster, + JmxMetricsRetriever jmxMetricsRetriever) { + InferredMetricsScraper inferredMetricsScraper = new InferredMetricsScraper(); + JmxMetricsScraper jmxMetricsScraper = null; + PrometheusScraper prometheusScraper = null; + + var metrics = cluster.getMetrics(); + if (cluster.getMetrics() != null) { + var scrapeProperties = createScrapeProps(cluster); + if (metrics.getType() == null || metrics.getType().equalsIgnoreCase(JMX_METRICS_TYPE)) { + jmxMetricsScraper = new JmxMetricsScraper(scrapeProperties, jmxMetricsRetriever); + } else if (metrics.getType().equalsIgnoreCase(PROMETHEUS_METRICS_TYPE)) { + prometheusScraper = new 
PrometheusScraper(scrapeProperties); + } + } + return new MetricsScrapping(inferredMetricsScraper, jmxMetricsScraper, prometheusScraper); + } + + private static MetricsScrapeProperties createScrapeProps(Cluster cluster) { + var metrics = cluster.getMetrics(); + return MetricsScrapeProperties.builder() + .port(metrics.getPort()) + .ssl(metrics.getSsl()) + .username(metrics.getUsername()) + .password(metrics.getPassword()) + .truststoreConfig(cluster.getSsl()) + .keystoreConfig( + metrics.getKeystoreLocation() != null + ? new KeystoreConfig(metrics.getKeystoreLocation(), metrics.getKeystorePassword()) + : null + ) + .build(); + } + + public Mono scrape(ScrapedClusterState clusterState, Collection nodes) { + Mono inferred = inferredMetricsScraper.scrape(clusterState); + Mono external = scrapeExternal(nodes); + return inferred.zipWith( + external, + (inf, ext) -> Metrics.builder() + .ioRates(ext.ioRates()) + .perBrokerScrapedMetrics(ext.getPerBrokerMetrics()) + .inferredMetrics(inf) + .build() + ); + } + + private Mono scrapeExternal(Collection nodes) { + if (jmxMetricsScraper != null) { + return jmxMetricsScraper.scrape(nodes); + } + if (prometheusScraper != null) { + return prometheusScraper.scrape(nodes); + } + return Mono.just(PerBrokerScrapedMetrics.empty()); + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/PerBrokerScrapedMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/PerBrokerScrapedMetrics.java new file mode 100644 index 0000000000..07dfe559e9 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/PerBrokerScrapedMetrics.java @@ -0,0 +1,25 @@ +package com.provectus.kafka.ui.service.metrics.v2.scrape; + +import com.provectus.kafka.ui.model.Metrics; +import io.prometheus.client.Collector; +import java.util.List; +import java.util.Map; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class PerBrokerScrapedMetrics { + + @Getter + private final Map> perBrokerMetrics; + + public static PerBrokerScrapedMetrics empty() { + return new PerBrokerScrapedMetrics(Map.of()); + } + + Metrics.IoRates ioRates() { + //TODO: rename WKMetrics + return new WellKnownMetrics(perBrokerMetrics).ioRates(); + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedClusterState.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedClusterState.java index ab8da8e1ff..f9ece26c6f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedClusterState.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedClusterState.java @@ -1,16 +1,27 @@ package com.provectus.kafka.ui.service.metrics.v2.scrape; +import static com.provectus.kafka.ui.service.ReactiveAdminClient.*; + import com.google.common.collect.Table; +import com.provectus.kafka.ui.model.InternalLogDirStats; import com.provectus.kafka.ui.service.ReactiveAdminClient; import java.time.Instant; +import java.util.Collection; import java.util.List; import java.util.Map; +import lombok.Builder; import lombok.Value; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.ConsumerGroupState; +import 
org.apache.kafka.common.requests.DescribeLogDirsResponse; +import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo; +import org.apache.kafka.common.resource.ResourcePatternFilter; import reactor.core.publisher.Mono; +@Builder @Value public class ScrapedClusterState { @@ -22,7 +33,7 @@ public class ScrapedClusterState { String name, List configs, TopicDescription description, - Map offsets, + Map endOffsets, SegmentStats segmentStats, Map partitionsSegmentStats) { } @@ -30,6 +41,7 @@ record ConsumerGroupState( Instant scrapeTime, String group, + org.apache.kafka.common.ConsumerGroupState state, ConsumerGroupDescription description, Table committedOffsets, Map lastTopicActivity) { @@ -45,11 +57,36 @@ Map consumerGroupsStates; public static ScrapedClusterState empty() { - //TODO impl - return null; + return ScrapedClusterState.builder() + .scrapeStartTime(Instant.now()) + .nodesStates(Map.of()) + .topicStates(Map.of()) + .consumerGroupsStates(Map.of()) + .build(); } - public static Mono scrape(ReactiveAdminClient ac) { + public static Mono scrape(ClusterDescription clusterDescription, + ReactiveAdminClient ac) { + + Mono segmentStatsMono = ac.describeLogDirs().map(InternalLogDirStats::new); + Mono> cgListingsMono = ac.listConsumerGroups().map(l -> l.stream().map(ConsumerGroupListing::groupId).toList()); + Mono> topicDescriptionsMono = ac.describeTopics(); + Mono>> topicConfigsMono = ac.getTopicsConfig(); + + return Mono.zip( + segmentStatsMono, + cgListingsMono, + topicDescriptionsMono, + topicConfigsMono + ).map(tuple -> { + InternalLogDirStats segmentStats = tuple.getT1(); + List consumerGroups = tuple.getT2(); + Map topicDescriptions = tuple.getT3(); + Map> topicConfigs = tuple.getT4(); + //TODO: build topicStates and consumerGroupsStates from the collected data; an empty state is returned for now so the method compiles + return ScrapedClusterState.builder() + .scrapeStartTime(Instant.now()) + .nodesStates(Map.of()) + .topicStates(Map.of()) + .consumerGroupsStates(Map.of()) + .build(); + }); }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedMetrics.java deleted file mode 100644 index 9155ad4f6b..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedMetrics.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.provectus.kafka.ui.service.metrics.v2.scrape; - -import io.prometheus.client.Collector.MetricFamilySamples; -import java.util.Collection; - -import java.util.List; -import java.util.stream.Stream; - -public interface ScrapedMetrics { - - static ScrapedMetrics create(Collection lst) { - return lst::stream; - } - - static ScrapedMetrics empty() { - return create(List.of()); - } - - Stream asStream(); - -}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scraper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scraper.java deleted file mode 100644 index 4eba7ee93b..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scraper.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.provectus.kafka.ui.service.metrics.v2.scrape; - - -import reactor.core.publisher.Mono; - -public interface Scraper { - - Mono scrape(); - -}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scrapping.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scrapping.java deleted file mode 100644 index a0048c626f..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scrapping.java +++ /dev/null @@
-1,8 +0,0 @@ -package com.provectus.kafka.ui.service.metrics.v2.scrape; - -public class Scrapping { - - - - -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/WellKnownMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/WellKnownMetrics.java similarity index 61% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/WellKnownMetrics.java rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/WellKnownMetrics.java index 10ad128c65..f6bedf673d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/WellKnownMetrics.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/WellKnownMetrics.java @@ -1,15 +1,18 @@ -package com.provectus.kafka.ui.service.metrics; +package com.provectus.kafka.ui.service.metrics.v2.scrape; import static org.apache.commons.lang3.StringUtils.containsIgnoreCase; import static org.apache.commons.lang3.StringUtils.endsWithIgnoreCase; import com.provectus.kafka.ui.model.Metrics; +import com.provectus.kafka.ui.service.metrics.RawMetric; +import io.prometheus.client.Collector; import java.math.BigDecimal; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.kafka.common.Node; -class WellKnownMetrics { +public class WellKnownMetrics { // per broker final Map brokerBytesInFifteenMinuteRate = new HashMap<>(); @@ -19,33 +22,41 @@ class WellKnownMetrics { final Map bytesInFifteenMinuteRate = new HashMap<>(); final Map bytesOutFifteenMinuteRate = new HashMap<>(); - void populate(Node node, RawMetric rawMetric) { - updateBrokerIOrates(node, rawMetric); - updateTopicsIOrates(rawMetric); + public WellKnownMetrics(Map> perBrokerMetrics) { + perBrokerMetrics.forEach((nodeId, metrics) -> { + metrics.forEach(m -> { + RawMetric.create(m).forEach(rawMetric -> { + updateBrokerIOrates(nodeId, rawMetric); + updateTopicsIOrates(rawMetric); + }); + }); + }); } - void apply(Metrics.MetricsBuilder metricsBuilder) { - metricsBuilder.topicBytesInPerSec(bytesInFifteenMinuteRate); - metricsBuilder.topicBytesOutPerSec(bytesOutFifteenMinuteRate); - metricsBuilder.brokerBytesInPerSec(brokerBytesInFifteenMinuteRate); - metricsBuilder.brokerBytesOutPerSec(brokerBytesOutFifteenMinuteRate); + public Metrics.IoRates ioRates() { + return Metrics.IoRates.builder() + .topicBytesInPerSec(bytesInFifteenMinuteRate) + .topicBytesOutPerSec(bytesOutFifteenMinuteRate) + .brokerBytesInPerSec(brokerBytesInFifteenMinuteRate) + .brokerBytesOutPerSec(brokerBytesOutFifteenMinuteRate) + .build(); } - private void updateBrokerIOrates(Node node, RawMetric rawMetric) { + private void updateBrokerIOrates(int nodeId, RawMetric rawMetric) { String name = rawMetric.name(); - if (!brokerBytesInFifteenMinuteRate.containsKey(node.id()) + if (!brokerBytesInFifteenMinuteRate.containsKey(nodeId) && rawMetric.labels().size() == 1 && "BytesInPerSec".equalsIgnoreCase(rawMetric.labels().get("name")) && containsIgnoreCase(name, "BrokerTopicMetrics") && endsWithIgnoreCase(name, "FifteenMinuteRate")) { - brokerBytesInFifteenMinuteRate.put(node.id(), rawMetric.value()); + brokerBytesInFifteenMinuteRate.put(nodeId, rawMetric.value()); } - if (!brokerBytesOutFifteenMinuteRate.containsKey(node.id()) + if (!brokerBytesOutFifteenMinuteRate.containsKey(nodeId) && rawMetric.labels().size() == 1 && "BytesOutPerSec".equalsIgnoreCase(rawMetric.labels().get("name")) && containsIgnoreCase(name, "BrokerTopicMetrics") && endsWithIgnoreCase(name, 
"FifteenMinuteRate")) { - brokerBytesOutFifteenMinuteRate.put(node.id(), rawMetric.value()); + brokerBytesOutFifteenMinuteRate.put(nodeId, rawMetric.value()); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetrics.java index 2810d58ac9..e9b6fd4f33 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetrics.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetrics.java @@ -1,23 +1,23 @@ package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred; -import static io.prometheus.client.Collector.*; +import static io.prometheus.client.Collector.MetricFamilySamples; -import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedClusterState; -import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedMetrics; import java.util.List; -import java.util.stream.Stream; -public class InferredMetrics implements ScrapedMetrics { +public class InferredMetrics { private final List metrics; + public static InferredMetrics empty() { + return new InferredMetrics(List.of()); + } + public InferredMetrics(List metrics) { this.metrics = metrics; } - @Override - public Stream asStream() { - return metrics.stream(); + public List asList() { + return metrics; } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScraper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScraper.java index fcd7d7d968..bb3b4a85b6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScraper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScraper.java @@ -1,25 +1,20 @@ package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred; import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedClusterState; -import com.provectus.kafka.ui.service.metrics.v2.scrape.Scraper; import java.util.List; -import java.util.function.Supplier; import lombok.RequiredArgsConstructor; import reactor.core.publisher.Mono; @RequiredArgsConstructor -public class InferredMetricsScraper implements Scraper { +public class InferredMetricsScraper { - private final Supplier currentStateSupplier; private ScrapedClusterState prevState = null; - @Override - public synchronized Mono scrape() { + public synchronized Mono scrape(ScrapedClusterState newState) { if (prevState == null) { - prevState = currentStateSupplier.get(); - return Mono.empty(); + prevState = newState; + return Mono.just(InferredMetrics.empty()); } - var newState = currentStateSupplier.get(); var inferred = infer(prevState, newState); prevState = newState; return Mono.just(inferred); @@ -27,6 +22,7 @@ public class InferredMetricsScraper implements Scraper { private static InferredMetrics infer(ScrapedClusterState prevState, ScrapedClusterState newState) { + //TODO: impl return new InferredMetrics(List.of()); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsFormatter.java similarity index 95% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatter.java rename to 
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsFormatter.java index 4d3d31f50f..98b97e746d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsFormatter.java @@ -1,5 +1,7 @@ -package com.provectus.kafka.ui.service.metrics; +package com.provectus.kafka.ui.service.metrics.v2.scrape.jmx; +import com.provectus.kafka.ui.service.metrics.RawMetric; +import io.prometheus.client.Collector; import java.math.BigDecimal; import java.util.ArrayList; import java.util.LinkedHashMap; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsRetriever.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsRetriever.java similarity index 63% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsRetriever.java rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsRetriever.java index e7a58cbae2..9c74abe66e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxMetricsRetriever.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsRetriever.java @@ -1,6 +1,8 @@ -package com.provectus.kafka.ui.service.metrics; +package com.provectus.kafka.ui.service.metrics.v2.scrape.jmx; import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.model.MetricsScrapeProperties; +import com.provectus.kafka.ui.service.metrics.RawMetric; import java.io.Closeable; import java.util.ArrayList; import java.util.HashMap; @@ -18,14 +20,13 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.Node; import org.springframework.stereotype.Service; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @Service @Slf4j -class JmxMetricsRetriever implements MetricsRetriever, Closeable { +public class JmxMetricsRetriever implements Closeable { private static final boolean SSL_JMX_SUPPORTED; @@ -43,35 +44,34 @@ class JmxMetricsRetriever implements MetricsRetriever, Closeable { JmxSslSocketFactory.clearFactoriesCache(); } - @Override - public Flux retrieve(KafkaCluster c, Node node) { - if (isSslJmxEndpoint(c) && !SSL_JMX_SUPPORTED) { - log.warn("Cluster {} has jmx ssl configured, but it is not supported", c.getName()); - return Flux.empty(); + public Mono> retrieveFromNode(MetricsScrapeProperties metricsConfig, Node node) { + if (isSslJmxEndpoint(metricsConfig) && !SSL_JMX_SUPPORTED) { + log.warn("Cluster has jmx ssl configured, but it is not supported by app"); + return Mono.just(List.of()); } - return Mono.fromSupplier(() -> retrieveSync(c, node)) - .subscribeOn(Schedulers.boundedElastic()) - .flatMapMany(Flux::fromIterable); + return Mono.fromSupplier(() -> retrieveSync(metricsConfig, node)) + .subscribeOn(Schedulers.boundedElastic()); } - private boolean isSslJmxEndpoint(KafkaCluster cluster) { - return cluster.getMetricsConfig().getKeystoreLocation() != null; + private boolean isSslJmxEndpoint(MetricsScrapeProperties metricsScrapeProperties) { + return metricsScrapeProperties.getKeystoreConfig() != null + && metricsScrapeProperties.getKeystoreConfig().getKeystoreLocation() != null; } @SneakyThrows - private List retrieveSync(KafkaCluster c, Node node) { - String jmxUrl = JMX_URL + node.host() 
+ ":" + c.getMetricsConfig().getPort() + "/" + JMX_SERVICE_TYPE; + private List retrieveSync(MetricsScrapeProperties metricsConfig, Node node) { + String jmxUrl = JMX_URL + node.host() + ":" + metricsConfig.getPort() + "/" + JMX_SERVICE_TYPE; log.debug("Collection JMX metrics for {}", jmxUrl); List result = new ArrayList<>(); - withJmxConnector(jmxUrl, c, jmxConnector -> getMetricsFromJmx(jmxConnector, result)); + withJmxConnector(jmxUrl, metricsConfig, jmxConnector -> getMetricsFromJmx(jmxConnector, result)); log.debug("{} metrics collected for {}", result.size(), jmxUrl); return result; } private void withJmxConnector(String jmxUrl, - KafkaCluster c, + MetricsScrapeProperties metricsConfig, Consumer consumer) { - var env = prepareJmxEnvAndSetThreadLocal(c); + var env = prepareJmxEnvAndSetThreadLocal(metricsConfig); try (JMXConnector connector = JMXConnectorFactory.newJMXConnector(new JMXServiceURL(jmxUrl), env)) { try { connector.connect(env); @@ -87,16 +87,16 @@ class JmxMetricsRetriever implements MetricsRetriever, Closeable { } } - private Map prepareJmxEnvAndSetThreadLocal(KafkaCluster cluster) { - var metricsConfig = cluster.getMetricsConfig(); + private Map prepareJmxEnvAndSetThreadLocal(MetricsScrapeProperties metricsConfig) { Map env = new HashMap<>(); - if (isSslJmxEndpoint(cluster)) { - var clusterSsl = cluster.getOriginalProperties().getSsl(); + if (isSslJmxEndpoint(metricsConfig)) { + var truststoreConfig = metricsConfig.getTruststoreConfig(); + var keystoreConfig = metricsConfig.getKeystoreConfig(); JmxSslSocketFactory.setSslContextThreadLocal( - clusterSsl != null ? clusterSsl.getTruststoreLocation() : null, - clusterSsl != null ? clusterSsl.getTruststorePassword() : null, - metricsConfig.getKeystoreLocation(), - metricsConfig.getKeystorePassword() + truststoreConfig != null ? truststoreConfig.getTruststoreLocation() : null, + truststoreConfig != null ? truststoreConfig.getTruststorePassword() : null, + keystoreConfig != null ? keystoreConfig.getKeystoreLocation() : null, + keystoreConfig != null ? 
keystoreConfig.getKeystorePassword() : null ); JmxSslSocketFactory.editJmxConnectorEnv(env); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsScraper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsScraper.java index cdec123729..a2dabe8307 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsScraper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsScraper.java @@ -1,13 +1,36 @@ package com.provectus.kafka.ui.service.metrics.v2.scrape.jmx; -import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedMetrics; -import com.provectus.kafka.ui.service.metrics.v2.scrape.Scraper; +import static io.prometheus.client.Collector.*; + +import com.provectus.kafka.ui.model.MetricsScrapeProperties; +import com.provectus.kafka.ui.service.metrics.RawMetric; +import com.provectus.kafka.ui.service.metrics.v2.scrape.PerBrokerScrapedMetrics; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.Node; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuples; -public class JmxMetricsScraper implements Scraper { +public class JmxMetricsScraper { - @Override - public Mono scrape() { - return null; + private final JmxMetricsRetriever jmxMetricsRetriever; + private final MetricsScrapeProperties scrapeProperties; + + public JmxMetricsScraper(MetricsScrapeProperties scrapeProperties, + JmxMetricsRetriever jmxMetricsRetriever) { + this.scrapeProperties = scrapeProperties; + this.jmxMetricsRetriever = jmxMetricsRetriever; + } + + public Mono scrape(Collection nodes) { + Mono>> collected = Flux.fromIterable(nodes) + .flatMap(n -> jmxMetricsRetriever.retrieveFromNode(scrapeProperties, n).map(metrics -> Tuples.of(n, metrics))) + .collectMap( + t -> t.getT1().id(), + t -> RawMetric.groupIntoMFS(t.getT2()).toList() + ); + return collected.map(PerBrokerScrapedMetrics::new); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxSslSocketFactory.java similarity index 99% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxSslSocketFactory.java index fa84fc361c..4f27c4b4f0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxSslSocketFactory.java @@ -1,4 +1,4 @@ -package com.provectus.kafka.ui.service.metrics; +package com.provectus.kafka.ui.service.metrics.v2.scrape.jmx; import com.google.common.base.Preconditions; import java.io.FileInputStream; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prom/PrometheusScraper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prom/PrometheusScraper.java deleted file mode 100644 index de71effc35..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prom/PrometheusScraper.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.provectus.kafka.ui.service.metrics.v2.scrape.prom; - -import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedMetrics; -import 
com.provectus.kafka.ui.service.metrics.v2.scrape.Scraper; -import reactor.core.publisher.Mono; - -public class PrometheusScraper implements Scraper { - - @Override - public Mono scrape() { - return null; - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParser.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusEndpointMetricsParser.java similarity index 92% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParser.java rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusEndpointMetricsParser.java index 1a51ca0afa..6fabb5b695 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParser.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusEndpointMetricsParser.java @@ -1,5 +1,6 @@ -package com.provectus.kafka.ui.service.metrics; +package com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus; +import com.provectus.kafka.ui.service.metrics.RawMetric; import java.math.BigDecimal; import java.util.Arrays; import java.util.Optional; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetriever.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusMetricsRetriever.java similarity index 72% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetriever.java rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusMetricsRetriever.java index 33ef1b8072..881e74fff9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetriever.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusMetricsRetriever.java @@ -1,12 +1,14 @@ -package com.provectus.kafka.ui.service.metrics; +package com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus; + +import static io.prometheus.client.Collector.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; -import com.provectus.kafka.ui.config.ClustersProperties; -import com.provectus.kafka.ui.model.KafkaCluster; -import com.provectus.kafka.ui.model.MetricsConfig; +import com.provectus.kafka.ui.model.MetricsScrapeProperties; +import com.provectus.kafka.ui.service.metrics.RawMetric; import com.provectus.kafka.ui.util.WebClientConfigurator; import java.util.Arrays; +import java.util.List; import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.Node; @@ -19,33 +21,29 @@ import reactor.core.publisher.Mono; @Service @Slf4j -class PrometheusMetricsRetriever implements MetricsRetriever { +class PrometheusMetricsRetriever { private static final String METRICS_ENDPOINT_PATH = "/metrics"; private static final int DEFAULT_EXPORTER_PORT = 11001; - @Override - public Flux retrieve(KafkaCluster c, Node node) { - log.debug("Retrieving metrics from prometheus exporter: {}:{}", node.host(), c.getMetricsConfig().getPort()); + public Mono> retrieve(MetricsScrapeProperties metricsConfig, Node node) { + log.debug("Retrieving metrics from prometheus exporter: {}:{}", node.host(), metricsConfig.getPort()); - MetricsConfig metricsConfig = c.getMetricsConfig(); var webClient = new WebClientConfigurator() 
.configureBufferSize(DataSize.ofMegabytes(20)) .configureBasicAuth(metricsConfig.getUsername(), metricsConfig.getPassword()) - .configureSsl( - c.getOriginalProperties().getSsl(), - new ClustersProperties.KeystoreConfig( - metricsConfig.getKeystoreLocation(), - metricsConfig.getKeystorePassword())) + .configureSsl(metricsConfig.getTruststoreConfig(), metricsConfig.getKeystoreConfig()) .build(); - return retrieve(webClient, node.host(), c.getMetricsConfig()); + return retrieve(webClient, node.host(), metricsConfig) + .collectList() + .map(metrics -> RawMetric.groupIntoMFS(metrics).toList()); } @VisibleForTesting - Flux retrieve(WebClient webClient, String host, MetricsConfig metricsConfig) { + Flux retrieve(WebClient webClient, String host, MetricsScrapeProperties metricsConfig) { int port = Optional.ofNullable(metricsConfig.getPort()).orElse(DEFAULT_EXPORTER_PORT); - boolean sslEnabled = metricsConfig.isSsl() || metricsConfig.getKeystoreLocation() != null; + boolean sslEnabled = metricsConfig.isSsl() || metricsConfig.getKeystoreConfig() != null; var request = webClient.get() .uri(UriComponentsBuilder.newInstance() .scheme(sslEnabled ? "https" : "http") diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusScraper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusScraper.java new file mode 100644 index 0000000000..71ddc1700d --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prometheus/PrometheusScraper.java @@ -0,0 +1,31 @@ +package com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus; + +import com.provectus.kafka.ui.model.MetricsScrapeProperties; +import com.provectus.kafka.ui.service.metrics.v2.scrape.PerBrokerScrapedMetrics; +import io.prometheus.client.Collector; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.Node; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuples; + +public class PrometheusScraper { + + private final static PrometheusMetricsRetriever RETRIEVER = new PrometheusMetricsRetriever(); + + private final MetricsScrapeProperties metricsConfig; + + public PrometheusScraper(MetricsScrapeProperties metricsConfig) { + this.metricsConfig = metricsConfig; + } + + public Mono scrape(Collection clusterNodes) { + Mono>> collected = Flux.fromIterable(clusterNodes) + .flatMap(n -> RETRIEVER.retrieve(metricsConfig, n).map(metrics -> Tuples.of(n, metrics))) + .collectMap(t -> t.getT1().id(), t -> t.getT2()); + + return collected.map(PerBrokerScrapedMetrics::new); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ReactiveFailover.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ReactiveFailover.java index 0293e4f925..373a85d8bf 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ReactiveFailover.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ReactiveFailover.java @@ -81,9 +81,6 @@ public class ReactiveFailover { .flatMap(f) .onErrorResume(failoverExceptionsPredicate, th -> { publisher.markFailed(); - if (candidates.size() == 1) { - return Mono.error(th); - } var newCandidates = candidates.stream().skip(1).filter(PublisherHolder::isActive).toList(); if (newCandidates.isEmpty()) { return Mono.error(th); @@ -106,9 +103,6 @@ public class ReactiveFailover { .flatMapMany(f) .onErrorResume(failoverExceptionsPredicate, th -> { 
           publisher.markFailed();
-          if (candidates.size() == 1) {
-            return Flux.error(th);
-          }
           var newCandidates = candidates.stream().skip(1).filter(PublisherHolder::isActive).toList();
           if (newCandidates.isEmpty()) {
             return Flux.error(th);
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatterTest.java
index 1a4ff5134e..5823b51ddc 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatterTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/JmxMetricsFormatterTest.java
@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service.metrics;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import com.provectus.kafka.ui.service.metrics.v2.scrape.jmx.JmxMetricsFormatter;
 import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
@@ -74,4 +75,4 @@ class JmxMetricsFormatterTest {
 
     assertThat(actual.value()).isCloseTo(expected.value(), Offset.offset(new BigDecimal("0.001")));
   }
-}
\ No newline at end of file
+}
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParserTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParserTest.java
index e2f02f41f4..1b2d992f8d 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParserTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParserTest.java
@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service.metrics;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus.PrometheusEndpointMetricsParser;
 import java.util.Map;
 import java.util.Optional;
 import org.junit.jupiter.api.Test;
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetrieverTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetrieverTest.java
index 9cc0494039..ddbe04b283 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetrieverTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusMetricsRetrieverTest.java
@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.service.metrics;
 
-import com.provectus.kafka.ui.model.MetricsConfig;
+import com.provectus.kafka.ui.model.MetricsScrapeProperties;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus.PrometheusMetricsRetriever;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.List;
@@ -34,7 +35,7 @@ class PrometheusMetricsRetrieverTest {
     var url = mockWebServer.url("/metrics");
     mockWebServer.enqueue(prepareResponse());
 
-    MetricsConfig metricsConfig = prepareMetricsConfig(url.port(), null, null);
+    MetricsScrapeProperties metricsConfig = prepareMetricsConfig(url.port(), null, null);
 
     StepVerifier.create(retriever.retrieve(WebClient.create(), url.host(), metricsConfig))
         .expectNextSequence(expectedRawMetrics())
@@ -48,7 +49,7 @@ class PrometheusMetricsRetrieverTest {
 
     mockWebServer.enqueue(prepareResponse());
 
-    MetricsConfig metricsConfig = prepareMetricsConfig(url.port(), "username", "password");
+    MetricsScrapeProperties metricsConfig = prepareMetricsConfig(url.port(), "username", "password");
     StepVerifier.create(retriever.retrieve(WebClient.create(), url.host(),
         metricsConfig))
         .expectNextSequence(expectedRawMetrics())
@@ -69,11 +70,11 @@ class PrometheusMetricsRetrieverTest {
     );
   }
 
-  MetricsConfig prepareMetricsConfig(Integer port, String username, String password) {
-    return MetricsConfig.builder()
+  MetricsScrapeProperties prepareMetricsConfig(Integer port, String username, String password) {
+    return MetricsScrapeProperties.builder()
         .ssl(false)
         .port(port)
-        .type(MetricsConfig.PROMETHEUS_METRICS_TYPE)
+        .type(MetricsScrapeProperties.PROMETHEUS_METRICS_TYPE)
         .username(username)
         .password(password)
         .build();
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/WellKnownMetricsTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/WellKnownMetricsTest.java
index c1c9c04058..27467cbdbf 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/WellKnownMetricsTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/WellKnownMetricsTest.java
@@ -3,6 +3,8 @@ package com.provectus.kafka.ui.service.metrics;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.model.Metrics;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.WellKnownMetrics;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus.PrometheusEndpointMetricsParser;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Map;
@@ -68,7 +70,7 @@ class WellKnownMetricsTest {
     wellKnownMetrics.brokerBytesOutFifteenMinuteRate.put(2, new BigDecimal(20));
 
     Metrics.MetricsBuilder builder = Metrics.builder();
-    wellKnownMetrics.apply(builder);
+    wellKnownMetrics.ioRates(builder);
     var metrics = builder.build();
 
     // checking per topic io rates
@@ -90,4 +92,4 @@ class WellKnownMetricsTest {
         .forEach(m -> wellKnownMetrics.populate(n, m));
   }
 
-}
\ No newline at end of file
+}
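
Reviewer note (not part of the patch): a minimal sketch of how the new per-broker Prometheus scrape path introduced above might be driven. Only PrometheusScraper, PerBrokerScrapedMetrics, the MetricsScrapeProperties builder fields used in the tests, and the default exporter port 11001 come from this change; the example class, the hard-coded Node list, and the blocking call are hypothetical illustration.

import com.provectus.kafka.ui.model.MetricsScrapeProperties;
import com.provectus.kafka.ui.service.metrics.v2.scrape.PerBrokerScrapedMetrics;
import com.provectus.kafka.ui.service.metrics.v2.scrape.prometheus.PrometheusScraper;
import java.util.List;
import org.apache.kafka.common.Node;

class PrometheusScrapeExample {

  public static void main(String[] args) {
    // Assumed scrape settings: plain-HTTP exporter on the retriever's default port 11001.
    MetricsScrapeProperties scrapeProperties = MetricsScrapeProperties.builder()
        .ssl(false)
        .port(11001)
        .type(MetricsScrapeProperties.PROMETHEUS_METRICS_TYPE)
        .build();

    // Hypothetical topology; in kafka-ui these nodes would come from cluster metadata.
    List<Node> clusterNodes = List.of(
        new Node(1, "broker-1", 9092),
        new Node(2, "broker-2", 9092));

    // scrape() fans out to every node's /metrics endpoint and groups the results by broker id.
    PerBrokerScrapedMetrics perBroker = new PrometheusScraper(scrapeProperties)
        .scrape(clusterNodes)
        .block(); // blocking only to keep the example short; callers stay reactive in the app
  }
}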