Переглянути джерело

Fixed after PR and added sum for number cluster metrics by num and persec keywords in canonicalname

Roman Nedzvetskiy 5 роки тому
батько
коміт
4edc5f8428

+ 17 - 17
docker/kafka-clusters-only.yaml

@@ -27,22 +27,22 @@ services:
       JMX_PORT: 9997
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
 
-  # kafka01:
-  #   image: confluentinc/cp-kafka:5.1.0
-  #   depends_on:
-  #     - zookeeper0
-  #   ports:
-  #     - 29093:29093
-  #     - 9999:9999
-  #   environment:
-  #     KAFKA_BROKER_ID: 2
-  #     KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
-  #     KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090
-  #     KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
-  #     KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-  #     KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
-  #     JMX_PORT: 9997
-  #     KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
+  kafka01:
+     image: confluentinc/cp-kafka:5.1.0
+     depends_on:
+       - zookeeper0
+     ports:
+       - 29093:29093
+       - 9999:9999
+     environment:
+       KAFKA_BROKER_ID: 2
+       KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
+       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090
+       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
+       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
+       JMX_PORT: 9997
+       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
 
   kafka-init-topics0:
     image: confluentinc/cp-kafka:5.1.0
@@ -102,7 +102,7 @@ services:
     depends_on:
       - zookeeper0
       - kafka0
