|
@@ -352,36 +352,22 @@ public class KafkaService {
|
|
|
}
|
|
|
|
|
|
private Mono<InternalClusterMetrics> fillJmxMetrics (InternalClusterMetrics internalClusterMetrics, String clusterName) {
|
|
|
+ return fillBrokerMetrics(internalClusterMetrics, clusterName);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<InternalClusterMetrics> fillBrokerMetrics(InternalClusterMetrics internalClusterMetrics, String clusterName) {
|
|
|
return getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow())
|
|
|
.flatMap(ac -> ClusterUtil.toMono(ac.getAdminClient().describeCluster().nodes()))
|
|
|
.flatMapIterable(nodes -> nodes)
|
|
|
- .flatMap(broker -> getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow())
|
|
|
- .flatMap(ac -> ClusterUtil.toMono(ac.getAdminClient().describeCluster().nodes()))
|
|
|
- .map(node -> getJmxMetric(clusterName, node.stream().filter(n -> n.id() == broker.id()).findFirst().orElseThrow()))
|
|
|
- .map(jmx -> {
|
|
|
- var jmxMetric = internalClusterMetrics.getInternalBrokerMetrics().get(broker.id()).toBuilder().jmxMetrics(jmx).build();
|
|
|
- var tempBrokerMetrics = internalClusterMetrics.getInternalBrokerMetrics();
|
|
|
- tempBrokerMetrics.put(broker.id(), jmxMetric);
|
|
|
- return internalClusterMetrics.toBuilder().internalBrokerMetrics(tempBrokerMetrics).build();
|
|
|
- })).collectList()
|
|
|
- .map(s -> s.stream().reduce((s1, s2) -> {
|
|
|
- s1.getInternalBrokerMetrics().putAll(s2.getInternalBrokerMetrics().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
|
|
|
- return s1;
|
|
|
- }).orElseThrow())
|
|
|
- .map(i -> {
|
|
|
- var tempMetrics = new HashMap<>(i.getInternalBrokerMetrics());
|
|
|
- var resultMetrics = tempMetrics.values().stream().flatMap(s -> Stream.of(s.getJmxMetrics())).reduce((s1, s2) -> {
|
|
|
- s1.forEach(j1 -> {
|
|
|
- s2.forEach(j2 -> {
|
|
|
- if (j1.getCanonicalName().equals(j2.getCanonicalName())) {
|
|
|
- j1.getValue().keySet().forEach(k -> j2.getValue().compute(k, (k1, v1) ->
|
|
|
- JmxClusterUtil.metricValueReduce(j1, j2.getValue().get(k1))));
|
|
|
- }
|
|
|
- });
|
|
|
- });
|
|
|
- return s1;
|
|
|
- }).orElseThrow();
|
|
|
- return i.toBuilder().jmxMetrics(resultMetrics).build();
|
|
|
+ .map(broker -> {
|
|
|
+ var jmx = getJmxMetric(clusterName, broker);
|
|
|
+ Map<Integer, InternalBrokerMetrics> result = new HashMap<>();
|
|
|
+ result.put(broker.id(), internalClusterMetrics.getInternalBrokerMetrics().get(broker.id()).toBuilder().jmxMetrics(jmx).build());
|
|
|
+ return result;
|
|
|
+ })
|
|
|
+ .collectList()
|
|
|
+ .map(s -> { var brokerMetrics = ClusterUtil.toSingleMap(s.stream());
|
|
|
+ return internalClusterMetrics.toBuilder().internalBrokerMetrics(brokerMetrics).build();
|
|
|
});
|
|
|
}
|
|
|
}
|