From 725c8239474ada191d55d9187c3edb7182c5ffee Mon Sep 17 00:00:00 2001 From: iliax Date: Mon, 24 Jul 2023 13:33:19 +0400 Subject: [PATCH] wip --- .../kafka/ui/controller/GraphsController.java | 7 +- .../kafka/ui/mapper/ClusterMapper.java | 12 +-- .../com/provectus/kafka/ui/model/Metrics.java | 59 --------------- .../ui/model/MetricsScrapeProperties.java | 2 +- .../ui/service/graphs/GraphDescription.java | 8 ++ .../ui/service/graphs/GraphDescriptions.java | 24 +++--- .../ui/service/metrics/SummarizedMetrics.java | 73 +++++++++++++++++++ .../metrics/prometheus/PrometheusExpose.java | 31 +++++--- .../ui/service/metrics/sink/KafkaSink.java | 18 +++-- .../ui/service/metrics/sink/MetricsSink.java | 27 ++++--- .../prometheus/PrometheusExposeTest.java | 53 ++++++++++++++ 11 files changed, 199 insertions(+), 115 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/SummarizedMetrics.java create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/prometheus/PrometheusExposeTest.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/GraphsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/GraphsController.java index 9c014b30ab..8a0bf24635 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/GraphsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/GraphsController.java @@ -7,14 +7,13 @@ import com.provectus.kafka.ui.model.GraphDescriptionsDTO; import com.provectus.kafka.ui.model.GraphParameterDTO; import com.provectus.kafka.ui.model.PrometheusApiQueryResponseDTO; import com.provectus.kafka.ui.model.rbac.AccessContext; +import com.provectus.kafka.ui.service.audit.AuditService; import com.provectus.kafka.ui.service.graphs.GraphDescription; import com.provectus.kafka.ui.service.graphs.GraphsService; -import com.provectus.kafka.ui.service.audit.AuditService; import com.provectus.kafka.ui.service.rbac.AccessControlService; import java.time.Duration; import java.time.OffsetDateTime; import java.util.Optional; -import java.util.stream.Stream; import lombok.RequiredArgsConstructor; import org.mapstruct.Mapper; import org.mapstruct.factory.Mappers; @@ -41,7 +40,7 @@ public class GraphsController extends AbstractController implements GraphsApi { @Override public Mono> getGraphData(String clusterName, - Mono graphDataRequestDTO, + Mono graphDataRequestDto, ServerWebExchange exchange) { var context = AccessContext.builder() .cluster(clusterName) @@ -50,7 +49,7 @@ public class GraphsController extends AbstractController implements GraphsApi { return accessControlService.validateAccess(context) .then( - graphDataRequestDTO.flatMap(req -> + graphDataRequestDto.flatMap(req -> graphsService.getGraphData( getCluster(clusterName), req.getId(), diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java index e4e15d7136..7b337f20da 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java @@ -32,11 +32,11 @@ import com.provectus.kafka.ui.model.ReplicaDTO; import com.provectus.kafka.ui.model.TopicConfigDTO; import com.provectus.kafka.ui.model.TopicDTO; import com.provectus.kafka.ui.model.TopicDetailsDTO; +import com.provectus.kafka.ui.service.metrics.SummarizedMetrics; import java.math.BigDecimal; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.kafka.clients.admin.ConfigEntry; @@ -60,11 +60,11 @@ public interface ClusterMapper { @Deprecated default ClusterMetricsDTO toClusterMetrics(Metrics metrics) { return new ClusterMetricsDTO() - .items(convert(metrics.getSummarizedMetrics().toList())); + .items(convert(new SummarizedMetrics(metrics).asStream()).toList()); } - private List convert(List metrics) { - return metrics.stream() + private Stream convert(Stream metrics) { + return metrics .flatMap(m -> m.samples.stream()) .map(s -> new MetricDTO() @@ -74,11 +74,11 @@ public interface ClusterMapper { //collecting to map, keeping order .collect(toMap(s.labelNames::get, s.labelValues::get, (m1, m2) -> null, LinkedHashMap::new))) .value(BigDecimal.valueOf(s.value)) - ).toList(); + ); } default BrokerMetricsDTO toBrokerMetrics(List metrics) { - return new BrokerMetricsDTO().metrics(convert(metrics)); + return new BrokerMetricsDTO().metrics(convert(metrics.stream()).toList()); } @Mapping(target = "isSensitive", source = "sensitive") diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java index 07ecf0a7ab..c0f9737da0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java @@ -1,18 +1,11 @@ package com.provectus.kafka.ui.model; import static io.prometheus.client.Collector.MetricFamilySamples; -import static java.util.stream.Collectors.toMap; -import com.google.common.collect.Streams; import com.provectus.kafka.ui.service.metrics.scrape.inferred.InferredMetrics; -import groovy.lang.Tuple; import java.math.BigDecimal; -import java.util.Collection; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.stream.Stream; import lombok.Builder; import lombok.Value; @@ -49,56 +42,4 @@ public class Metrics { } } - public Stream getSummarizedMetrics() { - return Streams.concat( - inferredMetrics.asStream(), - perBrokerScrapedMetrics - .values() - .stream() - .flatMap(Collection::stream) - .collect(toMap(mfs -> mfs.name, Optional::of, Metrics::summarizeMfs, LinkedHashMap::new)) - .values() - .stream() - .filter(Optional::isPresent) - .map(Optional::get) - ); - } - - //returns Optional.empty if merging not supported for metric type - private static Optional summarizeMfs(Optional mfs1opt, - Optional mfs2opt) { - if ((mfs1opt.isEmpty() || mfs2opt.isEmpty()) || (mfs1opt.get().type != mfs2opt.get().type)) { - return Optional.empty(); - } - var mfs1 = mfs1opt.get(); - return switch (mfs1.type) { - case GAUGE, COUNTER -> Optional.of( - new MetricFamilySamples( - mfs1.name, - mfs1.type, - mfs1.help, - Stream.concat(mfs1.samples.stream(), mfs2opt.get().samples.stream()) - .collect( - toMap( - // merging samples with same labels - s -> Tuple.tuple(s.name, s.labelNames, s.labelValues), - s -> s, - (s1, s2) -> new MetricFamilySamples.Sample( - s1.name, - s1.labelNames, - s1.labelValues, - s1.value + s2.value - ), - LinkedHashMap::new - ) - ) - .values() - .stream() - .toList() - ) - ); - default -> Optional.empty(); - }; - } - } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsScrapeProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsScrapeProperties.java index ec40bcf1f3..2e64b0253d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsScrapeProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/MetricsScrapeProperties.java @@ -27,7 +27,7 @@ public class MetricsScrapeProperties { @Nullable TruststoreConfig truststoreConfig; - public static MetricsScrapeProperties create(ClustersProperties.Cluster cluster){ + public static MetricsScrapeProperties create(ClustersProperties.Cluster cluster) { var metrics = Objects.requireNonNull(cluster.getMetrics()); return MetricsScrapeProperties.builder() .port(metrics.getPort()) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphDescription.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphDescription.java index 21555ea7eb..497f84b576 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphDescription.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphDescription.java @@ -11,6 +11,14 @@ public record GraphDescription(String id, String prometheusQuery, Set params) { + public static GraphDescriptionBuilder instant() { + return builder(); + } + + public static GraphDescriptionBuilder range(Duration defaultInterval) { + return builder().defaultInterval(defaultInterval); + } + public boolean isRange() { return defaultInterval != null; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphDescriptions.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphDescriptions.java index 2a061882ba..a6045e3585 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphDescriptions.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/graphs/GraphDescriptions.java @@ -1,5 +1,7 @@ package com.provectus.kafka.ui.service.graphs; +import static java.util.stream.Collectors.toMap; + import com.provectus.kafka.ui.exception.ValidationException; import java.time.Duration; import java.util.HashMap; @@ -7,7 +9,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.springframework.stereotype.Component; @@ -19,9 +20,8 @@ class GraphDescriptions { private final Map graphsById; GraphDescriptions() { - validateGraphDescr(PREDEFINED_GRAPHS); - this.graphsById = PREDEFINED_GRAPHS.stream() - .collect(Collectors.toMap(GraphDescription::id, d -> d)); + validate(); + this.graphsById = PREDEFINED_GRAPHS.stream().collect(toMap(GraphDescription::id, d -> d)); } Optional getById(String id) { @@ -32,9 +32,9 @@ class GraphDescriptions { return graphsById.values().stream(); } - private void validateGraphDescr(List descriptions) { + private void validate() { Map errors = new HashMap<>(); - for (GraphDescription description : descriptions) { + for (GraphDescription description : PREDEFINED_GRAPHS) { new PromQueryTemplate(description) .validateSyntax() .ifPresent(err -> errors.put(description.id(), err)); @@ -46,33 +46,29 @@ class GraphDescriptions { private static final List PREDEFINED_GRAPHS = List.of( - GraphDescription.builder() + GraphDescription.range(DEFAULT_RANGE_DURATION) .id("broker_bytes_disk_ts") - .defaultInterval(DEFAULT_RANGE_DURATION) .prometheusQuery("broker_bytes_disk{cluster=\"${cluster}\"}") .params(Set.of()) .build(), - GraphDescription.builder() + GraphDescription.instant() .id("broker_bytes_disk") .prometheusQuery("broker_bytes_disk{cluster=\"${cluster}\"}") .params(Set.of()) .build(), - GraphDescription.builder() + GraphDescription.instant() .id("kafka_topic_partition_current_offset") .prometheusQuery("kafka_topic_partition_current_offset{cluster=\"${cluster}\"}") .params(Set.of()) .build(), - GraphDescription.builder() + GraphDescription.range(DEFAULT_RANGE_DURATION) .id("kafka_topic_partition_current_offset_per_topic_ts") - .defaultInterval(DEFAULT_RANGE_DURATION) .prometheusQuery("kafka_topic_partition_current_offset{cluster=\"${cluster}\",topic = \"${topic}\"}") .params(Set.of("topic")) .build() - - //TODO: add ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/SummarizedMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/SummarizedMetrics.java new file mode 100644 index 0000000000..47bc65beeb --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/SummarizedMetrics.java @@ -0,0 +1,73 @@ +package com.provectus.kafka.ui.service.metrics; + +import static java.util.stream.Collectors.toMap; + +import com.google.common.collect.Streams; +import com.provectus.kafka.ui.model.Metrics; +import groovy.lang.Tuple; +import io.prometheus.client.Collector.MetricFamilySamples; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.Optional; +import java.util.stream.Stream; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class SummarizedMetrics { + + private final Metrics metrics; + + public Stream asStream() { + return Streams.concat( + metrics.getInferredMetrics().asStream(), + metrics.getPerBrokerScrapedMetrics() + .values() + .stream() + .flatMap(Collection::stream) + .collect(toMap(mfs -> mfs.name, Optional::of, SummarizedMetrics::summarizeMfs, LinkedHashMap::new)) + .values() + .stream() + .filter(Optional::isPresent) + .map(Optional::get) + ); + } + + //returns Optional.empty if merging not supported for metric type + private static Optional summarizeMfs(Optional mfs1opt, + Optional mfs2opt) { + if ((mfs1opt.isEmpty() || mfs2opt.isEmpty()) || (mfs1opt.get().type != mfs2opt.get().type)) { + return Optional.empty(); + } + var mfs1 = mfs1opt.get(); + return switch (mfs1.type) { + case GAUGE, COUNTER -> Optional.of( + new MetricFamilySamples( + mfs1.name, + mfs1.type, + mfs1.help, + Stream.concat(mfs1.samples.stream(), mfs2opt.get().samples.stream()) + .collect( + toMap( + // merging samples with same labels + s -> Tuple.tuple(s.name, s.labelNames, s.labelValues), + s -> s, + (s1, s2) -> new MetricFamilySamples.Sample( + s1.name, + s1.labelNames, + s1.labelValues, + s1.value + s2.value + ), + LinkedHashMap::new + ) + ) + .values() + .stream() + .toList() + ) + ); + default -> Optional.empty(); + }; + } + + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/prometheus/PrometheusExpose.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/prometheus/PrometheusExpose.java index 6c4b4be8e8..2a1464a1bf 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/prometheus/PrometheusExpose.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/prometheus/PrometheusExpose.java @@ -22,6 +22,7 @@ import org.springframework.http.ResponseEntity; public final class PrometheusExpose { private static final String CLUSTER_EXPOSE_LBL_NAME = "cluster"; + private static final String BROKER_EXPOSE_LBL_NAME = "broker_id"; private static final HttpHeaders PROMETHEUS_EXPOSE_ENDPOINT_HEADERS; @@ -49,9 +50,19 @@ public final class PrometheusExpose { } public static Stream prepareMetricsForGlobalExpose(String clusterName, Metrics metrics) { - return metrics - .getSummarizedMetrics() - .map(mfs -> addLbl(mfs, CLUSTER_EXPOSE_LBL_NAME, clusterName)); + return Stream.concat( + metrics.getInferredMetrics().asStream(), + extractBrokerMetricsWithLabel(metrics) + ) + .map(mfs -> appendLabel(mfs, CLUSTER_EXPOSE_LBL_NAME, clusterName)); + } + + private static Stream extractBrokerMetricsWithLabel(Metrics metrics) { + return metrics.getPerBrokerScrapedMetrics().entrySet().stream() + .flatMap(e -> { + String brokerId = String.valueOf(e.getKey()); + return e.getValue().stream().map(mfs -> appendLabel(mfs, BROKER_EXPOSE_LBL_NAME, brokerId)); + }); } private static MetricFamilySamples concatSamples(MetricFamilySamples mfs1, @@ -62,7 +73,7 @@ public final class PrometheusExpose { ); } - private static MetricFamilySamples addLbl(MetricFamilySamples mfs, String lblName, String lblVal) { + private static MetricFamilySamples appendLabel(MetricFamilySamples mfs, String lblName, String lblVal) { return new MetricFamilySamples( mfs.name, mfs.unit, mfs.type, mfs.help, mfs.samples.stream() @@ -96,17 +107,17 @@ public final class PrometheusExpose { // copied from io.prometheus.client.exporter.common.TextFormat:writeEscapedLabelValue public static String escapedLabelValue(String s) { - StringWriter writer = new StringWriter(s.length()); + StringBuilder sb = new StringBuilder(s.length()); for (int i = 0; i < s.length(); i++) { char c = s.charAt(i); switch (c) { - case '\\' -> writer.append("\\\\"); - case '\"' -> writer.append("\\\""); - case '\n' -> writer.append("\\n"); - default -> writer.append(c); + case '\\' -> sb.append("\\\\"); + case '\"' -> sb.append("\\\""); + case '\n' -> sb.append("\\n"); + default -> sb.append(c); } } - return writer.toString(); + return sb.toString(); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/KafkaSink.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/KafkaSink.java index cfce75550c..1002625916 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/KafkaSink.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/KafkaSink.java @@ -1,11 +1,13 @@ package com.provectus.kafka.ui.service.metrics.sink; +import static com.google.common.base.Charsets.UTF_8; import static com.provectus.kafka.ui.service.MessagesService.createProducer; import static com.provectus.kafka.ui.service.metrics.prometheus.PrometheusExpose.escapedLabelValue; -import static io.prometheus.client.Collector.*; +import static io.prometheus.client.Collector.MetricFamilySamples; +import static io.prometheus.client.Collector.doubleToGoString; +import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG; import com.fasterxml.jackson.databind.json.JsonMapper; -import com.google.common.base.Charsets; import com.provectus.kafka.ui.config.ClustersProperties; import java.time.Instant; import java.time.ZoneOffset; @@ -21,7 +23,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import reactor.core.publisher.Mono; /* - * Format and implementation are the same as in https://github.com/Telefonica/prometheus-kafka-adapter + * Format of records copied from https://github.com/Telefonica/prometheus-kafka-adapter */ @RequiredArgsConstructor class KafkaSink implements MetricsSink { @@ -30,11 +32,13 @@ class KafkaSink implements MetricsSink { private static final JsonMapper JSON_MAPPER = new JsonMapper(); + private static final Map PRODUCER_ADDITIONAL_CONFIGS = Map.of(COMPRESSION_TYPE_CONFIG, "gzip"); + private final String topic; private final Producer producer; static KafkaSink create(ClustersProperties.Cluster cluster, String targetTopic) { - return new KafkaSink(targetTopic, createProducer(cluster, Map.of())); + return new KafkaSink(targetTopic, createProducer(cluster, PRODUCER_ADDITIONAL_CONFIGS)); } @Override @@ -58,13 +62,13 @@ class KafkaSink implements MetricsSink { lbls.put(sample.labelNames.get(i), escapedLabelValue(sample.labelValues.get(i))); } var km = new KafkaMetric(ts, doubleToGoString(sample.value), sample.name, lbls); - return new ProducerRecord<>(topic, toJson(km)); + return new ProducerRecord<>(topic, toJsonBytes(km)); }); } @SneakyThrows - private static byte[] toJson(KafkaMetric m) { - return JSON_MAPPER.writeValueAsString(m).getBytes(Charsets.UTF_8); + private static byte[] toJsonBytes(KafkaMetric m) { + return JSON_MAPPER.writeValueAsString(m).getBytes(UTF_8); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java index 04f8a56084..50bbfb245c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java @@ -19,20 +19,19 @@ public interface MetricsSink { .flatMap(metrics -> Optional.ofNullable(metrics.getStore())) .flatMap(store -> Optional.ofNullable(store.getPrometheus())) .ifPresent(prometheusConf -> { - if (hasText(prometheusConf.getUrl()) && Boolean.TRUE.equals(prometheusConf.getRemoteWrite())) { - sinks.add(new PrometheusRemoteWriteSink(prometheusConf.getUrl())); - } - if (hasText(prometheusConf.getPushGatewayUrl())) { - sinks.add( - PrometheusPushGatewaySink.create( - prometheusConf.getPushGatewayUrl(), - prometheusConf.getPushGatewayJobName(), - prometheusConf.getPushGatewayUsername(), - prometheusConf.getPushGatewayPassword() - )); - } - } - ); + if (hasText(prometheusConf.getUrl()) && Boolean.TRUE.equals(prometheusConf.getRemoteWrite())) { + sinks.add(new PrometheusRemoteWriteSink(prometheusConf.getUrl())); + } + if (hasText(prometheusConf.getPushGatewayUrl())) { + sinks.add( + PrometheusPushGatewaySink.create( + prometheusConf.getPushGatewayUrl(), + prometheusConf.getPushGatewayJobName(), + prometheusConf.getPushGatewayUsername(), + prometheusConf.getPushGatewayPassword() + )); + } + }); Optional.ofNullable(cluster.getMetrics()) .flatMap(metrics -> Optional.ofNullable(metrics.getStore())) diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/prometheus/PrometheusExposeTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/prometheus/PrometheusExposeTest.java new file mode 100644 index 0000000000..d4403872fb --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/prometheus/PrometheusExposeTest.java @@ -0,0 +1,53 @@ +package com.provectus.kafka.ui.service.metrics.prometheus; + +import static com.provectus.kafka.ui.service.metrics.prometheus.PrometheusExpose.prepareMetricsForGlobalExpose; +import static io.prometheus.client.Collector.Type.GAUGE; +import static org.assertj.core.api.Assertions.assertThat; + +import com.provectus.kafka.ui.model.Metrics; +import com.provectus.kafka.ui.service.metrics.scrape.inferred.InferredMetrics; +import io.prometheus.client.Collector.MetricFamilySamples; +import io.prometheus.client.Collector.MetricFamilySamples.Sample; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class PrometheusExposeTest { + + @Test + void prepareMetricsForGlobalExposeAppendsClusterAndBrokerIdLabelsToMetrics() { + + var inferredMfs = new MetricFamilySamples("infer", GAUGE, "help", List.of( + new Sample("infer1", List.of("lbl1"), List.of("lblVal1"), 100))); + + var broker1Mfs = new MetricFamilySamples("brok", GAUGE, "help", List.of( + new Sample("brok", List.of("broklbl1"), List.of("broklblVal1"), 101))); + + var broker2Mfs = new MetricFamilySamples("brok", GAUGE, "help", List.of( + new Sample("brok", List.of("broklbl1"), List.of("broklblVal1"), 102))); + + List prepared = prepareMetricsForGlobalExpose( + "testCluster", + Metrics.builder() + .inferredMetrics(new InferredMetrics(List.of(inferredMfs))) + .perBrokerScrapedMetrics(Map.of(1, List.of(broker1Mfs), 2, List.of(broker2Mfs))) + .build() + ).toList(); + + assertThat(prepared) + .hasSize(3) + .contains(new MetricFamilySamples("infer", GAUGE, "help", List.of( + new Sample("infer1", List.of("cluster", "lbl1"), List.of("testCluster", "lblVal1"), 100)))) + .contains( + new MetricFamilySamples("brok", GAUGE, "help", List.of( + new Sample("brok", List.of("cluster", "broker_id", "broklbl1"), + List.of("testCluster", "1", "broklblVal1"), 101))) + ) + .contains( + new MetricFamilySamples("brok", GAUGE, "help", List.of( + new Sample("brok", List.of("cluster", "broker_id", "broklbl1"), + List.of("testCluster", "2", "broklblVal1"), 102))) + ); + } + +}