|
@@ -129,11 +129,21 @@ public class KafkaService {
|
|
|
getClusterMetrics(ac.getAdminClient())
|
|
|
.flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient()))
|
|
|
.flatMap(clusterMetrics ->
|
|
|
- getTopicsData(ac.getAdminClient()).flatMap(it ->
|
|
|
- updateSegmentMetrics(ac.getAdminClient(), clusterMetrics, it)
|
|
|
+ getTopicsData(ac.getAdminClient()).flatMap(it -> {
|
|
|
+ if (cluster.getDisableLogDirsCollection() == null
|
|
|
+ || !cluster.getDisableLogDirsCollection()) {
|
|
|
+ return updateSegmentMetrics(
|
|
|
+ ac.getAdminClient(), clusterMetrics, it
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ return emptySegmentMetrics(clusterMetrics, it);
|
|
|
+ }
|
|
|
+ }
|
|
|
).map(segmentSizeDto -> buildFromData(cluster, version, segmentSizeDto))
|
|
|
)
|
|
|
)
|
|
|
+ ).doOnError(e ->
|
|
|
+ log.error("Failed to collect cluster {} info", cluster.getName(), e)
|
|
|
).onErrorResume(
|
|
|
e -> Mono.just(cluster.toBuilder()
|
|
|
.status(ServerStatus.OFFLINE)
|
|
@@ -484,6 +494,28 @@ public class KafkaService {
|
|
|
.build();
|
|
|
}
|
|
|
|
|
|
+ private Mono<InternalSegmentSizeDto> emptySegmentMetrics(InternalClusterMetrics clusterMetrics,
|
|
|
+ List<InternalTopic> internalTopics) {
|
|
|
+ return Mono.just(
|
|
|
+ InternalSegmentSizeDto.builder()
|
|
|
+ .clusterMetricsWithSegmentSize(
|
|
|
+ clusterMetrics.toBuilder()
|
|
|
+ .segmentSize(0)
|
|
|
+ .segmentCount(0)
|
|
|
+ .internalBrokerDiskUsage(Collections.emptyMap())
|
|
|
+ .build()
|
|
|
+ )
|
|
|
+ .internalTopicWithSegmentSize(
|
|
|
+ internalTopics.stream().collect(
|
|
|
+ Collectors.toMap(
|
|
|
+ InternalTopic::getName,
|
|
|
+ i -> i
|
|
|
+ )
|
|
|
+ )
|
|
|
+ ).build()
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac,
|
|
|
InternalClusterMetrics clusterMetrics,
|
|
|
List<InternalTopic> internalTopics) {
|
|
@@ -491,9 +523,11 @@ public class KafkaService {
|
|
|
internalTopics.stream().map(InternalTopic::getName).collect(Collectors.toList());
|
|
|
return ClusterUtil.toMono(ac.describeTopics(names).all()).flatMap(topic ->
|
|
|
ClusterUtil.toMono(ac.describeCluster().nodes()).flatMap(nodes ->
|
|
|
+
|
|
|
ClusterUtil.toMono(
|
|
|
- ac.describeLogDirs(nodes.stream().map(Node::id).collect(Collectors.toList())).all())
|
|
|
- .map(log -> {
|
|
|
+ ac.describeLogDirs(
|
|
|
+ nodes.stream().map(Node::id).collect(Collectors.toList())).all()
|
|
|
+ ).map(log -> {
|
|
|
final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
|
|
|
log.entrySet().stream().flatMap(b ->
|
|
|
b.getValue().entrySet().stream().flatMap(topicMap ->
|