|
@@ -18,10 +18,8 @@ import reactor.core.publisher.Mono;
|
|
|
import reactor.util.function.Tuple2;
|
|
|
import reactor.util.function.Tuples;
|
|
|
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Properties;
|
|
|
+import javax.swing.*;
|
|
|
+import java.util.*;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.stream.Collectors;
|
|
|
import java.util.stream.Stream;
|
|
@@ -39,14 +37,14 @@ public class KafkaService {
|
|
|
@SneakyThrows
|
|
|
public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
|
|
|
return getOrCreateAdminClient(cluster).flatMap(
|
|
|
- ac -> getClusterState(ac)
|
|
|
+ ac -> getClusterMetrics(ac)
|
|
|
|
|
|
- .flatMap( clusterState ->
|
|
|
+ .flatMap( clusterMetrics ->
|
|
|
getTopicsData(ac).flatMap( topics ->
|
|
|
loadTopicsConfig(ac, topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
|
|
|
.map( configs -> mergeWithConfigs(topics, configs))
|
|
|
- .flatMap(it -> updateTopicSegmentSize(ac, clusterState, it))
|
|
|
- ).map( topics -> buildFromData(cluster, clusterState.getClusterMetrics(), topics))
|
|
|
+ .flatMap(it -> updateTopicSegmentSize(ac, clusterMetrics, it))
|
|
|
+ ).map( topics -> buildFromData(cluster, clusterMetrics, topics))
|
|
|
)
|
|
|
).onErrorResume(
|
|
|
e -> Mono.just(cluster.toBuilder()
|
|
@@ -130,20 +128,18 @@ public class KafkaService {
|
|
|
.map( m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()));
|
|
|
}
|
|
|
|
|
|
- private Mono<InternalClusterState> getClusterState(AdminClient client) {
|
|
|
+ private Mono<InternalClusterMetrics> getClusterMetrics(AdminClient client) {
|
|
|
return ClusterUtil.toMono(client.describeCluster().nodes())
|
|
|
.flatMap(brokers ->
|
|
|
ClusterUtil.toMono(client.describeCluster().controller()).map(
|
|
|
c -> {
|
|
|
InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = InternalClusterMetrics.builder();
|
|
|
- InternalClusterState.InternalClusterStateBuilder stateBuilder = InternalClusterState.builder();
|
|
|
metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
|
|
|
// TODO: fill bytes in/out metrics
|
|
|
- stateBuilder
|
|
|
- .brokersIds(brokers.stream().map(Node::id).collect(Collectors.toList()))
|
|
|
- .clusterMetrics(metricsBuilder.build());
|
|
|
+ metricsBuilder
|
|
|
+ .internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))));
|
|
|
|
|
|
- return stateBuilder.build();
|
|
|
+ return metricsBuilder.build();
|
|
|
}
|
|
|
)
|
|
|
).flatMap(c -> updateClusterSegmentSize(client, c));
|
|
@@ -251,8 +247,8 @@ public class KafkaService {
|
|
|
.next());
|
|
|
}
|
|
|
|
|
|
- private Mono<Map<String, InternalTopic>> updateTopicSegmentSize(AdminClient ac, InternalClusterState clusterState, Map<String, InternalTopic> internalTopic) {
|
|
|
- return ClusterUtil.toMono(ac.describeLogDirs(clusterState.getBrokersIds()).all())
|
|
|
+ private Mono<Map<String, InternalTopic>> updateTopicSegmentSize(AdminClient ac, InternalClusterMetrics clusterMetrics, Map<String, InternalTopic> internalTopic) {
|
|
|
+ return ClusterUtil.toMono(ac.describeLogDirs(clusterMetrics.getInternalBrokerMetrics().keySet()).all())
|
|
|
.map(l -> {
|
|
|
var topicsInfo = l.values().stream()
|
|
|
.flatMap(v -> Stream.of(v.values()))
|
|
@@ -271,8 +267,8 @@ public class KafkaService {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- private Mono<InternalClusterState> updateClusterSegmentSize (AdminClient client, InternalClusterState clusterState) {
|
|
|
- return ClusterUtil.toMono(client.describeLogDirs(clusterState.getBrokersIds()).all())
|
|
|
+ private Mono<InternalClusterMetrics> updateClusterSegmentSize (AdminClient client, InternalClusterMetrics clusterMetrics) {
|
|
|
+ return ClusterUtil.toMono(client.describeLogDirs(clusterMetrics.getInternalBrokerMetrics().keySet()).all())
|
|
|
.map(l -> {
|
|
|
var replicasInfo = l.values().stream()
|
|
|
.flatMap(v -> Stream.of(v.values()))
|
|
@@ -284,8 +280,17 @@ public class KafkaService {
|
|
|
.mapToLong(e -> e.size).sum())
|
|
|
.sum();
|
|
|
|
|
|
- var clusterMetrics = clusterState.getClusterMetrics().toBuilder().segmentSize(segmentSize).build();
|
|
|
- return clusterState.toBuilder().clusterMetrics(clusterMetrics).build();
|
|
|
+ var internalBrokerMetrics = clusterMetrics.getInternalBrokerMetrics().entrySet().stream().map(
|
|
|
+ e -> {
|
|
|
+ var brokerSegmentSize = l.get(e.getKey()).values().stream()
|
|
|
+ .mapToLong(v -> v.replicaInfos.values().stream()
|
|
|
+ .mapToLong(r -> r.size).sum()).sum();
|
|
|
+ InternalBrokerMetrics tempBrokerMetrics = InternalBrokerMetrics.builder().segmentSize(brokerSegmentSize).build();
|
|
|
+ return Collections.singletonMap(e.getKey(), tempBrokerMetrics);
|
|
|
+ })
|
|
|
+ .reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
|
|
|
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
|
|
|
+ return clusterMetrics.toBuilder().segmentSize(segmentSize).internalBrokerMetrics(internalBrokerMetrics.orElseThrow()).build();
|
|
|
});
|
|
|
- }
|
|
|
+ }
|
|
|
}
|