Fixing AdminClient requests getting stuck
* jmx metrics collection parallelized on elastic scheduler (#1836)
* explicitly setting publishing Scheduler for KafkaFuture-based Monos

Co-authored-by: iliax <ikuramshin@provectus.com>
parent b4e52afbdb
commit 76c5fae4dd

2 changed files with 14 additions and 2 deletions
ReactiveAdminClient.java

@@ -52,6 +52,7 @@ import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;

@@ -98,7 +99,13 @@ public class ReactiveAdminClient implements Closeable {
       } else {
         sink.success(res);
       }
-    })).doOnCancel(() -> future.cancel(true));
+    })).doOnCancel(() -> future.cancel(true))
+        // AdminClient uses a single thread for Kafka communication,
+        // and by default all downstream operations (like map(..)) on the created Mono run on this thread.
+        // If some downstream operation is blocking (by mistake), it can leave
+        // other AdminClient requests stuck, which can cause timeout exceptions.
+        // So we explicitly set the Scheduler for downstream processing.
+        .publishOn(Schedulers.parallel());
   }

   //---------------------------------------------------------------------------------
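The change above lands at the tail of the KafkaFuture-to-Mono bridge. A minimal sketch of that pattern, assuming a hypothetical toMono helper (class and method names are illustrative, not the project's exact code):

import org.apache.kafka.common.KafkaFuture;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class KafkaFutureAdapter {

  // Illustrative helper mirroring the hunk above: bridge a KafkaFuture into a Mono
  // and move downstream work off the AdminClient's single request thread.
  static <T> Mono<T> toMono(KafkaFuture<T> future) {
    return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
          if (ex != null) {
            sink.error(ex);
          } else {
            sink.success(res);
          }
        }))
        .doOnCancel(() -> future.cancel(true))
        // without publishOn, a blocking map(..) downstream would run on the
        // AdminClient thread and stall every other in-flight request
        .publishOn(Schedulers.parallel());
  }
}

With publishOn in place, any accidental blocking in downstream operators lands on Reactor's parallel pool instead of the AdminClient's I/O thread, which is exactly the stuck-request scenario the commit comments describe.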
@@ -171,7 +178,7 @@ public class ReactiveAdminClient implements Closeable {
   /**
    * Kafka API often returns Map responses with KafkaFuture values. If we do allOf()
    * logic resulting Mono will be failing if any of Futures finished with error.
-   * In some situations it is not what we what, ex. we call describeTopics(List names) method and
+   * In some situations it is not what we want, ex. we call describeTopics(List names) method and
    * we getting UnknownTopicOrPartitionException for unknown topics and we what to just not put
    * such topics in resulting map.
    * <p/>
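The javadoc describes per-future error handling instead of an all-or-nothing allOf(). A sketch of that idea, assuming hypothetical helper names (collectSkippingUnknownTopics and toMono are illustrative, not the actual method in the class):

import java.util.Map;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class PerFutureCollector {

  // Keep only entries whose KafkaFuture completed successfully; entries failing
  // with UnknownTopicOrPartitionException are dropped instead of failing the whole Mono.
  static <K, V> Mono<Map<K, V>> collectSkippingUnknownTopics(Map<K, KafkaFuture<V>> futures) {
    return Flux.fromIterable(futures.entrySet())
        .flatMap(e -> toMono(e.getValue())
            .map(v -> Map.entry(e.getKey(), v))
            .onErrorResume(UnknownTopicOrPartitionException.class, th -> Mono.empty()))
        .collectMap(Map.Entry::getKey, Map.Entry::getValue);
  }

  static <T> Mono<T> toMono(KafkaFuture<T> future) {
    return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
      if (ex != null) {
        sink.error(ex);
      } else {
        sink.success(res);
      }
    }));
  }
}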
JmxClusterUtil.java

@@ -35,6 +35,7 @@ import org.jetbrains.annotations.Nullable;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;

@@ -69,8 +70,12 @@ public class JmxClusterUtil {

   public Mono<JmxMetrics> getBrokerMetrics(KafkaCluster cluster, Collection<Node> nodes) {
     return Flux.fromIterable(nodes)
+        // JMX is a blocking API, so we parallelize its calls on the boundedElastic scheduler
+        .parallel()
+        .runOn(Schedulers.boundedElastic())
         .map(n -> Map.entry(n.id(),
             JmxBrokerMetrics.builder().metrics(getJmxMetric(cluster, n)).build()))
+        .sequential()
         .collectMap(Map.Entry::getKey, Map.Entry::getValue)
         .map(this::collectMetrics);
   }
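The same pattern in isolation: fan blocking per-item calls out onto boundedElastic rails, then merge the results back into a Map. A generic sketch assuming a caller-supplied blocking fetch function (all names here are illustrative):

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class ParallelBlockingCollect {

  // Run a blocking call per item on the boundedElastic scheduler and
  // gather the results into a Map keyed by the item.
  static <K, V> Mono<Map<K, V>> collect(List<K> items, Function<K, V> blockingFetch) {
    return Flux.fromIterable(items)
        .parallel()                              // split into parallel rails
        .runOn(Schedulers.boundedElastic())      // each rail runs on a thread that may block
        .map(item -> Map.entry(item, blockingFetch.apply(item)))
        .sequential()                            // merge rails back into a single Flux
        .collectMap(Map.Entry::getKey, Map.Entry::getValue);
  }
}

The sequential() step is needed because collectMap is defined on Flux, not on ParallelFlux, so the rails have to be merged back before the results can be collected.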