diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index d01b8efb07..dedd609743 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -52,6 +52,7 @@ import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.requests.DescribeLogDirsResponse; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; @@ -98,7 +99,13 @@ public class ReactiveAdminClient implements Closeable { } else { sink.success(res); } - })).doOnCancel(() -> future.cancel(true)); + })).doOnCancel(() -> future.cancel(true)) + // AdminClient is using single thread for kafka communication + // and by default all downstream operations (like map(..)) on created Mono will be executed on this thread. + // If some of downstream operation are blocking (by mistake) this can lead to + // other AdminClient's requests stucking, which can cause timeout exceptions. + // So, we explicitly setting Scheduler for downstream processing. + .publishOn(Schedulers.parallel()); } //--------------------------------------------------------------------------------- @@ -171,7 +178,7 @@ public class ReactiveAdminClient implements Closeable { /** * Kafka API often returns Map responses with KafkaFuture values. If we do allOf() * logic resulting Mono will be failing if any of Futures finished with error. - * In some situations it is not what we what, ex. we call describeTopics(List names) method and + * In some situations it is not what we want, ex. we call describeTopics(List names) method and * we getting UnknownTopicOrPartitionException for unknown topics and we what to just not put * such topics in resulting map. *

diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java index 1edd85c245..163ebb5e9e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java @@ -35,6 +35,7 @@ import org.jetbrains.annotations.Nullable; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; @@ -69,8 +70,12 @@ public class JmxClusterUtil { public Mono getBrokerMetrics(KafkaCluster cluster, Collection nodes) { return Flux.fromIterable(nodes) + // jmx is a blocking api, so we trying to parallelize its execution on boundedElastic scheduler + .parallel() + .runOn(Schedulers.boundedElastic()) .map(n -> Map.entry(n.id(), JmxBrokerMetrics.builder().metrics(getJmxMetric(cluster, n)).build())) + .sequential() .collectMap(Map.Entry::getKey, Map.Entry::getValue) .map(this::collectMetrics); }