|
@@ -31,7 +31,7 @@ public class MetricsService {
|
|
|
|
|
|
private Mono<MetricsCache.Metrics> getMetrics(KafkaCluster cluster) {
|
|
|
return adminClientService.get(cluster).flatMap(ac ->
|
|
|
- ac.describeCluster().flatMap(description ->
|
|
|
+ ac.describeCluster().flatMap(description ->
|
|
|
Mono.zip(
|
|
|
List.of(
|
|
|
jmxClusterUtil.getBrokerMetrics(cluster, description.getNodes()),
|
|
@@ -52,11 +52,11 @@ public class MetricsService {
|
|
|
.topicConfigs((Map<String, List<ConfigEntry>>) results[4])
|
|
|
.topicDescriptions((Map<String, TopicDescription>) results[5])
|
|
|
.build()
|
|
|
- ))
|
|
|
- .doOnError(e ->
|
|
|
- log.error("Failed to collect cluster {} info", cluster.getName(), e))
|
|
|
- .onErrorResume(
|
|
|
- e -> Mono.just(MetricsCache.empty().toBuilder().lastKafkaException(e).build())));
|
|
|
+ )))
|
|
|
+ .doOnError(e ->
|
|
|
+ log.error("Failed to collect cluster {} info", cluster.getName(), e))
|
|
|
+ .onErrorResume(
|
|
|
+ e -> Mono.just(MetricsCache.empty().toBuilder().lastKafkaException(e).build()));
|
|
|
}
|
|
|
|
|
|
private Mono<InternalLogDirStats> getLogDirInfo(KafkaCluster cluster, ReactiveAdminClient c) {
|