diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java index db3e75f93d..fa9b0f1616 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java @@ -10,6 +10,7 @@ import java.util.List; import java.util.LongSummaryStatistics; import java.util.Map; import java.util.concurrent.atomic.LongAdder; +import lombok.RequiredArgsConstructor; import lombok.Value; import org.apache.kafka.clients.admin.LogDirDescription; import org.apache.kafka.common.TopicPartition; @@ -21,13 +22,13 @@ import reactor.util.function.Tuples; public class InternalLogDirStats { @Value + @RequiredArgsConstructor public static class SegmentStats { Long segmentSize; Integer segmentsCount; private SegmentStats(LongSummaryStatistics s) { - segmentSize = s.getSum(); - segmentsCount = (int) s.getCount(); + this(s.getSum(), (int) s.getCount()); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/ScrapedClusterState.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/ScrapedClusterState.java index 015d58e446..ac5587193d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/ScrapedClusterState.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/ScrapedClusterState.java @@ -16,6 +16,7 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import lombok.Builder; +import lombok.RequiredArgsConstructor; import lombok.Value; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConsumerGroupDescription; @@ -27,6 +28,7 @@ import org.apache.kafka.common.TopicPartition; import reactor.core.publisher.Mono; @Builder(toBuilder = true) +@RequiredArgsConstructor @Value public class ScrapedClusterState { diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetricsScraperTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetricsScraperTest.java new file mode 100644 index 0000000000..6c4abb6be4 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetricsScraperTest.java @@ -0,0 +1,119 @@ +package com.provectus.kafka.ui.service.metrics.scrape.inferred; + +import static com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState.ConsumerGroupState; +import static com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState.NodeState; +import static com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState.TopicState; +import static org.assertj.core.api.Assertions.assertThat; + +import com.provectus.kafka.ui.model.InternalLogDirStats; +import com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.MemberAssignment; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +class InferredMetricsScraperTest { + + final InferredMetricsScraper scraper = new InferredMetricsScraper(); + + @Test + void allExpectedMetricsScraped() { + var segmentStats = new InternalLogDirStats.SegmentStats(1234L, 3); + var logDirStats = new InternalLogDirStats.LogDirSpaceStats(234L, 345L, Map.of(), Map.of()); + + Node node1 = new Node(1, "node1", 9092); + Node node2 = new Node(2, "node2", 9092); + + Mono scraped = scraper.scrape( + ScrapedClusterState.builder() + .scrapeFinishedAt(Instant.now()) + .nodesStates( + Map.of( + 1, new NodeState(1, node1, segmentStats, logDirStats), + 2, new NodeState(2, node2, segmentStats, logDirStats) + ) + ) + .topicStates( + Map.of( + "t1", + new TopicState( + "t1", + new TopicDescription( + "t1", + false, + List.of( + new TopicPartitionInfo(0, node1, List.of(node1, node2), List.of(node1, node2)), + new TopicPartitionInfo(1, node1, List.of(node1, node2), List.of(node1)) + ) + ), + List.of(), + Map.of(0, 100L, 1, 101L), + Map.of(0, 200L, 1, 201L), + segmentStats, + Map.of(0, segmentStats, 1, segmentStats) + ) + ) + ) + .consumerGroupsStates( + Map.of( + "cg1", + new ConsumerGroupState( + "cg1", + new ConsumerGroupDescription( + "cg1", + true, + List.of( + new MemberDescription( + "memb1", Optional.empty(), "client1", "hst1", + new MemberAssignment(Set.of(new TopicPartition("t1", 0))) + ) + ), + null, + org.apache.kafka.common.ConsumerGroupState.STABLE, + node1 + ), + Map.of(new TopicPartition("t1", 0), 150L) + ) + ) + ) + .build() + ); + + StepVerifier.create(scraped) + .assertNext(inferredMetrics -> + assertThat(inferredMetrics.asStream().map(m -> m.name)).containsExactlyInAnyOrder( + "broker_count", + "broker_bytes_disk", + "broker_bytes_usable", + "broker_bytes_total", + "topic_count", + "kafka_topic_partitions", + "kafka_topic_partition_current_offset", + "kafka_topic_partition_oldest_offset", + "kafka_topic_partition_in_sync_replica", + "kafka_topic_partition_replicas", + "kafka_topic_partition_leader", + "topic_bytes_disk", + "group_count", + "group_state", + "group_member_count", + "group_host_count", + "kafka_consumergroup_current_offset", + "kafka_consumergroup_lag" + ) + ) + .verifyComplete(); + } + +}