|
@@ -158,7 +158,7 @@ public class KafkaService {
|
|
|
.map( m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()));
|
|
|
}
|
|
|
|
|
|
- private Mono<InternalClusterMetrics> getClusterMetrics(KafkaCluster cluster, AdminClient client) {
|
|
|
+ private Mono<InternalClusterMetrics> getClusterMetrics(AdminClient client) {
|
|
|
return ClusterUtil.toMono(client.describeCluster().nodes())
|
|
|
.flatMap(brokers ->
|
|
|
ClusterUtil.toMono(client.describeCluster().controller()).map(
|
|
@@ -347,23 +347,23 @@ public class KafkaService {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- public Mono<List<JmxMetric>> getJmxMetric(String clusterName, Integer nodeId) {
|
|
|
+ public List<JmxMetric> getJmxMetric(String clusterName, Node node) {
|
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
|
- .map(c -> getOrCreateAdminClient(c)
|
|
|
- .flatMap(a -> ClusterUtil.toMono(a.getAdminClient().describeCluster().nodes())
|
|
|
- .map(n -> n.stream().filter(s -> s.id() == nodeId).findFirst().orElseThrow().host()))
|
|
|
- .map(host -> jmxClusterUtil.getJmxMetrics(c.getJmxPort(), host))).orElseThrow();
|
|
|
+ .map(c -> jmxClusterUtil.getJmxMetrics(c.getJmxPort(), node.host())).orElseThrow();
|
|
|
}
|
|
|
|
|
|
private Mono<InternalClusterMetrics> fillJmxMetrics (InternalClusterMetrics internalClusterMetrics, String clusterName) {
|
|
|
return Flux.fromIterable(internalClusterMetrics.getInternalBrokerMetrics().keySet())
|
|
|
- .flatMap(id -> getJmxMetric(clusterName, id)
|
|
|
+ .flatMap(id -> getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow())
|
|
|
+ .flatMap(ac -> ClusterUtil.toMono(ac.getAdminClient().describeCluster().nodes()))
|
|
|
+ .map(node -> getJmxMetric(clusterName, node.stream().filter(n -> n.id() == id).findFirst().orElseThrow()))
|
|
|
.map(jmx -> {
|
|
|
var jmxMetric = internalClusterMetrics.getInternalBrokerMetrics().get(id).toBuilder().jmxMetrics(jmx).build();
|
|
|
var tempBrokerMetrics = internalClusterMetrics.getInternalBrokerMetrics();
|
|
|
tempBrokerMetrics.put(id, jmxMetric);
|
|
|
return internalClusterMetrics.toBuilder().internalBrokerMetrics(tempBrokerMetrics).build();
|
|
|
- })).collectList()
|
|
|
+ })
|
|
|
+ ).collectList()
|
|
|
.map(s -> s.stream().reduce((s1, s2) -> {
|
|
|
s1.getInternalBrokerMetrics().putAll(s2.getInternalBrokerMetrics().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
|
|
|
return s1;
|