|
@@ -4,6 +4,7 @@ import static com.google.common.util.concurrent.Uninterruptibles.getUninterrupti
|
|
|
import static java.util.stream.Collectors.toList;
|
|
|
import static java.util.stream.Collectors.toMap;
|
|
|
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
import com.google.common.collect.ImmutableMap;
|
|
|
import com.google.common.collect.Iterators;
|
|
|
import com.provectus.kafka.ui.exception.IllegalEntityStateException;
|
|
@@ -12,7 +13,6 @@ import com.provectus.kafka.ui.util.MapUtil;
|
|
|
import com.provectus.kafka.ui.util.NumberUtil;
|
|
|
import java.io.Closeable;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
@@ -52,13 +52,17 @@ import org.apache.kafka.common.KafkaFuture;
|
|
|
import org.apache.kafka.common.Node;
|
|
|
import org.apache.kafka.common.TopicPartition;
|
|
|
import org.apache.kafka.common.TopicPartitionReplica;
|
|
|
+import org.apache.kafka.common.acl.AclBinding;
|
|
|
+import org.apache.kafka.common.acl.AclBindingFilter;
|
|
|
import org.apache.kafka.common.acl.AclOperation;
|
|
|
import org.apache.kafka.common.config.ConfigResource;
|
|
|
import org.apache.kafka.common.errors.GroupIdNotFoundException;
|
|
|
import org.apache.kafka.common.errors.GroupNotEmptyException;
|
|
|
import org.apache.kafka.common.errors.InvalidRequestException;
|
|
|
+import org.apache.kafka.common.errors.SecurityDisabledException;
|
|
|
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
|
|
|
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
|
|
|
+import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
import reactor.core.scheduler.Schedulers;
|
|
|
import reactor.util.function.Tuple2;
|
|
@@ -69,24 +73,27 @@ import reactor.util.function.Tuples;
|
|
|
@RequiredArgsConstructor
|
|
|
public class ReactiveAdminClient implements Closeable {
|
|
|
|
|
|
- private enum SupportedFeature {
|
|
|
+ public enum SupportedFeature {
|
|
|
INCREMENTAL_ALTER_CONFIGS(2.3f),
|
|
|
- CONFIG_DOCUMENTATION_RETRIEVAL(2.6f);
|
|
|
+ CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
|
|
|
+ AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled);
|
|
|
|
|
|
- private final float sinceVersion;
|
|
|
+ private final BiFunction<AdminClient, Float, Mono<Boolean>> predicate;
|
|
|
|
|
|
- SupportedFeature(float sinceVersion) {
|
|
|
- this.sinceVersion = sinceVersion;
|
|
|
+ SupportedFeature(BiFunction<AdminClient, Float, Mono<Boolean>> predicate) {
|
|
|
+ this.predicate = predicate;
|
|
|
}
|
|
|
|
|
|
- static Set<SupportedFeature> forVersion(float kafkaVersion) {
|
|
|
- return Arrays.stream(SupportedFeature.values())
|
|
|
- .filter(f -> kafkaVersion >= f.sinceVersion)
|
|
|
- .collect(Collectors.toSet());
|
|
|
+ SupportedFeature(float fromVersion) {
|
|
|
+ this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion);
|
|
|
}
|
|
|
|
|
|
- static Set<SupportedFeature> defaultFeatures() {
|
|
|
- return Set.of();
|
|
|
+ static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, @Nullable Float kafkaVersion) {
|
|
|
+ return Flux.fromArray(SupportedFeature.values())
|
|
|
+ .flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled)))
|
|
|
+ .filter(Tuple2::getT2)
|
|
|
+ .map(Tuple2::getT1)
|
|
|
+ .collect(Collectors.toSet());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -101,20 +108,28 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
|
|
|
public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
|
|
|
return getClusterVersion(adminClient)
|
|
|
- .map(ver ->
|
|
|
- new ReactiveAdminClient(
|
|
|
- adminClient,
|
|
|
- ver,
|
|
|
- getSupportedUpdateFeaturesForVersion(ver)));
|
|
|
+ .flatMap(ver ->
|
|
|
+ getSupportedUpdateFeaturesForVersion(adminClient, ver)
|
|
|
+ .map(features ->
|
|
|
+ new ReactiveAdminClient(adminClient, ver, features)));
|
|
|
}
|
|
|
|
|
|
- private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String versionStr) {
|
|
|
+ private static Mono<Set<SupportedFeature>> getSupportedUpdateFeaturesForVersion(AdminClient ac, String versionStr) {
|
|
|
+ Float kafkaVersion = null;
|
|
|
try {
|
|
|
- float version = NumberUtil.parserClusterVersion(versionStr);
|
|
|
- return SupportedFeature.forVersion(version);
|
|
|
+ kafkaVersion = NumberUtil.parserClusterVersion(versionStr);
|
|
|
} catch (NumberFormatException e) {
|
|
|
- return SupportedFeature.defaultFeatures();
|
|
|
+ //Nothing to do here
|
|
|
}
|
|
|
+ return SupportedFeature.forVersion(ac, kafkaVersion);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Mono<Boolean> isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) {
|
|
|
+ return toMono(ac.describeAcls(AclBindingFilter.ANY).values())
|
|
|
+ .thenReturn(true)
|
|
|
+ .doOnError(th -> !(th instanceof SecurityDisabledException) && !(th instanceof InvalidRequestException),
|
|
|
+ th -> log.warn("Error checking if security enabled", th))
|
|
|
+ .onErrorReturn(false);
|
|
|
}
|
|
|
|
|
|
//TODO: discuss - maybe we should map kafka-library's exceptions to our exceptions here
|
|
@@ -146,6 +161,10 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
private final String version;
|
|
|
private final Set<SupportedFeature> features;
|
|
|
|
|
|
+ public Set<SupportedFeature> getClusterFeatures() {
|
|
|
+ return features;
|
|
|
+ }
|
|
|
+
|
|
|
public Mono<Set<String>> listTopics(boolean listInternal) {
|
|
|
return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
|
|
|
}
|
|
@@ -417,6 +436,22 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
.collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
|
|
|
}
|
|
|
|
|
|
+ public Mono<Collection<AclBinding>> listAcls() {
|
|
|
+ Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
|
|
|
+ return toMono(client.describeAcls(AclBindingFilter.ANY).values());
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<Void> createAcls(Collection<AclBinding> aclBindings) {
|
|
|
+ Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
|
|
|
+ return toMono(client.createAcls(aclBindings).all());
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<Void> deleteAcls(Collection<AclBinding> aclBindings) {
|
|
|
+ Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
|
|
|
+ var filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet());
|
|
|
+ return toMono(client.deleteAcls(filters).all()).then();
|
|
|
+ }
|
|
|
+
|
|
|
private Mono<Set<TopicPartition>> topicPartitions(String topic) {
|
|
|
return toMono(client.describeTopics(List.of(topic)).all())
|
|
|
.map(r -> r.values().stream()
|