-      # - kafka01
+      - kafka01
     ports:
       - 8085:8085
     environment:

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -39,14 +39,14 @@ public class ClusterService {
     }
 
     public Mono<BrokersMetrics> getBrokersMetrics(String name, Integer id) {
-        return Mono.just(clustersStorage.getClusterByName(name)
+        return Mono.justOrEmpty(clustersStorage.getClusterByName(name)
                 .map(KafkaCluster::getMetrics)
                 .map(s -> {
                     var brokerMetrics = clusterMapper.toBrokerMetrics(s);
                     brokerMetrics.setJmxMetrics(s.getInternalBrokerMetrics().get(id).getJmxMetrics());
                     brokerMetrics.setSegmentZise(Long.valueOf(s.getSegmentSize()).intValue());
                     return brokerMetrics;
-                }).orElseThrow());
+                }));
     }
 
     public List<Topic> getTopics(String name) {

+ 44 - 22
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java

@@ -1,8 +1,11 @@
 package com.provectus.kafka.ui.cluster.util;
 
+import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
 import com.provectus.kafka.ui.model.JmxMetric;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.pool2.KeyedObjectPool;
 import org.springframework.stereotype.Component;
 
@@ -36,12 +39,12 @@ public class JmxClusterUtil {
             srv = pool.borrowObject(jmxUrl);
             MBeanServerConnection msc = srv.getMBeanServerConnection();
             var jmxMetrics = msc.queryNames(null, null).stream().filter(q -> q.getCanonicalName().startsWith(KAFKA_SERVER_PARAM)).collect(Collectors.toList());
-            jmxMetrics.forEach(j -> {
+            for (ObjectName jmxMetric : jmxMetrics) {
                 JmxMetric metric = new JmxMetric();
-                metric.setCanonicalName(j.getCanonicalName());
-                metric.setValue(getJmxMetric(jmxPort, jmxHost, j.getCanonicalName()));
+                metric.setCanonicalName(jmxMetric.getCanonicalName());
+                metric.setValue(getJmxMetric(jmxMetric.getCanonicalName(), msc, srv, jmxUrl));
                 result.add(metric);
-            });
+            };
             pool.returnObject(jmxUrl, srv);
         } catch (IOException ioe) {
             log.error("Cannot get jmxMetricsNames, {}", jmxUrl, ioe);
@@ -53,26 +56,17 @@ public class JmxClusterUtil {
         return result;
     }
 
-    private Map<String, BigDecimal> getJmxMetric(int jmxPort, String jmxHost, String canonicalName) {
-        String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
-
+    private Map<String, BigDecimal> getJmxMetric(String canonicalName, MBeanServerConnection msc, JMXConnector srv, String jmxUrl) {
         Map<String, BigDecimal> resultAttr = new HashMap<>();
-        JMXConnector srv = null;
         try {
-            srv = pool.borrowObject(jmxUrl);
-            MBeanServerConnection msc = srv.getMBeanServerConnection();
-
             ObjectName name = new ObjectName(canonicalName);
             var attrNames = msc.getMBeanInfo(name).getAttributes();
             for (MBeanAttributeInfo attrName : attrNames) {
                 var value = msc.getAttribute(name, attrName.getName());
-                if (value instanceof BigDecimal) {
-                    resultAttr.put(attrName.getName(), (BigDecimal) value);
-                } else if (value instanceof Integer) {
-                    resultAttr.put(attrName.getName(), new BigDecimal((Integer) value));
+                if (value instanceof Number) {
+                    resultAttr.put(attrName.getName(), new BigDecimal(value.toString()));
                 }
             }
-            pool.returnObject(jmxUrl, srv);
         } catch (MalformedURLException url) {
             log.error("Cannot create JmxServiceUrl from {}", jmxUrl);
             closeConnectionExceptionally(jmxUrl, srv);
@@ -100,11 +94,39 @@ public class JmxClusterUtil {
         }
     }
 
-    public static BigDecimal metricValueSum(Number value1, Number value2) {
-        if (value1 instanceof Integer) {
-            return new BigDecimal(value1.toString()).add(new BigDecimal(value2.toString()));
-        } else {
-            return new BigDecimal(value1.longValue()).add(new BigDecimal(value2.longValue()));
-        }
+    private static Pair<String, String> getCanonicalAndMetricName(String value) {
+        return Pair.of(value.substring(0, value.indexOf("+")), value.substring(value.indexOf("+") + 1));
+    }
+
+    public static List<Pair<String, Pair<String, BigDecimal>>> squashIntoNameMetricPair(InternalClusterMetrics internalClusterMetrics) {
+        return internalClusterMetrics.getInternalBrokerMetrics().values().stream()
+                .map(c ->
+                        c.getJmxMetrics().stream()
+                                .filter(j -> StringUtils.containsIgnoreCase(j.getCanonicalName(), "num") || StringUtils.containsIgnoreCase(j.getCanonicalName(), "persec"))
+                                .map(j -> j.getValue().entrySet().stream()
+                                        .map(e -> Pair.of(j.getCanonicalName() + "+" + e.getKey(), e.getValue()))
+                                        .collect(Collectors.toList()))
+                                .collect(Collectors.toList())
+                )
+                .collect(Collectors.toList())
+                .stream().flatMap(List::stream).collect(Collectors.toList())
+                .stream().flatMap(List::stream).collect(Collectors.toList())
+                .stream().map(c -> {
+            var pairNames = JmxClusterUtil.getCanonicalAndMetricName(c.getKey());
+            return Pair.of(pairNames.getKey(), Pair.of(pairNames.getValue(), c.getValue()));
+        }).collect(Collectors.toList());
+    }
+
+    public static JmxMetric reduceJmxMetrics (List<JmxMetric> metrics) {
+        var result = List.copyOf(metrics);
+        return result.stream().reduce((j1, j2) -> {
+            var temp1 = new HashMap<>(j1.getValue());
+            var temp2 = new HashMap<>(j2.getValue());
+            temp2.forEach((k, v) -> temp1.merge(k, v, BigDecimal::add));
+            var mergedMetric = new JmxMetric();
+            mergedMetric.setCanonicalName(j1.getCanonicalName());
+            mergedMetric.setValue(temp1);
+            return mergedMetric;
+        }).orElse(null);
     }
 }

+ 20 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -49,7 +49,7 @@ public class KafkaService {
         return getOrCreateAdminClient(cluster)
                 .flatMap(
                 ac -> getClusterMetrics(ac.getAdminClient())
-                        .flatMap(i -> fillJmxMetrics(i, cluster.getName()))
+                        .flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient()))
                         .flatMap( clusterMetrics ->
                             getTopicsData(ac.getAdminClient()).flatMap( topics ->
                                 loadTopicsConfig(ac.getAdminClient(), topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
@@ -346,20 +346,33 @@ public class KafkaService {
 
     public List<JmxMetric> getJmxMetric(String clusterName, Node node) {
         return clustersStorage.getClusterByName(clusterName)
-                        .map(c -> jmxClusterUtil.getJmxMetrics(c.getJmxPort(), node.host())).orElseThrow();
+                        .map(c -> jmxClusterUtil.getJmxMetrics(c.getJmxPort(), node.host())).orElse(Collections.emptyList());
     }
 
-    private Mono<InternalClusterMetrics> fillJmxMetrics (InternalClusterMetrics internalClusterMetrics, String clusterName) {
-        return fillBrokerMetrics(internalClusterMetrics, clusterName);
+    private Mono<InternalClusterMetrics> fillJmxMetrics (InternalClusterMetrics internalClusterMetrics, String clusterName, AdminClient ac) {
+        return fillBrokerMetrics(internalClusterMetrics, clusterName, ac).map(this::calculateClusterMetrics);
     }
 
-    private Mono<InternalClusterMetrics> fillBrokerMetrics(InternalClusterMetrics internalClusterMetrics, String clusterName) {
-        return getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow())
-                .flatMap(ac -> ClusterUtil.toMono(ac.getAdminClient().describeCluster().nodes()))
+    private Mono<InternalClusterMetrics> fillBrokerMetrics(InternalClusterMetrics internalClusterMetrics, String clusterName, AdminClient ac) {
+        return ClusterUtil.toMono(ac.describeCluster().nodes())
                 .flatMapIterable(nodes -> nodes)
                 .map(broker -> Map.of(broker.id(), InternalBrokerMetrics.builder().
                             jmxMetrics(getJmxMetric(clusterName, broker)).build()))
                 .collectList()
                 .map(s -> internalClusterMetrics.toBuilder().internalBrokerMetrics(ClusterUtil.toSingleMap(s.stream())).build());
     }
+
+    private InternalClusterMetrics calculateClusterMetrics(InternalClusterMetrics internalClusterMetrics) {
+        return internalClusterMetrics.toBuilder().jmxMetrics(
+                    JmxClusterUtil.squashIntoNameMetricPair(internalClusterMetrics)
+                            .stream().map(c -> {
+                        JmxMetric jmx = new JmxMetric();
+                        jmx.setCanonicalName(c.getKey());
+                        jmx.setValue(Map.of(c.getValue().getKey(), c.getValue().getValue()));
+                        return jmx;
+                    }).collect(Collectors.groupingBy(JmxMetric::getCanonicalName, Collectors.toList()))
+                    .values().stream().map(JmxClusterUtil::reduceJmxMetrics)
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toList())).build();
+    }
 }