diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index dc5dad08e2..4dbd0ae0f1 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -234,6 +234,17 @@
spring-security-ldap
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient</artifactId>
+      <version>0.16.0</version>
+    </dependency>
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient_common</artifactId>
+      <version>0.16.0</version>
+    </dependency>
+
org.codehaus.groovy
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java
index e70547f143..7845149c07 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Statistics.java
@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.model;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.ScrapedClusterState;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -22,6 +23,8 @@ public class Statistics {
Map topicDescriptions;
Map> topicConfigs;
+ ScrapedClusterState clusterState;
+
public static Statistics empty() {
return builder()
.status(ServerStatusDTO.OFFLINE)
@@ -33,6 +36,7 @@ public class Statistics {
.logDirInfo(InternalLogDirStats.empty())
.topicDescriptions(Map.of())
.topicConfigs(Map.of())
+ .clusterState(ScrapedClusterState.empty())
.build();
}
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
index 0b6f16a223..1d76080998 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
@@ -12,9 +12,10 @@ import com.google.common.collect.Table;
 import com.provectus.kafka.ui.exception.IllegalEntityStateException;
 import com.provectus.kafka.ui.exception.NotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.util.KafkaVersion;
 import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
 import java.io.Closeable;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -22,6 +25,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -55,6 +59,8 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
@@ -77,6 +83,8 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -178,18 +186,18 @@ public class ReactiveAdminClient implements Closeable {
// (see MonoSink.success(..) javadoc for details)
public static Mono toMono(KafkaFuture future) {
return Mono.create(sink -> future.whenComplete((res, ex) -> {
- if (ex != null) {
- // KafkaFuture doc is unclear about what exception wrapper will be used
- // (from docs it should be ExecutionException, be we actually see CompletionException, so checking both
- if (ex instanceof CompletionException || ex instanceof ExecutionException) {
- sink.error(ex.getCause()); //unwrapping exception
- } else {
- sink.error(ex);
- }
- } else {
- sink.success(res);
- }
- })).doOnCancel(() -> future.cancel(true))
+ if (ex != null) {
+ // KafkaFuture doc is unclear about what exception wrapper will be used
+        // (from docs it should be ExecutionException, but we actually see CompletionException, so checking both)
+ if (ex instanceof CompletionException || ex instanceof ExecutionException) {
+ sink.error(ex.getCause()); //unwrapping exception
+ } else {
+ sink.error(ex);
+ }
+ } else {
+ sink.success(res);
+ }
+ })).doOnCancel(() -> future.cancel(true))
// AdminClient is using single thread for kafka communication
// and by default all downstream operations (like map(..)) on created Mono will be executed on this thread.
// If some of downstream operation are blocking (by mistake) this can lead to
@@ -401,12 +409,12 @@ public class ReactiveAdminClient implements Closeable {
result.controller(), result.clusterId(), result.nodes(), result.authorizedOperations());
return toMono(allOfFuture).then(
Mono.fromCallable(() ->
- new ClusterDescription(
- result.controller().get(),
- result.clusterId().get(),
- result.nodes().get(),
- result.authorizedOperations().get()
- )
+ new ClusterDescription(
+ result.controller().get(),
+ result.clusterId().get(),
+ result.nodes().get(),
+ result.authorizedOperations().get()
+ )
)
);
}
@@ -560,8 +568,8 @@ public class ReactiveAdminClient implements Closeable {
@VisibleForTesting
static Set filterPartitionsWithLeaderCheck(Collection topicDescriptions,
- Predicate partitionPredicate,
- boolean failOnUnknownLeader) {
+ Predicate partitionPredicate,
+ boolean failOnUnknownLeader) {
var goodPartitions = new HashSet();
for (TopicDescription description : topicDescriptions) {
var goodTopicPartitions = new ArrayList();
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java
index 659212f23f..32fe2fa2a6 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/RawMetric.java
@@ -27,29 +27,9 @@ public interface RawMetric {
return new SimpleMetric(name, labels, value);
}
- @AllArgsConstructor
- @EqualsAndHashCode
- @ToString
- class SimpleMetric implements RawMetric {
-
- private final String name;
- private final Map labels;
- private final BigDecimal value;
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public Map labels() {
- return labels;
- }
-
- @Override
- public BigDecimal value() {
- return value;
- }
+  record SimpleMetric(String name,
+                      Map<String, String> labels,
+                      BigDecimal value) implements RawMetric {
@Override
public RawMetric copyWithValue(BigDecimal newValue) {
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedMetrics.java
new file mode 100644
index 0000000000..aba5c5d50e
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/ScrapedMetrics.java
@@ -0,0 +1,16 @@
+package com.provectus.kafka.ui.service.metrics.v2.scrape;
+
+import io.prometheus.client.Collector.MetricFamilySamples;
+import java.util.Collection;
+
+import java.util.stream.Stream;
+
+public interface ScrapedMetrics {
+
+  Stream<MetricFamilySamples> asStream();
+
+  static ScrapedMetrics create(Collection<MetricFamilySamples> lst) {
+    return lst::stream;
+  }
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scraper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scraper.java
new file mode 100644
index 0000000000..4eba7ee93b
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/Scraper.java
@@ -0,0 +1,10 @@
+package com.provectus.kafka.ui.service.metrics.v2.scrape;
+
+
+import reactor.core.publisher.Mono;
+
+public interface Scraper {
+
+ Mono scrape();
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetrics.java
new file mode 100644
index 0000000000..c332d7c9a9
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetrics.java
@@ -0,0 +1,20 @@
+package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred;
+
+import io.prometheus.client.Collector.MetricFamilySamples;
+
+import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedMetrics;
+import java.util.stream.Stream;
+
+public class InferredMetrics implements ScrapedMetrics {
+
+  @Override
+  public Stream<MetricFamilySamples> asStream() {
+    // TODO: emit inferred metrics; empty stream for now — never return null from a Stream-returning method
+    return Stream.empty();
+  }
+
+ public ScrapedClusterState clusterState() {
+ //todo: impl
+ return null;
+ }
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScraper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScraper.java
new file mode 100644
index 0000000000..83669ba729
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/InferredMetricsScraper.java
@@ -0,0 +1,22 @@
+package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred;
+
+import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.Scraper;
+import reactor.core.publisher.Mono;
+
+public class InferredMetricsScraper implements Scraper {
+
+  private final ReactiveAdminClient adminClient;
+
+  private volatile ScrapedClusterState clusterState;
+
+  public InferredMetricsScraper(ReactiveAdminClient adminClient) {
+    this.adminClient = adminClient;
+  }
+
+  @Override
+  public Mono scrape() {
+    return null;
+  }
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/ScrapedClusterState.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/ScrapedClusterState.java
new file mode 100644
index 0000000000..e212bda99b
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/ScrapedClusterState.java
@@ -0,0 +1,19 @@
+package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred;
+
+import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.states.ConsumerGroupsState;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.states.TopicsState;
+import java.time.Instant;
+import lombok.Value;
+
+@Value
+public class ScrapedClusterState {
+
+ Instant scrapeStart;
+ TopicsState topicsState;
+ ConsumerGroupsState consumerGroupsState;
+
+ public static ScrapedClusterState empty() {
+ return new ScrapedClusterState(null, null, null);
+ }
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/states/ConsumerGroupsState.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/states/ConsumerGroupsState.java
new file mode 100644
index 0000000000..f11c51ca65
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/states/ConsumerGroupsState.java
@@ -0,0 +1,4 @@
+package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.states;
+
+public class ConsumerGroupsState {
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/states/TopicsState.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/states/TopicsState.java
new file mode 100644
index 0000000000..1fcb220017
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/inferred/states/TopicsState.java
@@ -0,0 +1,4 @@
+package com.provectus.kafka.ui.service.metrics.v2.scrape.inferred.states;
+
+public class TopicsState {
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsScraper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsScraper.java
new file mode 100644
index 0000000000..cdec123729
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/jmx/JmxMetricsScraper.java
@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.service.metrics.v2.scrape.jmx;
+
+import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedMetrics;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.Scraper;
+import reactor.core.publisher.Mono;
+
+public class JmxMetricsScraper implements Scraper {
+
+ @Override
+ public Mono scrape() {
+ return null;
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prom/PrometheusScraper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prom/PrometheusScraper.java
new file mode 100644
index 0000000000..de71effc35
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/v2/scrape/prom/PrometheusScraper.java
@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.service.metrics.v2.scrape.prom;
+
+import com.provectus.kafka.ui.service.metrics.v2.scrape.ScrapedMetrics;
+import com.provectus.kafka.ui.service.metrics.v2.scrape.Scraper;
+import reactor.core.publisher.Mono;
+
+public class PrometheusScraper implements Scraper {
+
+ @Override
+ public Mono scrape() {
+ return null;
+ }
+}
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java
index 1938f93044..af7804befe 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java
@@ -32,7 +32,7 @@ public abstract class AbstractIntegrationTest {
public static final String LOCAL = "local";
public static final String SECOND_LOCAL = "secondLocal";
- private static final String CONFLUENT_PLATFORM_VERSION = "5.5.0";
+ private static final String CONFLUENT_PLATFORM_VERSION = "7.2.1";
public static final KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION))
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java
index cde000ac6e..f74eccea53 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java
@@ -12,6 +12,7 @@ import com.provectus.kafka.ui.container.KsqlDbContainer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -19,6 +20,7 @@ import org.testcontainers.shaded.org.awaitility.Awaitility;
 import org.testcontainers.utility.DockerImageName;
 import reactor.test.StepVerifier;
+@Disabled
class KsqlApiClientTest extends AbstractIntegrationTest {
private static final KsqlDbContainer KSQL_DB = new KsqlDbContainer(
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParserTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParserTest.java
index 294215c8b1..e2f02f41f4 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParserTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/metrics/PrometheusEndpointMetricsParserTest.java
@@ -27,4 +27,4 @@ class PrometheusEndpointMetricsParserTest {
});
}
-}
\ No newline at end of file
+}