|
@@ -25,9 +25,7 @@ import java.util.Map;
|
|
|
import java.util.Optional;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.CompletionException;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
-import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.function.BiFunction;
|
|
|
import java.util.function.Function;
|
|
|
import java.util.function.Predicate;
|
|
@@ -260,37 +258,32 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
* such topics in resulting map.
|
|
|
* <p/>
|
|
|
* This method converts input map into Mono[Map] ignoring keys for which KafkaFutures
|
|
|
- * finished with <code>clazz</code> exception.
|
|
|
+ * finished with <code>clazz</code> exception and empty Monos.
|
|
|
*/
|
|
|
static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values,
|
|
|
- Class<? extends KafkaException> clazz) {
|
|
|
+ Class<? extends KafkaException> clazz) {
|
|
|
if (values.isEmpty()) {
|
|
|
return Mono.just(Map.of());
|
|
|
}
|
|
|
|
|
|
- List<Mono<Tuple2<K, V>>> monos = values.entrySet().stream()
|
|
|
- .map(e -> toMono(e.getValue()).map(r -> Tuples.of(e.getKey(), r)))
|
|
|
- .collect(toList());
|
|
|
-
|
|
|
- return Mono.create(sink -> {
|
|
|
- var finishedCnt = new AtomicInteger();
|
|
|
- var results = new ConcurrentHashMap<K, V>();
|
|
|
- monos.forEach(mono -> mono.subscribe(
|
|
|
- r -> {
|
|
|
- results.put(r.getT1(), r.getT2());
|
|
|
- if (finishedCnt.incrementAndGet() == monos.size()) {
|
|
|
- sink.success(results);
|
|
|
- }
|
|
|
- },
|
|
|
- th -> {
|
|
|
- if (!th.getClass().isAssignableFrom(clazz)) {
|
|
|
- sink.error(th);
|
|
|
- } else if (finishedCnt.incrementAndGet() == monos.size()) {
|
|
|
- sink.success(results);
|
|
|
- }
|
|
|
- }
|
|
|
- ));
|
|
|
- });
|
|
|
+ List<Mono<Tuple2<K, Optional<V>>>> monos = values.entrySet().stream()
|
|
|
+ .map(e ->
|
|
|
+ toMono(e.getValue())
|
|
|
+ .map(r -> Tuples.of(e.getKey(), Optional.of(r)))
|
|
|
+ .defaultIfEmpty(Tuples.of(e.getKey(), Optional.empty())) //tracking empty Monos
|
|
|
+ .onErrorResume(
|
|
|
+ // tracking Monos with suppressible error
|
|
|
+ th -> th.getClass().isAssignableFrom(clazz),
|
|
|
+ th -> Mono.just(Tuples.of(e.getKey(), Optional.empty()))))
|
|
|
+ .toList();
|
|
|
+
|
|
|
+ return Mono.zip(
|
|
|
+ monos,
|
|
|
+ resultsArr -> Stream.of(resultsArr)
|
|
|
+ .map(obj -> (Tuple2<K, Optional<V>>) obj)
|
|
|
+ .filter(t -> t.getT2().isPresent()) //skipping empty & suppressible-errors
|
|
|
+ .collect(Collectors.toMap(Tuple2::getT1, t -> t.getT2().get()))
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
|
|
@@ -305,6 +298,10 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
}
|
|
|
|
|
|
public Mono<ClusterDescription> describeCluster() {
|
|
|
+ return describeClusterImpl(client);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Mono<ClusterDescription> describeClusterImpl(AdminClient client) {
|
|
|
var r = client.describeCluster();
|
|
|
var all = KafkaFuture.allOf(r.nodes(), r.clusterId(), r.controller(), r.authorizedOperations());
|
|
|
return Mono.create(sink -> all.whenComplete((res, ex) -> {
|
|
@@ -328,15 +325,20 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
}
|
|
|
|
|
|
private static Mono<String> getClusterVersion(AdminClient client) {
|
|
|
- return toMono(client.describeCluster().controller())
|
|
|
- .flatMap(controller -> loadBrokersConfig(client, List.of(controller.id())))
|
|
|
- .map(configs -> configs.values().stream()
|
|
|
+ return describeClusterImpl(client)
|
|
|
+ // choosing node from which we will get configs (starting with controller)
|
|
|
+ .flatMap(descr -> descr.controller != null
|
|
|
+ ? Mono.just(descr.controller)
|
|
|
+ : Mono.justOrEmpty(descr.nodes.stream().findFirst())
|
|
|
+ )
|
|
|
+ .flatMap(node -> loadBrokersConfig(client, List.of(node.id())))
|
|
|
+ .flatMap(configs -> configs.values().stream()
|
|
|
.flatMap(Collection::stream)
|
|
|
.filter(entry -> entry.name().contains("inter.broker.protocol.version"))
|
|
|
.findFirst()
|
|
|
- .map(ConfigEntry::value)
|
|
|
- .orElse("1.0-UNKNOWN")
|
|
|
- );
|
|
|
+ .map(configEntry -> Mono.just(configEntry.value()))
|
|
|
+ .orElse(Mono.empty()))
|
|
|
+ .switchIfEmpty(Mono.just("1.0-UNKNOWN"));
|
|
|
}
|
|
|
|
|
|
public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
|
|
@@ -419,6 +421,7 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
|
|
|
/**
|
|
|
* List offset for the topic's partitions and OffsetSpec.
|
|
|
+ *
|
|
|
* @param failOnUnknownLeader true - throw exception in case of no-leader partitions,
|
|
|
* false - skip partitions with no leader
|
|
|
*/
|
|
@@ -432,6 +435,7 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
|
|
|
/**
|
|
|
* List offset for the specified partitions and OffsetSpec.
|
|
|
+ *
|
|
|
* @param failOnUnknownLeader true - throw exception in case of no-leader partitions,
|
|
|
* false - skip partitions with no leader
|
|
|
*/
|