|
@@ -43,8 +43,8 @@ public class KafkaService {
|
|
|
getTopicsData(ac).flatMap( topics ->
|
|
|
loadTopicsConfig(ac, topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
|
|
|
.map( configs -> mergeWithConfigs(topics, configs))
|
|
|
- .flatMap(it -> updateTopicSegmentSize(ac, clusterMetrics, it))
|
|
|
- ).map( topics -> buildFromData(cluster, clusterMetrics, topics))
|
|
|
+ .flatMap(it -> updateSegmentMetrics(ac, clusterMetrics, it))
|
|
|
+ ).map( segmentSizeDto -> buildFromData(cluster, segmentSizeDto))
|
|
|
)
|
|
|
).onErrorResume(
|
|
|
e -> Mono.just(cluster.toBuilder()
|
|
@@ -54,7 +54,10 @@ public class KafkaService {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- private KafkaCluster buildFromData(KafkaCluster currentCluster, InternalClusterMetrics brokersMetrics, Map<String, InternalTopic> topics) {
|
|
|
+ private KafkaCluster buildFromData(KafkaCluster currentCluster, InternalSegmentSizeDto segmentSizeDto) {
|
|
|
+
|
|
|
+ var topics = segmentSizeDto.getInternalTopicWithSegmentSize();
|
|
|
+ var brokersMetrics = segmentSizeDto.getClusterMetricsWithSegmentSize();
|
|
|
|
|
|
InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = brokersMetrics.toBuilder();
|
|
|
|
|
@@ -142,7 +145,7 @@ public class KafkaService {
|
|
|
return metricsBuilder.build();
|
|
|
}
|
|
|
)
|
|
|
- ).flatMap(c -> updateClusterSegmentSize(client, c));
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
|
|
@@ -229,7 +232,6 @@ public class KafkaService {
|
|
|
|
|
|
public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) {
|
|
|
var adminClient = this.createAdminClient(cluster);
|
|
|
-
|
|
|
return ClusterUtil.toMono(adminClient.listConsumerGroups().all())
|
|
|
.flatMap(s -> ClusterUtil.toMono(adminClient
|
|
|
.describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
|
|
@@ -247,64 +249,57 @@ public class KafkaService {
|
|
|
.next());
|
|
|
}
|
|
|
|
|
|
- private Mono<Map<String, InternalTopic>> updateTopicSegmentSize(AdminClient ac, InternalClusterMetrics clusterMetrics, Map<String, InternalTopic> internalTopic) {
|
|
|
- return ClusterUtil.toMono(ac.describeTopics(internalTopic.keySet()).all()).flatMap(topic ->
|
|
|
- {
|
|
|
- Map<TopicPartition, Integer> leaders = topic.values().stream()
|
|
|
- .flatMap(t -> t.partitions().stream()
|
|
|
- .flatMap(t1 -> {
|
|
|
- Map <TopicPartition, Integer > result = new HashMap<>();
|
|
|
- result.put(new TopicPartition(t.name(), t1.partition()), t1.leader().id());
|
|
|
- return Stream.of(result);
|
|
|
- }))
|
|
|
- .reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
|
|
|
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
|
|
|
- return ClusterUtil.toMono(ac.describeLogDirs(clusterMetrics.getInternalBrokerMetrics().keySet()).all())
|
|
|
- .map(log -> {
|
|
|
- Map<TopicPartition, Long> partitionSegmentSize = leaders.entrySet().stream()
|
|
|
+ private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac, InternalClusterMetrics clusterMetrics, Map<String, InternalTopic> internalTopic) {
|
|
|
+ return ClusterUtil.toMono(ac.describeTopics(internalTopic.keySet()).all()).flatMap(topic -> {
|
|
|
+ Map<TopicPartition, Integer> leaders = topic.values().stream()
|
|
|
+ .flatMap(t -> t.partitions().stream()
|
|
|
+ .flatMap(t1 -> {
|
|
|
+ Map<TopicPartition, Integer> result = new HashMap<>();
|
|
|
+ result.put(new TopicPartition(t.name(), t1.partition()), t1.leader().id());
|
|
|
+ return Stream.of(result);
|
|
|
+ }))
|
|
|
+ .reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
|
|
|
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
|
|
|
+
|
|
|
+ return ClusterUtil.toMono(ac.describeLogDirs(clusterMetrics.getInternalBrokerMetrics().keySet()).all())
|
|
|
+ .map(log -> {
|
|
|
+ var partitionSegmentSize = leaders.entrySet().stream()
|
|
|
.flatMap(l -> {
|
|
|
Map<TopicPartition, Long> result = new HashMap<>();
|
|
|
result.put(l.getKey(), log.get(l.getValue()).values().stream().mapToLong(e -> e.replicaInfos.get(l.getKey()).size).sum());
|
|
|
return Stream.of(result);
|
|
|
}).reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
|
|
|
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
|
|
|
- return internalTopic.keySet().stream().flatMap(k -> {
|
|
|
- Map<String, InternalTopic> result = new HashMap<>();
|
|
|
- result.put(k, internalTopic.get(k).toBuilder()
|
|
|
- .segmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).mapToLong(Map.Entry::getValue).sum())
|
|
|
- .partitionSegmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).build());
|
|
|
- return Stream.of(result);
|
|
|
- }).reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
|
|
|
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
|
|
|
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
|
|
|
+
|
|
|
+ var resultTopicMetrics = internalTopic.keySet().stream().flatMap(k -> {
|
|
|
+ Map<String, InternalTopic> result = new HashMap<>();
|
|
|
+ result.put(k, internalTopic.get(k).toBuilder()
|
|
|
+ .segmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).mapToLong(Map.Entry::getValue).sum())
|
|
|
+ .partitionSegmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).build());
|
|
|
+ return Stream.of(result);
|
|
|
+ }).reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
|
|
|
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
|
|
|
|
|
|
- });
|
|
|
- });
|
|
|
+ var resultBrokerMetrics = clusterMetrics.getInternalBrokerMetrics().entrySet().stream().map(
|
|
|
+ e -> {
|
|
|
+ var brokerSegmentSize = log.get(e.getKey()).values().stream()
|
|
|
+ .mapToLong(v -> v.replicaInfos.values().stream()
|
|
|
+ .mapToLong(r -> r.size).sum()).sum();
|
|
|
+ InternalBrokerMetrics tempBrokerMetrics = InternalBrokerMetrics.builder().segmentSize(brokerSegmentSize).build();
|
|
|
+ return Collections.singletonMap(e.getKey(), tempBrokerMetrics);
|
|
|
+ })
|
|
|
+ .reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
|
|
|
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
|
|
|
+
|
|
|
+ var resultClusterMetrics = clusterMetrics.toBuilder()
|
|
|
+ .internalBrokerMetrics(resultBrokerMetrics)
|
|
|
+ .segmentSize(partitionSegmentSize.values().stream().reduce(Long::sum).orElseThrow())
|
|
|
+ .build();
|
|
|
+
|
|
|
+ return InternalSegmentSizeDto.builder()
|
|
|
+ .clusterMetricsWithSegmentSize(resultClusterMetrics)
|
|
|
+ .internalTopicWithSegmentSize(resultTopicMetrics).build();
|
|
|
+ });
|
|
|
+ });
|
|
|
}
|
|
|
-
|
|
|
- private Mono<InternalClusterMetrics> updateClusterSegmentSize (AdminClient client, InternalClusterMetrics clusterMetrics) {
|
|
|
- return ClusterUtil.toMono(client.describeLogDirs(clusterMetrics.getInternalBrokerMetrics().keySet()).all())
|
|
|
- .map(l -> {
|
|
|
- var replicasInfo = l.values().stream()
|
|
|
- .flatMap(v -> Stream.of(v.values()))
|
|
|
- .flatMap(v -> Stream.of(v.stream().map(s -> s.replicaInfos)))
|
|
|
- .findFirst().orElseThrow().collect(Collectors.toList());
|
|
|
-
|
|
|
- var segmentSize = replicasInfo.stream()
|
|
|
- .mapToLong(t -> t.values().stream()
|
|
|
- .mapToLong(e -> e.size).sum())
|
|
|
- .sum();
|
|
|
-
|
|
|
- var internalBrokerMetrics = clusterMetrics.getInternalBrokerMetrics().entrySet().stream().map(
|
|
|
- e -> {
|
|
|
- var brokerSegmentSize = l.get(e.getKey()).values().stream()
|
|
|
- .mapToLong(v -> v.replicaInfos.values().stream()
|
|
|
- .mapToLong(r -> r.size).sum()).sum();
|
|
|
- InternalBrokerMetrics tempBrokerMetrics = InternalBrokerMetrics.builder().segmentSize(brokerSegmentSize).build();
|
|
|
- return Collections.singletonMap(e.getKey(), tempBrokerMetrics);
|
|
|
- })
|
|
|
- .reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
|
|
|
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
|
|
|
- return clusterMetrics.toBuilder().segmentSize(segmentSize).internalBrokerMetrics(internalBrokerMetrics.orElseThrow()).build();
|
|
|
- });
|
|
|
- }
|
|
|
}
|