|
@@ -61,6 +61,7 @@ import org.apache.kafka.common.errors.GroupNotEmptyException;
|
|
import org.apache.kafka.common.errors.InvalidRequestException;
|
|
import org.apache.kafka.common.errors.InvalidRequestException;
|
|
import org.apache.kafka.common.errors.SecurityDisabledException;
|
|
import org.apache.kafka.common.errors.SecurityDisabledException;
|
|
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
|
|
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
|
|
|
|
+import org.apache.kafka.common.errors.UnsupportedVersionException;
|
|
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
|
|
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
@@ -127,7 +128,9 @@ public class ReactiveAdminClient implements Closeable {
|
|
private static Mono<Boolean> isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) {
|
|
private static Mono<Boolean> isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) {
|
|
return toMono(ac.describeAcls(AclBindingFilter.ANY).values())
|
|
return toMono(ac.describeAcls(AclBindingFilter.ANY).values())
|
|
.thenReturn(true)
|
|
.thenReturn(true)
|
|
- .doOnError(th -> !(th instanceof SecurityDisabledException) && !(th instanceof InvalidRequestException),
|
|
|
|
|
|
+ .doOnError(th -> !(th instanceof SecurityDisabledException)
|
|
|
|
+ && !(th instanceof InvalidRequestException)
|
|
|
|
+ && !(th instanceof UnsupportedVersionException),
|
|
th -> log.warn("Error checking if security enabled", th))
|
|
th -> log.warn("Error checking if security enabled", th))
|
|
.onErrorReturn(false);
|
|
.onErrorReturn(false);
|
|
}
|
|
}
|