|
@@ -18,6 +18,7 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Optional;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.CompletionException;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
@@ -54,6 +55,7 @@ import org.apache.kafka.common.acl.AclOperation;
|
|
|
import org.apache.kafka.common.config.ConfigResource;
|
|
|
import org.apache.kafka.common.errors.GroupIdNotFoundException;
|
|
|
import org.apache.kafka.common.errors.GroupNotEmptyException;
|
|
|
+import org.apache.kafka.common.errors.InvalidRequestException;
|
|
|
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
|
|
|
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
|
|
|
import reactor.core.publisher.Mono;
|
|
@@ -81,7 +83,7 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
}
|
|
|
|
|
|
public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
|
|
|
- return getClusterVersionImpl(adminClient)
|
|
|
+ return getClusterVersion(adminClient)
|
|
|
.map(ver ->
|
|
|
new ReactiveAdminClient(
|
|
|
adminClient,
|
|
@@ -105,7 +107,13 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
private static <T> Mono<T> toMono(KafkaFuture<T> future) {
|
|
|
return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
|
|
|
if (ex != null) {
|
|
|
- sink.error(ex);
|
|
|
+ // KafkaFuture doc is unclear about what exception wrapper will be used
|
|
|
+ // (from docs it should be ExecutionException, be we actually see CompletionException, so checking both
|
|
|
+ if (ex instanceof CompletionException || ex instanceof ExecutionException) {
|
|
|
+ sink.error(ex.getCause()); //unwrapping exception
|
|
|
+ } else {
|
|
|
+ sink.error(ex);
|
|
|
+ }
|
|
|
} else {
|
|
|
sink.success(res);
|
|
|
}
|
|
@@ -166,17 +174,29 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
c -> List.copyOf(c.getValue().entries()))));
|
|
|
}
|
|
|
|
|
|
- public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> brokerIds) {
|
|
|
+ private static Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(AdminClient client, List<Integer> brokerIds) {
|
|
|
List<ConfigResource> resources = brokerIds.stream()
|
|
|
.map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
|
|
|
.collect(toList());
|
|
|
return toMono(client.describeConfigs(resources).all())
|
|
|
+ .doOnError(InvalidRequestException.class,
|
|
|
+ th -> log.trace("Error while getting broker {} configs", brokerIds, th))
|
|
|
+ // some kafka backends (like MSK serverless) do not support broker's configs retrieval,
|
|
|
+ // in that case InvalidRequestException will be thrown
|
|
|
+ .onErrorResume(InvalidRequestException.class, th -> Mono.just(Map.of()))
|
|
|
.map(config -> config.entrySet().stream()
|
|
|
.collect(toMap(
|
|
|
c -> Integer.valueOf(c.getKey().name()),
|
|
|
c -> new ArrayList<>(c.getValue().entries()))));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Return per-broker configs or empty map if broker's configs retrieval not supported.
|
|
|
+ */
|
|
|
+ public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> brokerIds) {
|
|
|
+ return loadBrokersConfig(client, brokerIds);
|
|
|
+ }
|
|
|
+
|
|
|
public Mono<Map<String, TopicDescription>> describeTopics() {
|
|
|
return listTopics(true).flatMap(this::describeTopics);
|
|
|
}
|
|
@@ -280,20 +300,16 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
}));
|
|
|
}
|
|
|
|
|
|
- private static Mono<String> getClusterVersionImpl(AdminClient client) {
|
|
|
- return toMono(client.describeCluster().controller()).flatMap(controller ->
|
|
|
- toMono(client.describeConfigs(
|
|
|
- List.of(new ConfigResource(
|
|
|
- ConfigResource.Type.BROKER, String.valueOf(controller.id()))))
|
|
|
- .all()
|
|
|
- .thenApply(configs ->
|
|
|
- configs.values().stream()
|
|
|
- .map(Config::entries)
|
|
|
- .flatMap(Collection::stream)
|
|
|
- .filter(entry -> entry.name().contains("inter.broker.protocol.version"))
|
|
|
- .findFirst().map(ConfigEntry::value)
|
|
|
- .orElse("1.0-UNKNOWN")
|
|
|
- )));
|
|
|
+ private static Mono<String> getClusterVersion(AdminClient client) {
|
|
|
+ return toMono(client.describeCluster().controller())
|
|
|
+ .flatMap(controller -> loadBrokersConfig(client, List.of(controller.id())))
|
|
|
+ .map(configs -> configs.values().stream()
|
|
|
+ .flatMap(Collection::stream)
|
|
|
+ .filter(entry -> entry.name().contains("inter.broker.protocol.version"))
|
|
|
+ .findFirst()
|
|
|
+ .map(ConfigEntry::value)
|
|
|
+ .orElse("1.0-UNKNOWN")
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
|
|
@@ -306,10 +322,14 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
|
|
|
public Mono<Void> createTopic(String name,
|
|
|
int numPartitions,
|
|
|
- short replicationFactor,
|
|
|
+ @Nullable Integer replicationFactor,
|
|
|
Map<String, String> configs) {
|
|
|
- return toMono(client.createTopics(
|
|
|
- List.of(new NewTopic(name, numPartitions, replicationFactor).configs(configs))).all());
|
|
|
+ var newTopic = new NewTopic(
|
|
|
+ name,
|
|
|
+ Optional.of(numPartitions),
|
|
|
+ Optional.ofNullable(replicationFactor).map(Integer::shortValue)
|
|
|
+ ).configs(configs);
|
|
|
+ return toMono(client.createTopics(List.of(newTopic)).all());
|
|
|
}
|
|
|
|
|
|
public Mono<Void> alterPartitionReassignments(
|