|
@@ -1,5 +1,7 @@
|
|
|
package com.provectus.kafka.ui.service;
|
|
|
|
|
|
+import static com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;
|
|
|
+
|
|
|
import com.provectus.kafka.ui.model.Feature;
|
|
|
import com.provectus.kafka.ui.model.InternalLogDirStats;
|
|
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
|
@@ -9,10 +11,12 @@ import com.provectus.kafka.ui.model.Statistics;
|
|
|
import com.provectus.kafka.ui.service.metrics.MetricsCollector;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.stream.Collectors;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.kafka.clients.admin.ConfigEntry;
|
|
|
import org.apache.kafka.clients.admin.TopicDescription;
|
|
|
+import org.apache.kafka.common.Node;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
@@ -21,7 +25,7 @@ import reactor.core.publisher.Mono;
|
|
|
@Slf4j
|
|
|
public class StatisticsService {
|
|
|
|
|
|
- private final MetricsCollector metricsClusterUtil;
|
|
|
+ private final MetricsCollector metricsCollector;
|
|
|
private final AdminClientService adminClientService;
|
|
|
private final FeatureService featureService;
|
|
|
private final StatisticsCache cache;
|
|
@@ -35,8 +39,8 @@ public class StatisticsService {
|
|
|
ac.describeCluster().flatMap(description ->
|
|
|
Mono.zip(
|
|
|
List.of(
|
|
|
- metricsClusterUtil.getBrokerMetrics(cluster, description.getNodes()),
|
|
|
- getLogDirInfo(cluster, ac),
|
|
|
+ metricsCollector.getBrokerMetrics(cluster, description.getNodes()),
|
|
|
+ getLogDirInfo(description, ac),
|
|
|
featureService.getAvailableFeatures(cluster, description.getController()),
|
|
|
loadTopicConfigs(cluster),
|
|
|
describeTopics(cluster)),
|
|
@@ -58,11 +62,9 @@ public class StatisticsService {
|
|
|
e -> Mono.just(Statistics.empty().toBuilder().lastKafkaException(e).build()));
|
|
|
}
|
|
|
|
|
|
- private Mono<InternalLogDirStats> getLogDirInfo(KafkaCluster cluster, ReactiveAdminClient c) {
|
|
|
- if (!cluster.isDisableLogDirsCollection()) {
|
|
|
- return c.describeLogDirs().map(InternalLogDirStats::new);
|
|
|
- }
|
|
|
- return Mono.just(InternalLogDirStats.empty());
|
|
|
+ private Mono<InternalLogDirStats> getLogDirInfo(ClusterDescription desc, ReactiveAdminClient ac) {
|
|
|
+ var brokerIds = desc.getNodes().stream().map(Node::id).collect(Collectors.toSet());
|
|
|
+ return ac.describeLogDirs(brokerIds).map(InternalLogDirStats::new);
|
|
|
}
|
|
|
|
|
|
private Mono<Map<String, TopicDescription>> describeTopics(KafkaCluster c) {
|