Jelajahi Sumber

Changed topic segment counting logic by finding the partition leader, added partition collecting in topic

Roman Nedzvetskiy 5 tahun lalu
induk
melakukan
716d4d8836

+ 3 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java

@@ -2,8 +2,10 @@ package com.provectus.kafka.ui.cluster.model;
 
 import lombok.Builder;
 import lombok.Data;
+import org.apache.kafka.common.TopicPartition;
 
 import java.util.List;
+import java.util.Map;
 
 @Data
 @Builder(toBuilder = true)
@@ -22,4 +24,5 @@ public class InternalTopic {
     //TODO: find way to fill
     private final long segmentSize;
     private final int segmentCount;
+    private final Map<TopicPartition, Long> partitionSegmentSize;
 }

+ 32 - 18
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -12,13 +12,13 @@ import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.admin.*;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
 
-import javax.swing.*;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -248,23 +248,37 @@ public class KafkaService {
     }
 
     private Mono<Map<String, InternalTopic>> updateTopicSegmentSize(AdminClient ac, InternalClusterMetrics clusterMetrics, Map<String, InternalTopic> internalTopic) {
-                return ClusterUtil.toMono(ac.describeLogDirs(clusterMetrics.getInternalBrokerMetrics().keySet()).all())
-                        .map(l -> {
-                            var topicsInfo = l.values().stream()
-                                .flatMap(v -> Stream.of(v.values()))
-                                .flatMap(v -> Stream.of(v.stream().map(s -> s.replicaInfos)))
-                                .findFirst().orElseThrow().collect(Collectors.toList());
-                            var internalSegments = topicsInfo.stream()
-                                    .flatMap(t -> t.entrySet().stream()
-                                        .flatMap(e -> Stream.of(new InternalSegmentSize(e.getKey().topic(), e.getValue().size))))
-                                    .collect(Collectors.toList());
-                            return internalTopic.values().stream().flatMap(k ->
-                                    Stream.of(k.toBuilder().segmentSize(internalSegments.stream()
-                                            .filter(key -> key.getReplicaName().equals(k.getName()))
-                                            .mapToLong(InternalSegmentSize::getSegmentSize)
-                                            .sum()).build()))
-                                    .collect(Collectors.toMap(InternalTopic::getName, v -> v));
-                        });
+            return ClusterUtil.toMono(ac.describeTopics(internalTopic.keySet()).all()).flatMap(topic ->
+                {
+                    Map<TopicPartition, Integer> leaders = topic.values().stream()
+                            .flatMap(t -> t.partitions().stream()
+                                .flatMap(t1 -> {
+                                    Map <TopicPartition, Integer > result = new HashMap<>();
+                                    result.put(new TopicPartition(t.name(), t1.partition()), t1.leader().id());
+                                    return Stream.of(result);
+                                }))
+                            .reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
+                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
+                    return ClusterUtil.toMono(ac.describeLogDirs(clusterMetrics.getInternalBrokerMetrics().keySet()).all())
+                        .map(log -> {
+                            Map<TopicPartition, Long> partitionSegmentSize = leaders.entrySet().stream()
+                                .flatMap(l -> {
+                                    Map<TopicPartition, Long> result = new HashMap<>();
+                                    result.put(l.getKey(), log.get(l.getValue()).values().stream().mapToLong(e -> e.replicaInfos.get(l.getKey()).size).sum());
+                                    return Stream.of(result);
+                                }).reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
+                                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
+                            return internalTopic.keySet().stream().flatMap(k -> {
+                                Map<String, InternalTopic> result = new HashMap<>();
+                                result.put(k, internalTopic.get(k).toBuilder()
+                                        .segmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).mapToLong(Map.Entry::getValue).sum())
+                                        .partitionSegmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).build());
+                                return Stream.of(result);
+                            }).reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
+                                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
+
+                            });
+                });
     }
 
     private Mono<InternalClusterMetrics> updateClusterSegmentSize (AdminClient client, InternalClusterMetrics clusterMetrics) {