|
@@ -248,7 +248,7 @@ public class KafkaService {
|
|
|
.flatMap(s -> ClusterUtil.toMono(ac.getAdminClient()
|
|
|
.describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
|
|
|
.map(s -> s.values().stream()
|
|
|
- .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList())));
|
|
|
+ .map(ClusterUtil::convertToConsumerGroup).collect(Collectors.toList())));
|
|
|
}
|
|
|
|
|
|
public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
|
|
@@ -347,26 +347,6 @@ public class KafkaService {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- public Mono<List<JmxMetric>> getClusterJmxMetric(String clusterName) {
|
|
|
- return clustersStorage.getClusterByName(clusterName)
|
|
|
- .map(c -> getOrCreateAdminClient(c)
|
|
|
- .flatMap(eac -> ClusterUtil.toMono(eac.getAdminClient().describeCluster().nodes()))
|
|
|
- .flatMapIterable(n -> n.stream().flatMap(node -> Stream.of(node.host())).collect(Collectors.toList()))
|
|
|
- .map(host -> jmxClusterUtil.getJmxMetrics(c.getJmxPort(), host))
|
|
|
- .collectList()
|
|
|
- .map(s -> s.stream().reduce((s1, s2) -> {
|
|
|
- s1.forEach(j1 -> {
|
|
|
- s2.forEach(j2 -> {
|
|
|
- if (j1.getCanonicalName().equals(j2.getCanonicalName())) {
|
|
|
- j1.getValue().keySet().forEach(k -> j2.getValue().compute(k, (k1, v1) ->
|
|
|
- JmxClusterUtil.metricValueReduce(j1, j2.getValue().get(k1))));
|
|
|
- }
|
|
|
- });
|
|
|
- });
|
|
|
- return s1;
|
|
|
- }).orElseThrow())).orElseThrow();
|
|
|
- }
|
|
|
-
|
|
|
public Mono<List<JmxMetric>> getJmxMetric(String clusterName, Integer nodeId) {
|
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
|
.map(c -> getOrCreateAdminClient(c)
|
|
@@ -390,7 +370,7 @@ public class KafkaService {
|
|
|
}).orElseThrow())
|
|
|
.map(i -> {
|
|
|
var tempMetrics = new HashMap<>(i.getInternalBrokerMetrics());
|
|
|
- tempMetrics.values().stream().flatMap(s -> Stream.of(s.getJmxMetrics())).reduce((s1, s2) -> {
|
|
|
+ var resultMetrics = tempMetrics.values().stream().flatMap(s -> Stream.of(s.getJmxMetrics())).reduce((s1, s2) -> {
|
|
|
s1.forEach(j1 -> {
|
|
|
s2.forEach(j2 -> {
|
|
|
if (j1.getCanonicalName().equals(j2.getCanonicalName())) {
|
|
@@ -400,8 +380,8 @@ public class KafkaService {
|
|
|
});
|
|
|
});
|
|
|
return s1;
|
|
|
- });
|
|
|
- return i.toBuilder().jmxMetrics(tempMetrics.values().stream().findFirst().orElseThrow().getJmxMetrics()).build();
|
|
|
+ }).orElseThrow();
|
|
|
+ return i.toBuilder().jmxMetrics(resultMetrics).build();
|
|
|
});
|
|
|
}
|
|
|
}
|