|
@@ -25,7 +25,8 @@ public class FeatureService {
|
|
|
|
|
|
private final AdminClientService adminClientService;
|
|
private final AdminClientService adminClientService;
|
|
|
|
|
|
- public Mono<List<ClusterFeature>> getAvailableFeatures(KafkaCluster cluster, @Nullable Node controller) {
|
|
|
|
|
|
+ public Mono<List<ClusterFeature>> getAvailableFeatures(KafkaCluster cluster,
|
|
|
|
+ ReactiveAdminClient.ClusterDescription clusterDescription) {
|
|
List<Mono<ClusterFeature>> features = new ArrayList<>();
|
|
List<Mono<ClusterFeature>> features = new ArrayList<>();
|
|
|
|
|
|
if (Optional.ofNullable(cluster.getConnectsClients())
|
|
if (Optional.ofNullable(cluster.getConnectsClients())
|
|
@@ -42,17 +43,15 @@ public class FeatureService {
|
|
features.add(Mono.just(ClusterFeature.SCHEMA_REGISTRY));
|
|
features.add(Mono.just(ClusterFeature.SCHEMA_REGISTRY));
|
|
}
|
|
}
|
|
|
|
|
|
- if (controller != null) {
|
|
|
|
- features.add(
|
|
|
|
- isTopicDeletionEnabled(cluster, controller)
|
|
|
|
- .flatMap(r -> Boolean.TRUE.equals(r) ? Mono.just(ClusterFeature.TOPIC_DELETION) : Mono.empty())
|
|
|
|
- );
|
|
|
|
- }
|
|
|
|
|
|
+ features.add(topicDeletionEnabled(cluster, clusterDescription.getController()));
|
|
|
|
|
|
return Flux.fromIterable(features).flatMap(m -> m).collectList();
|
|
return Flux.fromIterable(features).flatMap(m -> m).collectList();
|
|
}
|
|
}
|
|
|
|
|
|
- private Mono<Boolean> isTopicDeletionEnabled(KafkaCluster cluster, Node controller) {
|
|
|
|
|
|
+ private Mono<ClusterFeature> topicDeletionEnabled(KafkaCluster cluster, @Nullable Node controller) {
|
|
|
|
+ if (controller == null) {
|
|
|
|
+ return Mono.just(ClusterFeature.TOPIC_DELETION); // assuming it is enabled by default
|
|
|
|
+ }
|
|
return adminClientService.get(cluster)
|
|
return adminClientService.get(cluster)
|
|
.flatMap(ac -> ac.loadBrokersConfig(List.of(controller.id())))
|
|
.flatMap(ac -> ac.loadBrokersConfig(List.of(controller.id())))
|
|
.map(config ->
|
|
.map(config ->
|
|
@@ -61,6 +60,9 @@ public class FeatureService {
|
|
.filter(e -> e.name().equals(DELETE_TOPIC_ENABLED_SERVER_PROPERTY))
|
|
.filter(e -> e.name().equals(DELETE_TOPIC_ENABLED_SERVER_PROPERTY))
|
|
.map(e -> Boolean.parseBoolean(e.value()))
|
|
.map(e -> Boolean.parseBoolean(e.value()))
|
|
.findFirst()
|
|
.findFirst()
|
|
- .orElse(true));
|
|
|
|
|
|
+ .orElse(true))
|
|
|
|
+ .flatMap(enabled -> enabled
|
|
|
|
+ ? Mono.just(ClusterFeature.TOPIC_DELETION)
|
|
|
|
+ : Mono.empty());
|
|
}
|
|
}
|
|
}
|
|
}
|