|
@@ -5,6 +5,7 @@ import static java.util.stream.Collectors.toMap;
|
|
|
import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
+import com.google.common.base.Preconditions;
|
|
|
import com.google.common.collect.ImmutableTable;
|
|
|
import com.google.common.collect.Iterables;
|
|
|
import com.google.common.collect.Table;
|
|
@@ -15,7 +16,6 @@ import com.provectus.kafka.ui.util.KafkaVersion;
|
|
|
import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
|
|
|
import java.io.Closeable;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
@@ -61,16 +61,22 @@ import org.apache.kafka.common.Node;
|
|
|
import org.apache.kafka.common.TopicPartition;
|
|
|
import org.apache.kafka.common.TopicPartitionInfo;
|
|
|
import org.apache.kafka.common.TopicPartitionReplica;
|
|
|
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
|
|
|
+import org.apache.kafka.common.acl.AclBinding;
|
|
|
+import org.apache.kafka.common.acl.AclBindingFilter;
|
|
|
import org.apache.kafka.common.acl.AclOperation;
|
|
|
import org.apache.kafka.common.config.ConfigResource;
|
|
|
import org.apache.kafka.common.errors.ClusterAuthorizationException;
|
|
|
import org.apache.kafka.common.errors.GroupIdNotFoundException;
|
|
|
import org.apache.kafka.common.errors.GroupNotEmptyException;
|
|
|
import org.apache.kafka.common.errors.InvalidRequestException;
|
|
|
+import org.apache.kafka.common.errors.SecurityDisabledException;
|
|
|
import org.apache.kafka.common.errors.TopicAuthorizationException;
|
|
|
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
|
|
|
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
|
|
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
|
|
|
+import org.apache.kafka.common.resource.ResourcePattern;
|
|
|
+import org.apache.kafka.common.resource.ResourcePatternFilter;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
import reactor.core.scheduler.Schedulers;
|
|
@@ -82,25 +88,28 @@ import reactor.util.function.Tuples;
|
|
|
@RequiredArgsConstructor
|
|
|
public class ReactiveAdminClient implements Closeable {
|
|
|
|
|
|
- private enum SupportedFeature {
|
|
|
+ public enum SupportedFeature {
|
|
|
INCREMENTAL_ALTER_CONFIGS(2.3f),
|
|
|
CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
|
|
|
- DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f);
|
|
|
+ DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f),
|
|
|
+ AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled);
|
|
|
|
|
|
- private final float sinceVersion;
|
|
|
+ private final BiFunction<AdminClient, Float, Mono<Boolean>> predicate;
|
|
|
|
|
|
- SupportedFeature(float sinceVersion) {
|
|
|
- this.sinceVersion = sinceVersion;
|
|
|
+ SupportedFeature(BiFunction<AdminClient, Float, Mono<Boolean>> predicate) {
|
|
|
+ this.predicate = predicate;
|
|
|
}
|
|
|
|
|
|
- static Set<SupportedFeature> forVersion(float kafkaVersion) {
|
|
|
- return Arrays.stream(SupportedFeature.values())
|
|
|
- .filter(f -> kafkaVersion >= f.sinceVersion)
|
|
|
- .collect(Collectors.toSet());
|
|
|
+ SupportedFeature(float fromVersion) {
|
|
|
+ this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion);
|
|
|
}
|
|
|
|
|
|
- static Set<SupportedFeature> defaultFeatures() {
|
|
|
- return Set.of();
|
|
|
+ static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, @Nullable Float kafkaVersion) {
|
|
|
+ return Flux.fromArray(SupportedFeature.values())
|
|
|
+ .flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled)))
|
|
|
+ .filter(Tuple2::getT2)
|
|
|
+ .map(Tuple2::getT1)
|
|
|
+ .collect(Collectors.toSet());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -110,25 +119,31 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
Node controller;
|
|
|
String clusterId;
|
|
|
Collection<Node> nodes;
|
|
|
+ @Nullable // null, if ACL is disabled
|
|
|
Set<AclOperation> authorizedOperations;
|
|
|
}
|
|
|
|
|
|
public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
|
|
|
return getClusterVersion(adminClient)
|
|
|
- .map(ver ->
|
|
|
- new ReactiveAdminClient(
|
|
|
- adminClient,
|
|
|
- ver,
|
|
|
- getSupportedUpdateFeaturesForVersion(ver)));
|
|
|
- }
|
|
|
-
|
|
|
- private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String versionStr) {
|
|
|
- try {
|
|
|
- float version = KafkaVersion.parse(versionStr);
|
|
|
- return SupportedFeature.forVersion(version);
|
|
|
- } catch (NumberFormatException e) {
|
|
|
- return SupportedFeature.defaultFeatures();
|
|
|
- }
|
|
|
+ .flatMap(ver ->
|
|
|
+ getSupportedUpdateFeaturesForVersion(adminClient, ver)
|
|
|
+ .map(features ->
|
|
|
+ new ReactiveAdminClient(adminClient, ver, features)));
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Mono<Set<SupportedFeature>> getSupportedUpdateFeaturesForVersion(AdminClient ac, String versionStr) {
|
|
|
+ @Nullable Float kafkaVersion = KafkaVersion.parse(versionStr).orElse(null);
|
|
|
+ return SupportedFeature.forVersion(ac, kafkaVersion);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Mono<Boolean> isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) {
|
|
|
+ return toMono(ac.describeAcls(AclBindingFilter.ANY).values())
|
|
|
+ .thenReturn(true)
|
|
|
+ .doOnError(th -> !(th instanceof SecurityDisabledException)
|
|
|
+ && !(th instanceof InvalidRequestException)
|
|
|
+ && !(th instanceof UnsupportedVersionException),
|
|
|
+ th -> log.warn("Error checking if security enabled", th))
|
|
|
+ .onErrorReturn(false);
|
|
|
}
|
|
|
|
|
|
// NOTE: if KafkaFuture returns null, that Mono will be empty(!), since Reactor does not support nullable results
|
|
@@ -162,6 +177,10 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
private final String version;
|
|
|
private final Set<SupportedFeature> features;
|
|
|
|
|
|
+ public Set<SupportedFeature> getClusterFeatures() {
|
|
|
+ return features;
|
|
|
+ }
|
|
|
+
|
|
|
public Mono<Set<String>> listTopics(boolean listInternal) {
|
|
|
return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
|
|
|
}
|
|
@@ -576,6 +595,22 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+ public Mono<Collection<AclBinding>> listAcls(ResourcePatternFilter filter) {
|
|
|
+ Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
|
|
|
+ return toMono(client.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values());
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<Void> createAcls(Collection<AclBinding> aclBindings) {
|
|
|
+ Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
|
|
|
+ return toMono(client.createAcls(aclBindings).all());
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<Void> deleteAcls(Collection<AclBinding> aclBindings) {
|
|
|
+ Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
|
|
|
+ var filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet());
|
|
|
+ return toMono(client.deleteAcls(filters).all()).then();
|
|
|
+ }
|
|
|
+
|
|
|
public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
|
|
|
ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
|
|
|
AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
|