|
@@ -5,6 +5,7 @@ import com.provectus.kafka.ui.cluster.model.ClustersStorage;
|
|
import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
|
|
import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
|
|
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
|
|
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
|
|
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
|
|
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
|
|
|
|
+import com.provectus.kafka.ui.cluster.util.JmxClusterUtil;
|
|
import com.provectus.kafka.ui.kafka.KafkaService;
|
|
import com.provectus.kafka.ui.kafka.KafkaService;
|
|
import com.provectus.kafka.ui.model.*;
|
|
import com.provectus.kafka.ui.model.*;
|
|
import lombok.RequiredArgsConstructor;
|
|
import lombok.RequiredArgsConstructor;
|
|
@@ -12,14 +13,12 @@ import lombok.SneakyThrows;
|
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
|
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
|
-import org.apache.kafka.common.Node;
|
|
|
|
import org.apache.kafka.common.TopicPartition;
|
|
import org.apache.kafka.common.TopicPartition;
|
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
-import java.math.BigDecimal;
|
|
|
|
import java.util.*;
|
|
import java.util.*;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Stream;
|
|
import java.util.stream.Stream;
|
|
@@ -157,27 +156,27 @@ public class ClusterService {
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- public Mono<JmxMetric> getJmxMetric(String clusterName, Integer nodeId, JmxMetric metric) {
|
|
|
|
|
|
+ public Mono<JmxMetric> getJmxMetric(String clusterName, Integer nodeId, String canonicalName) {
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
.map(c -> kafkaService.getOrCreateAdminClient(c)
|
|
.map(c -> kafkaService.getOrCreateAdminClient(c)
|
|
.flatMap(a -> ClusterUtil.toMono(a.getAdminClient().describeCluster().nodes())
|
|
.flatMap(a -> ClusterUtil.toMono(a.getAdminClient().describeCluster().nodes())
|
|
.map(n -> n.stream().filter(s -> s.id() == nodeId).findFirst().orElseThrow().host()))
|
|
.map(n -> n.stream().filter(s -> s.id() == nodeId).findFirst().orElseThrow().host()))
|
|
- .map(host -> kafkaService.getJmxMetric(c, c.getJmxPort(), host, metric))).orElseThrow();
|
|
|
|
|
|
+ .map(host -> kafkaService.getJmxMetric(c, c.getJmxPort(), host, canonicalName))).orElseThrow();
|
|
}
|
|
}
|
|
|
|
|
|
- public Mono<JmxMetric> getClusterJmxMetric(String clusterName, JmxMetric metric) {
|
|
|
|
|
|
+ public Mono<JmxMetric> getClusterJmxMetric(String clusterName, String canonicalName) {
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
.map(c -> kafkaService.getOrCreateAdminClient(c)
|
|
.map(c -> kafkaService.getOrCreateAdminClient(c)
|
|
.flatMap(eac -> ClusterUtil.toMono(eac.getAdminClient().describeCluster().nodes()))
|
|
.flatMap(eac -> ClusterUtil.toMono(eac.getAdminClient().describeCluster().nodes()))
|
|
.flatMapIterable(n -> n.stream().flatMap(node -> Stream.of(node.host())).collect(Collectors.toList()))
|
|
.flatMapIterable(n -> n.stream().flatMap(node -> Stream.of(node.host())).collect(Collectors.toList()))
|
|
- .map(host -> kafkaService.getJmxMetric(c, c.getJmxPort(), host, metric))
|
|
|
|
|
|
+ .map(host -> kafkaService.getJmxMetric(c, c.getJmxPort(), host, canonicalName))
|
|
.collectList()
|
|
.collectList()
|
|
- .map(s -> s.stream().filter(metric1 -> metric1.getCanonicalName().equals(metric.getCanonicalName()))
|
|
|
|
|
|
+ .map(s -> s.stream().filter(metric1 -> JmxClusterUtil.metricNamesEquals(metric1.getCanonicalName(), canonicalName))
|
|
.collect(Collectors.toList())
|
|
.collect(Collectors.toList())
|
|
.stream().reduce((jmx, jmx1) -> {
|
|
.stream().reduce((jmx, jmx1) -> {
|
|
if (jmx.getCanonicalName().equals(jmx1.getCanonicalName())) {
|
|
if (jmx.getCanonicalName().equals(jmx1.getCanonicalName())) {
|
|
jmx.getValue().keySet().forEach(k -> jmx1.getValue().compute(k, (k1, v1) ->
|
|
jmx.getValue().keySet().forEach(k -> jmx1.getValue().compute(k, (k1, v1) ->
|
|
- ((BigDecimal) v1).add(((BigDecimal) jmx1.getValue().get(k)))));
|
|
|
|
|
|
+ JmxClusterUtil.metricValueReduce(v1, jmx1.getValue().get(k))));
|
|
}
|
|
}
|
|
return jmx;
|
|
return jmx;
|
|
})
|
|
})
|