|
@@ -43,6 +43,7 @@ import org.apache.kafka.clients.admin.ConfigEntry;
|
|
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
|
|
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
|
|
import org.apache.kafka.clients.admin.ConsumerGroupListing;
|
|
import org.apache.kafka.clients.admin.ConsumerGroupListing;
|
|
import org.apache.kafka.clients.admin.DescribeClusterOptions;
|
|
import org.apache.kafka.clients.admin.DescribeClusterOptions;
|
|
|
|
+import org.apache.kafka.clients.admin.DescribeClusterResult;
|
|
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
|
|
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
|
|
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
|
|
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
|
|
import org.apache.kafka.clients.admin.ListOffsetsResult;
|
|
import org.apache.kafka.clients.admin.ListOffsetsResult;
|
|
@@ -80,7 +81,8 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
|
|
|
private enum SupportedFeature {
|
|
private enum SupportedFeature {
|
|
INCREMENTAL_ALTER_CONFIGS(2.3f),
|
|
INCREMENTAL_ALTER_CONFIGS(2.3f),
|
|
- CONFIG_DOCUMENTATION_RETRIEVAL(2.6f);
|
|
|
|
|
|
+ CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
|
|
|
|
+ DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f);
|
|
|
|
|
|
private final float sinceVersion;
|
|
private final float sinceVersion;
|
|
|
|
|
|
@@ -300,11 +302,14 @@ public class ReactiveAdminClient implements Closeable {
|
|
}
|
|
}
|
|
|
|
|
|
public Mono<ClusterDescription> describeCluster() {
|
|
public Mono<ClusterDescription> describeCluster() {
|
|
- return describeClusterImpl(client);
|
|
|
|
|
|
+ return describeClusterImpl(client, features);
|
|
}
|
|
}
|
|
|
|
|
|
- private static Mono<ClusterDescription> describeClusterImpl(AdminClient client) {
|
|
|
|
- var result = client.describeCluster(new DescribeClusterOptions().includeAuthorizedOperations(true));
|
|
|
|
|
|
+ private static Mono<ClusterDescription> describeClusterImpl(AdminClient client, Set<SupportedFeature> features) {
|
|
|
|
+ boolean includeAuthorizedOperations =
|
|
|
|
+ features.contains(SupportedFeature.DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS);
|
|
|
|
+ DescribeClusterResult result = client.describeCluster(
|
|
|
|
+ new DescribeClusterOptions().includeAuthorizedOperations(includeAuthorizedOperations));
|
|
var allOfFuture = KafkaFuture.allOf(
|
|
var allOfFuture = KafkaFuture.allOf(
|
|
result.controller(), result.clusterId(), result.nodes(), result.authorizedOperations());
|
|
result.controller(), result.clusterId(), result.nodes(), result.authorizedOperations());
|
|
return toMono(allOfFuture).then(
|
|
return toMono(allOfFuture).then(
|
|
@@ -320,7 +325,7 @@ public class ReactiveAdminClient implements Closeable {
|
|
}
|
|
}
|
|
|
|
|
|
private static Mono<String> getClusterVersion(AdminClient client) {
|
|
private static Mono<String> getClusterVersion(AdminClient client) {
|
|
- return describeClusterImpl(client)
|
|
|
|
|
|
+ return describeClusterImpl(client, Set.of())
|
|
// choosing node from which we will get configs (starting with controller)
|
|
// choosing node from which we will get configs (starting with controller)
|
|
.flatMap(descr -> descr.controller != null
|
|
.flatMap(descr -> descr.controller != null
|
|
? Mono.just(descr.controller)
|
|
? Mono.just(descr.controller)
|