From 54a5398413c771c11f18111efeb9ea7cd36edea3 Mon Sep 17 00:00:00 2001 From: iliax Date: Wed, 21 Jun 2023 11:09:44 +0400 Subject: [PATCH] wip --- .../provectus/kafka/ui/model/Statistics.java | 7 ++- .../v2/scrape/ScrapedClusterState.java | 56 +++++++++++++++++++ .../metrics/v2/scrape/ScrapedMetrics.java | 9 ++- .../service/metrics/v2/scrape/Scrapping.java | 8 +++ .../v2/scrape/inferred/InferredMetrics.java | 15 +++-- .../inferred/InferredMetricsScraper.java | 34 +++++++++++ .../inferred/InferredMetricsScrapper.java | 22 -------- .../scrape/inferred/ScrapedClusterState.java | 19 ------- .../inferred/states/ConsumerGroupsState.java | 4 -- .../scrape/inferred/states/TopicsState.java | 4 -- 10 files changed, 118 insertions(+), 60 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedClusterState.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scrapping.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScraper.java delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScrapper.java delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/ScrapedClusterState.java delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/states/ConsumerGroupsState.java delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/states/TopicsState.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java index 7845149c07..e52e6642ce 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java @@ -1,7 +1,7 @@ package com.provectus.kafka.ui.model; import com.provectus.kafka.ui.service.ReactiveAdminClient; -import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.ScrapedClusterState; +import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedClusterState; import java.util.List; import java.util.Map; import java.util.Set; @@ -19,12 +19,13 @@ public class Statistics { List features; ReactiveAdminClient.ClusterDescription clusterDescription; Metrics metrics; + ScrapedClusterState clusterState; + + //TODO: to be removed -->> InternalLogDirStats logDirInfo; Map topicDescriptions; Map> topicConfigs; - ScrapedClusterState clusterState; - public static Statistics empty() { return builder() .status(ServerStatusDTO.OFFLINE) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedClusterState.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedClusterState.java new file mode 100644 index 0000000000..ab8da8e1ff --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedClusterState.java @@ -0,0 +1,56 @@ +package com.provectus.kafka.ui.service.metrics.v2.scrape; + +import com.google.common.collect.Table; +import com.provectus.kafka.ui.service.ReactiveAdminClient; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import lombok.Value; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.TopicDescription; +import reactor.core.publisher.Mono; + +@Value +public class ScrapedClusterState { + + record NodeState(SegmentStats segmentStats) { + } + + record TopicState( + Instant scrapeTime, + String name, + List configs, + TopicDescription description, + Map offsets, + SegmentStats segmentStats, + Map partitionsSegmentStats) { + } + + record ConsumerGroupState( + Instant scrapeTime, + String group, + ConsumerGroupDescription description, + Table committedOffsets, + Map lastTopicActivity) { + } + + record SegmentStats(long segmentSize, + int segmentsCount) { + } + + Instant scrapeStartTime; + Map nodesStates; + Map topicStates; + Map consumerGroupsStates; + + public static ScrapedClusterState empty() { + //TODO impl + return null; + } + + public static Mono scrape(ReactiveAdminClient ac) { + return null;//TODO impl + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedMetrics.java index aba5c5d50e..9155ad4f6b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedMetrics.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedMetrics.java @@ -3,14 +3,19 @@ package com.provectus.kafka.ui.service.metrics.v2.scrape; import io.prometheus.client.Collector.MetricFamilySamples; import java.util.Collection; +import java.util.List; import java.util.stream.Stream; public interface ScrapedMetrics { - Stream asStream(); - static ScrapedMetrics create(Collection lst) { return lst::stream; } + static ScrapedMetrics empty() { + return create(List.of()); + } + + Stream asStream(); + } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scrapping.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scrapping.java new file mode 100644 index 0000000000..a0048c626f --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scrapping.java @@ -0,0 +1,8 @@ +package com.provectus.kafka.ui.service.metrics.v2.scrape; + +public class Scrapping { + + + + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetrics.java index c332d7c9a9..2810d58ac9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetrics.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetrics.java @@ -2,19 +2,22 @@ package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred; import static io.prometheus.client.Collector.*; +import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedClusterState; import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedMetrics; +import java.util.List; import java.util.stream.Stream; public class InferredMetrics implements ScrapedMetrics { - @Override - public Stream asStream() { - return null; + private final List metrics; + + public InferredMetrics(List metrics) { + this.metrics = metrics; } - public ScrapedClusterState clusterState() { - //todo: impl - return null; + @Override + public Stream asStream() { + return metrics.stream(); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScraper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScraper.java new file mode 100644 index 0000000000..fcd7d7d968 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScraper.java @@ -0,0 +1,34 @@ +package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred; + +import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedClusterState; +import com.provectus.kafka.ui.service.metrics.v2.scrape.Scraper; +import java.util.List; +import java.util.function.Supplier; +import lombok.RequiredArgsConstructor; +import reactor.core.publisher.Mono; + +@RequiredArgsConstructor +public class InferredMetricsScraper implements Scraper { + + private final Supplier currentStateSupplier; + private ScrapedClusterState prevState = null; + + @Override + public synchronized Mono scrape() { + if (prevState == null) { + prevState = currentStateSupplier.get(); + return Mono.empty(); + } + var newState = currentStateSupplier.get(); + var inferred = infer(prevState, newState); + prevState = newState; + return Mono.just(inferred); + } + + private static InferredMetrics infer(ScrapedClusterState prevState, + ScrapedClusterState newState) { + //TODO: impl + return new InferredMetrics(List.of()); + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScrapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScrapper.java deleted file mode 100644 index 83669ba729..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScrapper.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred; - -import com.provectus.kafka.ui.service.ReactiveAdminClient; -import com.provectus.kafka.ui.service.metrics.v2.scrape.Scraper; -import reactor.core.publisher.Mono; - -public class InferredMetricsScrapper implements Scraper { - - private final ReactiveAdminClient adminClient; - - private volatile ScrapedClusterState clusterState; - - public InferredMetricsScrapper(ReactiveAdminClient adminClient) { - this.adminClient = adminClient; - } - - @Override - public Mono scrape() { - return null; - } - -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/ScrapedClusterState.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/ScrapedClusterState.java deleted file mode 100644 index e212bda99b..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/ScrapedClusterState.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred; - -import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.states.ConsumerGroupsState; -import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.states.TopicsState; -import java.time.Instant; -import lombok.Value; - -@Value -public class ScrapedClusterState { - - Instant scrapeStart; - TopicsState topicsState; - ConsumerGroupsState consumerGroupsState; - - public static ScrapedClusterState empty() { - return new ScrapedClusterState(null, null, null); - } - -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/states/ConsumerGroupsState.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/states/ConsumerGroupsState.java deleted file mode 100644 index f11c51ca65..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/states/ConsumerGroupsState.java +++ /dev/null @@ -1,4 +0,0 @@ -package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.states; - -public class ConsumerGroupsState { -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/states/TopicsState.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/states/TopicsState.java deleted file mode 100644 index 1fcb220017..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/states/TopicsState.java +++ /dev/null @@ -1,4 +0,0 @@ -package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.states; - -public class TopicsState { -}