diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ExtendedAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ExtendedAdminClient.java index 3c9ed6a3df..63507baafa 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ExtendedAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ExtendedAdminClient.java @@ -15,6 +15,7 @@ public class ExtendedAdminClient { private final Set supportedFeatures; public static Mono extendedAdminClient(AdminClient adminClient) { + return ClusterUtil.getSupportedFeatures(adminClient) .map(s -> new ExtendedAdminClient(adminClient, s)); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java index fcc6fa1f99..cc0477f6bb 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java @@ -11,6 +11,7 @@ import lombok.Data; @Builder(toBuilder = true) public class KafkaCluster { private final String name; + private final String version; private final Integer jmxPort; private final String bootstrapServers; private final String zookeeper; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java index c480d043f7..f83be936b8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java @@ -104,13 +104,16 @@ public class KafkaService { public Mono getUpdatedCluster(KafkaCluster cluster) { return getOrCreateAdminClient(cluster) .flatMap( - ac -> getClusterMetrics(ac.getAdminClient()) - .flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient())) - .flatMap(clusterMetrics -> - getTopicsData(ac.getAdminClient()).flatMap(it -> - updateSegmentMetrics(ac.getAdminClient(), clusterMetrics, it) - ).map(segmentSizeDto -> buildFromData(cluster, segmentSizeDto)) - ) + ac -> ClusterUtil.getClusterVersion(ac.getAdminClient()).flatMap( + version -> + getClusterMetrics(ac.getAdminClient()) + .flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient())) + .flatMap(clusterMetrics -> + getTopicsData(ac.getAdminClient()).flatMap(it -> + updateSegmentMetrics(ac.getAdminClient(), clusterMetrics, it) + ).map(segmentSizeDto -> buildFromData(cluster, version, segmentSizeDto)) + ) + ) ).onErrorResume( e -> Mono.just(cluster.toBuilder() .status(ServerStatus.OFFLINE) @@ -120,6 +123,7 @@ public class KafkaService { } private KafkaCluster buildFromData(KafkaCluster currentCluster, + String version, InternalSegmentSizeDto segmentSizeDto) { var topics = segmentSizeDto.getInternalTopicWithSegmentSize(); @@ -152,6 +156,7 @@ public class KafkaService { .build(); return currentCluster.toBuilder() + .version(version) .status(ServerStatus.ONLINE) .zookeeperStatus(zookeeperStatus) .lastZookeeperException(zookeeperException) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java index 920ff46a0c..affb2f49c2 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java @@ -237,23 +237,12 @@ public class ClusterUtil { public static Mono> getSupportedFeatures( AdminClient adminClient) { - return ClusterUtil.toMono(adminClient.describeCluster().controller()) - .map(Node::id) - .map(id -> Collections - .singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString()))) - .map(brokerCR -> adminClient.describeConfigs(brokerCR).all()) - .flatMap(ClusterUtil::toMono) + return getClusterVersion(adminClient) .map(ClusterUtil::getSupportedUpdateFeature) .map(Collections::singleton); } - private static ExtendedAdminClient.SupportedFeature getSupportedUpdateFeature( - Map configs) { - String version = configs.values().stream() - .map(Config::entries) - .flatMap(Collection::stream) - .filter(entry -> entry.name().contains(CLUSTER_VERSION_PARAM_KEY)) - .findFirst().orElseThrow().value(); + private static ExtendedAdminClient.SupportedFeature getSupportedUpdateFeature(String version) { try { final String[] parts = version.split("\\."); if (parts.length > 2) { @@ -268,6 +257,25 @@ public class ClusterUtil { } } + public static Mono getClusterVersion(AdminClient adminClient) { + return ClusterUtil.toMono(adminClient.describeCluster().controller()) + .map(Node::id) + .map(id -> Collections + .singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString()))) + .map(brokerCR -> adminClient.describeConfigs(brokerCR).all()) + .flatMap(ClusterUtil::toMono) + .map(ClusterUtil::getClusterVersion); + } + + public static String getClusterVersion(Map configs) { + return configs.values().stream() + .map(Config::entries) + .flatMap(Collection::stream) + .filter(entry -> entry.name().contains(CLUSTER_VERSION_PARAM_KEY)) + .findFirst().orElseThrow().value(); + } + + public static Map toSingleMap(Stream> streamOfMaps) { return streamOfMaps .reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream()) diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 3a7a6fcd50..04ad5bda55 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1281,6 +1281,8 @@ components: type: number readOnly: type: boolean + version: + type: string features: type: array items: