@@ -4,6 +4,8 @@ import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableTable;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Table;
@@ -14,7 +16,6 @@ import com.provectus.kafka.ui.util.KafkaVersion;
 import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
 import java.io.Closeable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -31,8 +32,9 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -60,16 +62,21 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -78,28 +85,32 @@ import reactor.util.function.Tuples;
 
 
 @Slf4j
-@RequiredArgsConstructor
+@AllArgsConstructor
 public class ReactiveAdminClient implements Closeable {
 
-  private enum SupportedFeature {
+  public enum SupportedFeature {
     INCREMENTAL_ALTER_CONFIGS(2.3f),
     CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
-    DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f);
+    DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f),
+    AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled);
 
-    private final float sinceVersion;
+    private final BiFunction<AdminClient, Float, Mono<Boolean>> predicate;
 
-    SupportedFeature(float sinceVersion) {
-      this.sinceVersion = sinceVersion;
+    SupportedFeature(BiFunction<AdminClient, Float, Mono<Boolean>> predicate) {
+      this.predicate = predicate;
     }
 
-    static Set<SupportedFeature> forVersion(float kafkaVersion) {
-      return Arrays.stream(SupportedFeature.values())
-          .filter(f -> kafkaVersion >= f.sinceVersion)
-          .collect(Collectors.toSet());
+    SupportedFeature(float fromVersion) {
+      this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion);
     }
 
-    static Set<SupportedFeature> defaultFeatures() {
-      return Set.of();
+    static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, String kafkaVersionStr) {
+      @Nullable Float kafkaVersion = KafkaVersion.parse(kafkaVersionStr).orElse(null);
+      return Flux.fromArray(SupportedFeature.values())
+          .flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled)))
+          .filter(Tuple2::getT2)
+          .map(Tuple2::getT1)
+          .collect(Collectors.toSet());
     }
   }
 
@@ -109,27 +120,60 @@ public class ReactiveAdminClient implements Closeable {
     Node controller;
     String clusterId;
     Collection<Node> nodes;
+    @Nullable // null, if ACL is disabled
     Set<AclOperation> authorizedOperations;
   }
 
