
Reduce-to-map moved to ClusterUtil, leaders moved into cache

Roman Nedzvetskiy 5 years ago
parent
commit
404004d9c5

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -9,7 +9,9 @@ import org.apache.kafka.common.KafkaFuture;
 import reactor.core.publisher.Mono;
 
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static com.provectus.kafka.ui.kafka.KafkaConstants.TOPIC_DEFAULT_CONFIGS;
 import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
@@ -96,4 +98,9 @@ public class ClusterUtil {
         return topic.build();
     }
 
+    public static <T, R> Map<T, R> toSingleMap(Stream<Map<T, R>> streamOfMaps) {
+        return streamOfMaps.reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
+    }
+
 }
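
For context, a minimal standalone sketch of what the new helper does (class name illustrative, not part of the commit): it merges a stream of maps into a single map, failing on an empty stream and on duplicate keys.

    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ToSingleMapSketch {
        // Same logic as the new ClusterUtil.toSingleMap: pairwise-merge the maps
        // by concatenating their entry sets. Collectors.toMap throws
        // IllegalStateException on duplicate keys; orElseThrow() fails on an
        // empty stream.
        static <T, R> Map<T, R> toSingleMap(Stream<Map<T, R>> streamOfMaps) {
            return streamOfMaps
                    .reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
                    .orElseThrow();
        }

        public static void main(String[] args) {
            // Merges two singleton maps into {a=1, b=2} (iteration order not guaranteed).
            System.out.println(toSingleMap(Stream.of(Map.of("a", 1), Map.of("b", 2))));
        }
    }

Each pairwise reduce step re-collects the accumulated map, so the merge is quadratic in the total number of entries; that is acceptable for the small per-partition maps this commit feeds it.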

+ 50 - 49
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -33,6 +33,7 @@ public class KafkaService {
 
     private final ZookeeperService zookeeperService;
     private final Map<String, AdminClient> adminClientCache = new ConcurrentHashMap<>();
+    private final Map<AdminClient, Map<TopicPartition, Integer>> leadersCache = new ConcurrentHashMap<>();
 
     @SneakyThrows
     public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
@@ -128,6 +129,19 @@ public class KafkaService {
     private Mono<List<InternalTopic>> getTopicsData(AdminClient adminClient) {
         return ClusterUtil.toMono(adminClient.listTopics(LIST_TOPICS_OPTIONS).names())
                     .flatMap(topics -> ClusterUtil.toMono(adminClient.describeTopics(topics).all()))
+                    .map(topic -> {
+                        var leadersMap = topic.values().stream()
+                            .flatMap(t -> t.partitions().stream()
+                                    .flatMap(t1 -> {
+                                        Map<TopicPartition, Integer> result = new HashMap<>();
+                                        result.put(new TopicPartition(t.name(), t1.partition()), t1.leader().id());
+                                        return Stream.of(result);
+                                    }))
+                            .reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
+                                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
+                        leadersCache.put(adminClient, leadersMap);
+                        return topic;
+                    })
                     .map( m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()));
     }
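
The hunk above builds a (TopicPartition -> leader broker id) map for each described topic and stores it in leadersCache keyed by AdminClient; note it inlines the same singleton-map reduce that toSingleMap now encapsulates. An equivalent sketch (class and method names illustrative) that collects the entries directly, without intermediate singleton maps:

    import org.apache.kafka.clients.admin.TopicDescription;
    import org.apache.kafka.common.TopicPartition;

    import java.util.Collection;
    import java.util.Map;
    import java.util.stream.Collectors;

    class LeaderMapSketch {
        // One (partition -> leader broker id) entry per partition of every topic.
        static Map<TopicPartition, Integer> leaders(Collection<TopicDescription> topics) {
            return topics.stream()
                    .flatMap(t -> t.partitions().stream()
                            .map(p -> Map.entry(new TopicPartition(t.name(), p.partition()),
                                                p.leader().id())))
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        }
    }

Like the commit's version, this assumes every partition currently has a leader; TopicPartitionInfo.leader() can be null mid-election, which would NPE in either formulation.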
 
@@ -250,56 +264,43 @@ public class KafkaService {
     }
 
     private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac, InternalClusterMetrics clusterMetrics, Map<String, InternalTopic> internalTopic) {
-        return ClusterUtil.toMono(ac.describeTopics(internalTopic.keySet()).all()).flatMap(topic -> {
-            Map<TopicPartition, Integer> leaders = topic.values().stream()
-                    .flatMap(t -> t.partitions().stream()
-                            .flatMap(t1 -> {
-                                Map<TopicPartition, Integer> result = new HashMap<>();
-                                result.put(new TopicPartition(t.name(), t1.partition()), t1.leader().id());
+        return ClusterUtil.toMono(ac.describeTopics(internalTopic.keySet()).all()).flatMap(topic ->
+            ClusterUtil.toMono(ac.describeLogDirs(clusterMetrics.getInternalBrokerMetrics().keySet()).all())
+                .map(log -> {
+                    var partitionSegmentSizeStream = leadersCache.get(ac).entrySet().stream()
+                            .flatMap(l -> {
+                                Map<TopicPartition, Long> result = new HashMap<>();
+                                result.put(l.getKey(), log.get(l.getValue()).values().stream().mapToLong(e -> e.replicaInfos.get(l.getKey()).size).sum());
                                 return Stream.of(result);
-                            }))
-                    .reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
-                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
-
-            return ClusterUtil.toMono(ac.describeLogDirs(clusterMetrics.getInternalBrokerMetrics().keySet()).all())
-                    .map(log -> {
-                        var partitionSegmentSize = leaders.entrySet().stream()
-                                .flatMap(l -> {
-                                    Map<TopicPartition, Long> result = new HashMap<>();
-                                    result.put(l.getKey(), log.get(l.getValue()).values().stream().mapToLong(e -> e.replicaInfos.get(l.getKey()).size).sum());
-                                    return Stream.of(result);
-                                }).reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
-                                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
-
-                        var resultTopicMetrics = internalTopic.keySet().stream().flatMap(k -> {
-                            Map<String, InternalTopic> result = new HashMap<>();
-                            result.put(k, internalTopic.get(k).toBuilder()
-                                    .segmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).mapToLong(Map.Entry::getValue).sum())
-                                    .partitionSegmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).build());
-                            return Stream.of(result);
-                        }).reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
-                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
-
-                        var resultBrokerMetrics = clusterMetrics.getInternalBrokerMetrics().entrySet().stream().map(
-                                e -> {
-                                    var brokerSegmentSize = log.get(e.getKey()).values().stream()
-                                            .mapToLong(v -> v.replicaInfos.values().stream()
-                                                    .mapToLong(r -> r.size).sum()).sum();
-                                    InternalBrokerMetrics tempBrokerMetrics = InternalBrokerMetrics.builder().segmentSize(brokerSegmentSize).build();
-                                    return Collections.singletonMap(e.getKey(), tempBrokerMetrics);
-                                })
-                                .reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
-                                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
-
-                        var resultClusterMetrics = clusterMetrics.toBuilder()
-                                .internalBrokerMetrics(resultBrokerMetrics)
-                                .segmentSize(partitionSegmentSize.values().stream().reduce(Long::sum).orElseThrow())
-                                .build();
-
-                        return InternalSegmentSizeDto.builder()
-                                .clusterMetricsWithSegmentSize(resultClusterMetrics)
-                                .internalTopicWithSegmentSize(resultTopicMetrics).build();
+                            });
+                    var partitionSegmentSize = ClusterUtil.toSingleMap(partitionSegmentSizeStream);
+
+                    var resultTopicMetricsStream = internalTopic.keySet().stream().flatMap(k -> {
+                        Map<String, InternalTopic> result = new HashMap<>();
+                        result.put(k, internalTopic.get(k).toBuilder()
+                                .segmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).mapToLong(Map.Entry::getValue).sum())
+                                .partitionSegmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).build());
+                        return Stream.of(result);
                     });
-        });
+
+                    var resultBrokerMetricsStream = clusterMetrics.getInternalBrokerMetrics().entrySet().stream().map(
+                            e -> {
+                                var brokerSegmentSize = log.get(e.getKey()).values().stream()
+                                        .mapToLong(v -> v.replicaInfos.values().stream()
+                                                .mapToLong(r -> r.size).sum()).sum();
+                                InternalBrokerMetrics tempBrokerMetrics = InternalBrokerMetrics.builder().segmentSize(brokerSegmentSize).build();
+                                return Collections.singletonMap(e.getKey(), tempBrokerMetrics);
+                            });
+
+                    var resultClusterMetrics = clusterMetrics.toBuilder()
+                            .internalBrokerMetrics(ClusterUtil.toSingleMap(resultBrokerMetricsStream))
+                            .segmentSize(partitionSegmentSize.values().stream().reduce(Long::sum).orElseThrow())
+                            .build();
+
+                    return InternalSegmentSizeDto.builder()
+                            .clusterMetricsWithSegmentSize(resultClusterMetrics)
+                            .internalTopicWithSegmentSize(ClusterUtil.toSingleMap(resultTopicMetricsStream)).build();
+                })
+            );
     }
 }
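
To trace the refactored flow: updateSegmentMetrics now reads leadersCache.get(ac) instead of recomputing the leader map, so it depends on getTopicsData having already populated the cache for that AdminClient. A minimal sketch of the partitionSegmentSize step above (names illustrative; assumes the DescribeLogDirsResponse.LogDirInfo shape this code compiles against, where replicaInfos and size are public fields):

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo;

    import java.util.Map;
    import java.util.stream.Collectors;

    class SegmentSizeSketch {
        // For each cached (partition -> leader) entry, sum that partition's
        // replica size across all log dirs of the leader broker.
        // log: brokerId -> (logDir path -> LogDirInfo), from describeLogDirs(...).all().
        static Map<TopicPartition, Long> partitionSegmentSizes(
                Map<TopicPartition, Integer> leaders,
                Map<Integer, Map<String, LogDirInfo>> log) {
            return leaders.entrySet().stream()
                    .collect(Collectors.toMap(
                            Map.Entry::getKey,
                            l -> log.get(l.getValue()).values().stream()
                                    .mapToLong(dir -> dir.replicaInfos.get(l.getKey()).size)
                                    .sum()));
        }
    }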