Explorar el Código

Added metricdto object

Roman Nedzvetskiy hace 5 años
padre
commit
5beb6ae937

+ 0 - 1
docker/kafka-clusters-only.yaml

@@ -79,7 +79,6 @@ services:
       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29092,PLAIN://localhost:29090
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
       JMX_PORT: 9998
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9998
 

+ 14 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/MetricDto.java

@@ -0,0 +1,14 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.math.BigDecimal;
+
+@Getter
+@AllArgsConstructor
+public class MetricDto {
+    private String canonicalName;
+    private String metricName;
+    private BigDecimal value;
+}

+ 37 - 30
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java

@@ -1,11 +1,11 @@
 package com.provectus.kafka.ui.cluster.util;
 
+import com.google.common.collect.ImmutableList;
 import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
+import com.provectus.kafka.ui.cluster.model.MetricDto;
 import com.provectus.kafka.ui.model.JmxMetric;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.pool2.KeyedObjectPool;
 import org.springframework.stereotype.Component;
 
@@ -18,6 +18,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 @Component
@@ -31,6 +32,16 @@ public class JmxClusterUtil {
     private static final String JMX_SERVICE_TYPE = "jmxrmi";
     private static final String KAFKA_SERVER_PARAM = "kafka.server";
 
+
+    private static final ImmutableList<String> metricsNames = ImmutableList.of(
+        "MessagesInPerSec", "BytesInPerSec", "ReplicationBytesInPerSec", "RequestsPerSec", "ErrorsPerSec", "MessageConversionsPerSec", "BytesOutPerSec", "ReplicationBytesOutPerSec", "NoKeyCompactedTopicRecordsPerSec",
+        "InvalidMagicNumberRecordsPerSec", "InvalidMessageCrcRecordsPerSec", "InvalidOffsetOrSequenceRecordsPerSec",
+        "UncleanLeaderElectionsPerSec", "IsrShrinksPerSec", "IsrExpandsPerSec", "ReassignmentBytesOutPerSec", "ReassignmentBytesInPerSec",
+        "ProduceMessageConversionsPerSec", "FailedFetchRequestsPerSec", "ZooKeeperSyncConnectsPerSec", "BytesRejectedPerSec",
+        "IsrShrinksPerSec", "ReplicationBytesOutPerSec", "ZooKeeperAuthFailuresPerSec", "TotalFetchRequestsPerSec", "FailedIsrUpdatesPerSec",
+        "IncrementalFetchSessionEvictionsPerSec", "FetchMessageConversionsPerSec", "TotalFetchRequestsPerSec", "FailedProduceRequestsPerSec"
+    );
+
     public List<JmxMetric> getJmxMetrics(int jmxPort, String jmxHost) {
         String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
         List<JmxMetric> result = new ArrayList<>();
@@ -64,6 +75,7 @@ public class JmxClusterUtil {
             for (MBeanAttributeInfo attrName : attrNames) {
                 var value = msc.getAttribute(name, attrName.getName());
                 if (value instanceof Number) {
+                    if (!(value instanceof Double) || !((Double) value).isInfinite())
                     resultAttr.put(attrName.getName(), new BigDecimal(value.toString()));
                 }
             }
@@ -94,39 +106,34 @@ public class JmxClusterUtil {
         }
     }
 
-    private static Pair<String, String> getCanonicalAndMetricName(String value) {
-        return Pair.of(value.substring(0, value.indexOf("+")), value.substring(value.indexOf("+") + 1));
-    }
-
-    public static List<Pair<String, Pair<String, BigDecimal>>> squashIntoNameMetricPair(InternalClusterMetrics internalClusterMetrics) {
+    public static List<MetricDto> squashIntoNameMetricPair(InternalClusterMetrics internalClusterMetrics) {
         return internalClusterMetrics.getInternalBrokerMetrics().values().stream()
                 .map(c ->
                         c.getJmxMetrics().stream()
-                                .filter(j -> StringUtils.containsIgnoreCase(j.getCanonicalName(), "num") || StringUtils.containsIgnoreCase(j.getCanonicalName(), "persec"))
+                                .filter(j -> isSameMetric(j.getCanonicalName()))
                                 .map(j -> j.getValue().entrySet().stream()
-                                        .map(e -> Pair.of(j.getCanonicalName() + "+" + e.getKey(), e.getValue()))
-                                        .collect(Collectors.toList()))
-                                .collect(Collectors.toList())
-                )
-                .collect(Collectors.toList())
-                .stream().flatMap(List::stream).collect(Collectors.toList())
-                .stream().flatMap(List::stream).collect(Collectors.toList())
-                .stream().map(c -> {
-            var pairNames = JmxClusterUtil.getCanonicalAndMetricName(c.getKey());
-            return Pair.of(pairNames.getKey(), Pair.of(pairNames.getValue(), c.getValue()));
-        }).collect(Collectors.toList());
+                                        .map(e -> new MetricDto(j.getCanonicalName(), e.getKey(), e.getValue()))))
+                .flatMap(Function.identity()).flatMap(Function.identity()).collect(Collectors.toList());
     }
 
-    public static JmxMetric reduceJmxMetrics (List<JmxMetric> metrics) {
-        var result = List.copyOf(metrics);
-        return result.stream().reduce((j1, j2) -> {
-            var temp1 = new HashMap<>(j1.getValue());
-            var temp2 = new HashMap<>(j2.getValue());
-            temp2.forEach((k, v) -> temp1.merge(k, v, BigDecimal::add));
-            var mergedMetric = new JmxMetric();
-            mergedMetric.setCanonicalName(j1.getCanonicalName());
-            mergedMetric.setValue(temp1);
-            return mergedMetric;
-        }).orElse(null);
+    public static JmxMetric reduceJmxMetrics (JmxMetric metric1, JmxMetric metric2) {
+        var result = new JmxMetric();
+        Map<String, BigDecimal> jmx1 = new HashMap<>(metric1.getValue());
+        Map<String, BigDecimal> jmx2 = new HashMap<>(metric2.getValue());
+        jmx1.forEach((k, v) -> jmx2.merge(k, v, BigDecimal::add));
+        result.setCanonicalName(metric1.getCanonicalName());
+        result.setValue(jmx2);
+        return result;
+    }
+
+    private static boolean isSameMetric (String metric) {
+        if (metric.contains("name=")) {
+            int beginIndex = metric.indexOf("name=");
+            int endIndex = metric.indexOf(',', beginIndex);
+            endIndex = endIndex < 0 ? metric.length() - 1 : endIndex;
+            return metricsNames.contains(metric.substring(beginIndex + 5, endIndex));
+        } else {
+            return false;
+        }
     }
 }

+ 6 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -367,12 +367,13 @@ public class KafkaService {
                     JmxClusterUtil.squashIntoNameMetricPair(internalClusterMetrics)
                             .stream().map(c -> {
                         JmxMetric jmx = new JmxMetric();
-                        jmx.setCanonicalName(c.getKey());
-                        jmx.setValue(Map.of(c.getValue().getKey(), c.getValue().getValue()));
+                        jmx.setCanonicalName(c.getCanonicalName());
+                        jmx.setValue(Map.of(c.getMetricName(), c.getValue()));
                         return jmx;
-                    }).collect(Collectors.groupingBy(JmxMetric::getCanonicalName, Collectors.toList()))
-                    .values().stream().map(JmxClusterUtil::reduceJmxMetrics)
-                    .filter(Objects::nonNull)
+                    }).collect(Collectors.groupingBy(JmxMetric::getCanonicalName, Collectors.reducing(JmxClusterUtil::reduceJmxMetrics)))
+                    .values().stream()
+                    .filter(Optional::isPresent)
+                    .map(Optional::get)
                     .collect(Collectors.toList())).build();
     }
 }