-  public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
-    return getClusterVersion(adminClient)
-        .map(ver ->
-            new ReactiveAdminClient(
-                adminClient,
-                ver,
-                getSupportedUpdateFeaturesForVersion(ver)));
-  }
-
-  private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String versionStr) {
-    try {
-      float version = KafkaVersion.parse(versionStr);
-      return SupportedFeature.forVersion(version);
-    } catch (NumberFormatException e) {
-      return SupportedFeature.defaultFeatures();
+  @Builder
+  private record ConfigRelatedInfo(String version,
+                                   Set<SupportedFeature> features,
+                                   boolean topicDeletionIsAllowed) {
+
+    private static Mono<ConfigRelatedInfo> extract(AdminClient ac, int controllerId) {
+      return loadBrokersConfig(ac, List.of(controllerId))
+          .map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(controllerId))
+          .flatMap(configs -> {
+            String version = "1.0-UNKNOWN";
+            boolean topicDeletionEnabled = true;
+            for (ConfigEntry entry : configs) {
+              if (entry.name().contains("inter.broker.protocol.version")) {
+                version = entry.value();
+              }
+              if (entry.name().equals("delete.topic.enable")) {
+                topicDeletionEnabled = Boolean.parseBoolean(entry.value());
+              }
+            }
+            var builder = ConfigRelatedInfo.builder()
+                .version(version)
+                .topicDeletionIsAllowed(topicDeletionEnabled);
+            return SupportedFeature.forVersion(ac, version)
+                .map(features -> builder.features(features).build());
+          });
     }
   }
 
+  public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
+    return describeClusterImpl(adminClient, Set.of())
+        // choosing node from which we will get configs (starting with controller)
+        .flatMap(descr -> descr.controller != null
+            ? Mono.just(descr.controller)
+            : Mono.justOrEmpty(descr.nodes.stream().findFirst())
+        )
+        .flatMap(node -> ConfigRelatedInfo.extract(adminClient, node.id()))
+        .map(info -> new ReactiveAdminClient(adminClient, info));
+  }
+
+
+  private static Mono<Boolean> isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) {
+    return toMono(ac.describeAcls(AclBindingFilter.ANY).values())
+        .thenReturn(true)
+        .doOnError(th -> !(th instanceof SecurityDisabledException)
+                && !(th instanceof InvalidRequestException)
+                && !(th instanceof UnsupportedVersionException),
+            th -> log.warn("Error checking if security enabled", th))
+        .onErrorReturn(false);
+  }
+
   // NOTE: if KafkaFuture returns null, that Mono will be empty(!), since Reactor does not support nullable results
   // (see MonoSink.success(..) javadoc for details)
   public static <T> Mono<T> toMono(KafkaFuture<T> future) {
@@ -158,8 +202,11 @@ public class ReactiveAdminClient implements Closeable {
 
   @Getter(AccessLevel.PACKAGE) // visible for testing
   private final AdminClient client;
-  private final String version;
-  private final Set<SupportedFeature> features;
+  private volatile ConfigRelatedInfo configRelatedInfo;
+
+  public Set<SupportedFeature> getClusterFeatures() {
+    return configRelatedInfo.features();
+  }
 
   public Mono<Set<String>> listTopics(boolean listInternal) {
     return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
@@ -170,7 +217,20 @@ public class ReactiveAdminClient implements Closeable {
   }
 
   public String getVersion() {
-    return version;
+    return configRelatedInfo.version();
+  }
+
+  public boolean isTopicDeletionEnabled() {
+    return configRelatedInfo.topicDeletionIsAllowed();
+  }
+
+  public Mono<Void> updateInternalStats(@Nullable Node controller) {
+    if (controller == null) {
+      return Mono.empty();
+    }
+    return ConfigRelatedInfo.extract(client, controller.id())
+        .doOnNext(info -> this.configRelatedInfo = info)
+        .then();
   }
 
   public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
@@ -180,7 +240,7 @@ public class ReactiveAdminClient implements Closeable {
   //NOTE: skips not-found topics (for which UnknownTopicOrPartitionException was thrown by AdminClient)
   //and topics for which DESCRIBE_CONFIGS permission is not set (TopicAuthorizationException was thrown)
   public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
-    var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc;
+    var includeDocFixed = includeDoc && getClusterFeatures().contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL);
     // we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
     return partitionCalls(
         topicNames,
@@ -329,7 +389,7 @@ public class ReactiveAdminClient implements Closeable {
   }
 
   public Mono<ClusterDescription> describeCluster() {
-    return describeClusterImpl(client, features);
+    return describeClusterImpl(client, getClusterFeatures());
   }
 
   private static Mono<ClusterDescription> describeClusterImpl(AdminClient client, Set<SupportedFeature> features) {
@@ -351,23 +411,6 @@ public class ReactiveAdminClient implements Closeable {
     );
   }
 
-  private static Mono<String> getClusterVersion(AdminClient client) {
-    return describeClusterImpl(client, Set.of())
-        // choosing node from which we will get configs (starting with controller)
-        .flatMap(descr -> descr.controller != null
-            ? Mono.just(descr.controller)
-            : Mono.justOrEmpty(descr.nodes.stream().findFirst())
-        )
-        .flatMap(node -> loadBrokersConfig(client, List.of(node.id())))
-        .flatMap(configs -> configs.values().stream()
-            .flatMap(Collection::stream)
-            .filter(entry -> entry.name().contains("inter.broker.protocol.version"))
-            .findFirst()
-            .map(configEntry -> Mono.just(configEntry.value()))
-            .orElse(Mono.empty()))
-        .switchIfEmpty(Mono.just("1.0-UNKNOWN"));
-  }
-
   public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
     return toMono(client.deleteConsumerGroups(groupIds).all())
         .onErrorResume(GroupIdNotFoundException.class,
@@ -401,7 +444,7 @@ public class ReactiveAdminClient implements Closeable {
   // NOTE: places whole current topic config with new one. Entries that were present in old config,
   // but missed in new will be set to default
   public Mono<Void> updateTopicConfig(String topicName, Map<String, String> configs) {
-    if (features.contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
+    if (getClusterFeatures().contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
       return getTopicsConfigImpl(List.of(topicName), false)
           .map(conf -> conf.getOrDefault(topicName, List.of()))
           .flatMap(currentConfigs -> incrementalAlterConfig(topicName, currentConfigs, configs));
@@ -498,6 +541,14 @@ public class ReactiveAdminClient implements Closeable {
         .flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
   }
 
+  /**
+   * List offset for the specified topics, skipping no-leader partitions.
+   */
+  public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicDescription> topicDescriptions,
+                                                     OffsetSpec offsetSpec) {
+    return listOffsetsUnsafe(filterPartitionsWithLeaderCheck(topicDescriptions, p -> true, false), offsetSpec);
+  }
+
   private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
                                                                            boolean failOnUnknownLeader) {
     var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
@@ -507,34 +558,44 @@ public class ReactiveAdminClient implements Closeable {
             descriptions.values(), partitions::contains, failOnUnknownLeader));
   }
 
-  private Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
+  @VisibleForTesting
+  static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
                                                               Predicate<TopicPartition> partitionPredicate,
                                                               boolean failOnUnknownLeader) {
     var goodPartitions = new HashSet<TopicPartition>();
     for (TopicDescription description : topicDescriptions) {
+      var goodTopicPartitions = new ArrayList<TopicPartition>();
       for (TopicPartitionInfo partitionInfo : description.partitions()) {
         TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
-        if (!partitionPredicate.test(topicPartition)) {
-          continue;
+        if (partitionInfo.leader() == null) {
+          if (failOnUnknownLeader) {
+            throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
+          } else {
+            // if ANY of topic partitions has no leader - we have to skip all topic partitions
+            goodTopicPartitions.clear();
+            break;
+          }
         }
-        if (partitionInfo.leader() != null) {
-          goodPartitions.add(topicPartition);
-        } else if (failOnUnknownLeader) {
-          throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
+        if (partitionPredicate.test(topicPartition)) {
+          goodTopicPartitions.add(topicPartition);
         }
       }
+      goodPartitions.addAll(goodTopicPartitions);
     }
     return goodPartitions;
   }
 
-  // 1. NOTE(!): should only apply for partitions with existing leader,
+  // 1. NOTE(!): should only apply for partitions from topics where all partitions have leaders,
   // otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
   // 2. NOTE(!): Skips partitions that were not initialized yet
   //    (UnknownTopicOrPartitionException thrown, ex. after topic creation)
   // 3. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retrying instead
   @KafkaClientInternalsDependant
-  public Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions,
-                                                            OffsetSpec offsetSpec) {
+  @VisibleForTesting
+  Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) {
+    if (partitions.isEmpty()) {
+      return Mono.just(Map.of());
+    }
 
     Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
         parts -> {
@@ -557,6 +618,22 @@ public class ReactiveAdminClient implements Closeable {
     );
   }
 
+  public Mono<Collection<AclBinding>> listAcls(ResourcePatternFilter filter) {
+    Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
+    return toMono(client.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values());
+  }
+
+  public Mono<Void> createAcls(Collection<AclBinding> aclBindings) {
+    Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
+    return toMono(client.createAcls(aclBindings).all());
+  }
+
+  public Mono<Void> deleteAcls(Collection<AclBinding> aclBindings) {
+    Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
+    var filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet());
+    return toMono(client.deleteAcls(filters).all()).then();
+  }
+
   public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
     ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
     AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);