diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java index 89fa06484e..88cc259b99 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java @@ -38,7 +38,10 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable { .put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout); return AdminClient.create(properties); - }).flatMap(ReactiveAdminClient::create); + }) + .flatMap(ReactiveAdminClient::create) + .onErrorMap(th -> new IllegalStateException( + "Error while creating AdminClient for Cluster " + cluster.getName(), th)); } @Override diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java index 514da582c7..cd909d4a1d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java @@ -31,7 +31,7 @@ public class MetricsService { private Mono getMetrics(KafkaCluster cluster) { return adminClientService.get(cluster).flatMap(ac -> - ac.describeCluster().flatMap(description -> + ac.describeCluster().flatMap(description -> Mono.zip( List.of( jmxClusterUtil.getBrokerMetrics(cluster, description.getNodes()), @@ -52,11 +52,11 @@ public class MetricsService { .topicConfigs((Map>) results[4]) .topicDescriptions((Map) results[5]) .build() - )) - .doOnError(e -> - log.error("Failed to collect cluster {} info", cluster.getName(), e)) - .onErrorResume( - e -> Mono.just(MetricsCache.empty().toBuilder().lastKafkaException(e).build()))); + ))) + .doOnError(e -> + log.error("Failed to collect cluster {} info", cluster.getName(), e)) + .onErrorResume( + e -> Mono.just(MetricsCache.empty().toBuilder().lastKafkaException(e).build())); } private Mono getLogDirInfo(KafkaCluster cluster, ReactiveAdminClient c) {