Explorar o código

Changed to all values in metrics

Roman Nedzvetskiy %!s(int64=5) %!d(string=hai) anos
pai
achega
0c72e079aa

+ 1 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java

@@ -12,7 +12,7 @@ public interface ClusterMapper {
     @Mapping(target = "brokerCount", source = "metrics.brokerCount")
     @Mapping(target = "onlinePartitionCount", source = "metrics.onlinePartitionCount")
     @Mapping(target = "topicCount", source = "metrics.topicCount")
-    @Mapping(target = "jmxMetricsNames", source = "metrics.jmxMetricsNames")
+    @Mapping(target = "jmxMetrics", source = "metrics.jmxMetrics")
     Cluster toCluster(KafkaCluster cluster);
 
     KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
@@ -20,5 +20,4 @@ public interface ClusterMapper {
     Topic toTopic(InternalTopic topic);
     TopicDetails toTopicDetails(InternalTopic topic);
     TopicConfig toTopicConfig(InternalTopicConfig topic);
-    JmxMetric toJmxMetric(InternalJmxMetric metric);
 }

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.cluster.model;
 
+import com.provectus.kafka.ui.model.JmxMetric;
 import lombok.Builder;
 import lombok.Data;
 
@@ -25,6 +26,6 @@ public class InternalClusterMetrics {
     private final int segmentCount;
     private final long segmentSize;
     private final Map<Integer, InternalBrokerMetrics> internalBrokerMetrics;
-    private final List<InternalJmxMetric> jmxMetricsNames;
+    private final List<JmxMetric> jmxMetrics;
     private final int zooKeeperStatus;
 }

+ 0 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalJmxMetric.java

@@ -1,14 +0,0 @@
-package com.provectus.kafka.ui.cluster.model;
-
-import lombok.Builder;
-import lombok.Data;
-
-@Data
-@Builder(toBuilder = true)
-public class InternalJmxMetric {
-
-    private String name;
-    private String type;
-    private String topic;
-    private String canonicalName;
-}

+ 5 - 30
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -31,6 +31,7 @@ public class ClusterService {
     private final ClusterMapper clusterMapper;
     private final KafkaService kafkaService;
     private final ConsumingService consumingService;
+    private final JmxClusterUtil jmxClusterUtil;
 
     public List<Cluster> getClusters() {
         return clustersStorage.getKafkaClusters()
@@ -39,10 +40,12 @@ public class ClusterService {
                 .collect(Collectors.toList());
     }
 
-    public Optional<BrokersMetrics> getBrokersMetrics(String name) {
+    public Mono<BrokersMetrics> getBrokersMetrics(String name, Integer id) {
         return clustersStorage.getClusterByName(name)
                 .map(KafkaCluster::getMetrics)
-                .map(clusterMapper::toBrokerMetrics);
+                .map(s -> kafkaService.getJmxMetric(name, id)
+                        .map(j -> s.toBuilder().jmxMetrics(j).build()))
+                .map(s -> s.map(clusterMapper::toBrokerMetrics)).orElseThrow();
     }
 
     public List<Topic> getTopics(String name) {
@@ -155,32 +158,4 @@ public class ClusterService {
                 .orElse(Flux.empty());
 
     }
-
-    public Mono<JmxMetric> getJmxMetric(String clusterName, Integer nodeId, String canonicalName) {
-        return clustersStorage.getClusterByName(clusterName)
-                .map(c -> kafkaService.getOrCreateAdminClient(c)
-                        .flatMap(a -> ClusterUtil.toMono(a.getAdminClient().describeCluster().nodes())
-                                    .map(n -> n.stream().filter(s -> s.id() == nodeId).findFirst().orElseThrow().host()))
-                        .map(host -> kafkaService.getJmxMetric(c, c.getJmxPort(), host, canonicalName))).orElseThrow();
-    }
-
-    public Mono<JmxMetric> getClusterJmxMetric(String clusterName, String canonicalName) {
-        return clustersStorage.getClusterByName(clusterName)
-                .map(c -> kafkaService.getOrCreateAdminClient(c)
-                    .flatMap(eac -> ClusterUtil.toMono(eac.getAdminClient().describeCluster().nodes()))
-                    .flatMapIterable(n -> n.stream().flatMap(node -> Stream.of(node.host())).collect(Collectors.toList()))
-                    .map(host -> kafkaService.getJmxMetric(c, c.getJmxPort(), host, canonicalName))
-                    .collectList()
-                    .map(s -> s.stream().filter(metric1 -> JmxClusterUtil.metricNamesEquals(metric1.getCanonicalName(), canonicalName))
-                            .collect(Collectors.toList())
-                            .stream().reduce((jmx, jmx1) -> {
-                                if (jmx.getCanonicalName().equals(jmx1.getCanonicalName())) {
-                                    jmx.getValue().keySet().forEach(k -> jmx1.getValue().compute(k, (k1, v1) ->
-                                        JmxClusterUtil.metricValueReduce(v1, jmx1.getValue().get(k))));
-                                }
-                            return jmx;
-                            })
-                            .orElseThrow())
-                ).orElseThrow();
-    }
 }

+ 10 - 39
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java

@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.cluster.util;
 
-import com.provectus.kafka.ui.cluster.model.InternalJmxMetric;
 import com.provectus.kafka.ui.model.JmxMetric;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -25,21 +24,19 @@ public class JmxClusterUtil {
     private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
     private static final String JMX_SERVICE_TYPE = "jmxrmi";
 
-    public List<InternalJmxMetric> getJmxMetricsNames(int jmxPort, String jmxHost) {
+    public List<JmxMetric> getJmxMetrics(int jmxPort, String jmxHost) {
         String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
-        List<InternalJmxMetric> result = new ArrayList<>();
+        List<JmxMetric> result = new ArrayList<>();
         JMXConnector srv = null;
         try {
             srv = pool.borrowObject(jmxUrl);
             MBeanServerConnection msc = srv.getMBeanServerConnection();
             var jmxMetrics = msc.queryNames(null, null).stream().filter(q -> q.getCanonicalName().startsWith("kafka.server")).collect(Collectors.toList());
             jmxMetrics.forEach(j -> {
-                InternalJmxMetric.InternalJmxMetricBuilder internalMetric = InternalJmxMetric.builder();
-                internalMetric.name(j.getKeyPropertyList().computeIfAbsent("name", s -> null));
-                internalMetric.topic(j.getKeyPropertyList().computeIfAbsent("topic", s -> null));
-                internalMetric.type(j.getKeyPropertyList().computeIfAbsent("type", s -> null));
-                internalMetric.canonicalName(j.getCanonicalName());
-                result.add(internalMetric.build());
+                JmxMetric metric = new JmxMetric();
+                metric.setCanonicalName(j.getCanonicalName());
+                metric.setValue(getJmxMetric(jmxPort, jmxHost, j.getCanonicalName()));
+                result.add(metric);
             });
             pool.returnObject(jmxUrl, srv);
         } catch (IOException ioe) {
@@ -52,22 +49,20 @@ public class JmxClusterUtil {
         return result;
     }
 
-    public JmxMetric getJmxMetric(int jmxPort, String jmxHost, String canonicalName) {
+    private Map<String, Object> getJmxMetric(int jmxPort, String jmxHost, String canonicalName) {
         String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
 
-        var result = new JmxMetric();
+        Map<String, Object> resultAttr = new HashMap<>();
         JMXConnector srv = null;
         try {
             srv = pool.borrowObject(jmxUrl);
             MBeanServerConnection msc = srv.getMBeanServerConnection();
-            Map<String, Object> resultAttr = new HashMap<>();
+
             ObjectName name = new ObjectName(canonicalName);
             var attrNames = msc.getMBeanInfo(name).getAttributes();
             for (MBeanAttributeInfo attrName : attrNames) {
                 resultAttr.put(attrName.getName(), msc.getAttribute(name, attrName.getName()));
             }
-            result.setCanonicalName(canonicalName);
-            result.setValue(resultAttr);
             pool.returnObject(jmxUrl, srv);
         } catch (MalformedURLException url) {
             log.error("Cannot create JmxServiceUrl from {}", jmxUrl);
@@ -85,7 +80,7 @@ public class JmxClusterUtil {
             log.error("Error while retrieving connection {} from pool", jmxUrl);
             closeConnectionExceptionally(jmxUrl, srv);
         }
-        return result;
+        return resultAttr;
     }
 
     private void closeConnectionExceptionally(String url, JMXConnector srv) {
@@ -96,30 +91,6 @@ public class JmxClusterUtil {
         }
     }
 
-    public static String getParamFromName(String param, String name) {
-        if (!name.contains(param)) {
-            return null;
-        }
-        int paramValueBeginIndex = name.indexOf(param) + param.length() + 1;
-        int paramValueEndIndex = name.indexOf(',', paramValueBeginIndex);
-        return paramValueEndIndex != -1 ? name.substring(paramValueBeginIndex, paramValueEndIndex) : name.substring(paramValueBeginIndex);
-    }
-
-    public static boolean metricNamesEquals (String metric1Name, String metric2Name) {
-        boolean result = false;
-        try {
-            result = Objects.equals(getParamFromName("name", metric1Name), getParamFromName("name", metric2Name))
-                    &&
-                    Objects.equals(getParamFromName("type", metric1Name), getParamFromName("type", metric2Name))
-                    &&
-                    getParamFromName("topic", metric1Name) == null ||
-                            Objects.equals(getParamFromName("topic", metric1Name), getParamFromName("topic", metric2Name));
-        } catch (NullPointerException npe) {
-            log.error("Cannot compare {} with {}, npe caught", metric1Name, metric2Name);
-        }
-        return result;
-    }
-
     public static Object metricValueReduce(Object value1, Object value2) {
         if (value1 instanceof Number) {
             return new BigDecimal(value1.toString()).add(new BigDecimal(value2.toString()));

+ 32 - 16
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -42,6 +42,7 @@ public class KafkaService {
     private final Map<String, ExtendedAdminClient> adminClientCache = new ConcurrentHashMap<>();
     private final Map<AdminClient, Map<TopicPartition, Integer>> leadersCache = new ConcurrentHashMap<>();
     private final JmxClusterUtil jmxClusterUtil;
+    private final ClustersStorage clustersStorage;
 
     @SneakyThrows
     public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
@@ -158,18 +159,18 @@ public class KafkaService {
     private Mono<InternalClusterMetrics> getClusterMetrics(KafkaCluster cluster, AdminClient client) {
         return ClusterUtil.toMono(client.describeCluster().nodes())
                 .flatMap(brokers ->
-                    ClusterUtil.toMono(client.describeCluster().controller()).map(
-                        c -> {
+                    ClusterUtil.toMono(client.describeCluster().controller()).flatMap(
+                        c ->
+                            getClusterJmxMetric(cluster.getName()).map(jmxMetric -> {
                             InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = InternalClusterMetrics.builder();
                             metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
-                            List<InternalJmxMetric> metrics = jmxClusterUtil.getJmxMetricsNames(cluster.getJmxPort(), c.host());
                             metricsBuilder
                                     .internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))))
-                                    .jmxMetricsNames(metrics);
+                                    .jmxMetrics(jmxMetric);
                             return metricsBuilder.build();
                         }
                     )
-                );
+                ));
     }
 
 
@@ -346,16 +347,31 @@ public class KafkaService {
             );
     }
 
-    public JmxMetric getJmxMetric (KafkaCluster cluster, int jmxPort, String host, String canonicalName) {
-        var jmxMetric = cluster.getMetrics().getJmxMetricsNames().stream().filter(c -> {
-            var foundTopic = false;
-            var found = JmxClusterUtil.getParamFromName("name", canonicalName).equals(c.getName())
-                        && JmxClusterUtil.getParamFromName("type", canonicalName).equals(c.getType());
-            if (found && c.getTopic() != null) {
-                foundTopic = c.getTopic().equals(JmxClusterUtil.getParamFromName("topic", canonicalName));
-            }
-            return found && foundTopic;
-        }).findFirst().orElseThrow();
-        return jmxClusterUtil.getJmxMetric(jmxPort, host, jmxMetric.getCanonicalName());
+    public Mono<List<JmxMetric>> getClusterJmxMetric(String clusterName) {
+        return clustersStorage.getClusterByName(clusterName)
+                .map(c -> getOrCreateAdminClient(c)
+                        .flatMap(eac -> ClusterUtil.toMono(eac.getAdminClient().describeCluster().nodes()))
+                        .flatMapIterable(n -> n.stream().flatMap(node -> Stream.of(node.host())).collect(Collectors.toList()))
+                        .map(host -> jmxClusterUtil.getJmxMetrics(c.getJmxPort(), host))
+                        .collectList()
+                        .map(s -> s.stream().reduce((s1, s2) -> {
+                            s1.forEach(j1 -> {
+                                s2.forEach(j2 -> {
+                                    if (j1.getCanonicalName().equals(j2.getCanonicalName())) {
+                                        j1.getValue().keySet().forEach(k -> j2.getValue().compute(k, (k1, v1) ->
+                                                JmxClusterUtil.metricValueReduce(j1, j2.getValue().get(k1))));
+                                    }
+                                });
+                            });
+                            return s1;
+                        }).orElseThrow())).orElseThrow();
+    }
+
+    public Mono<List<JmxMetric>> getJmxMetric(String clusterName, Integer nodeId) {
+        return clustersStorage.getClusterByName(clusterName)
+                .map(c -> getOrCreateAdminClient(c)
+                        .flatMap(a -> ClusterUtil.toMono(a.getAdminClient().describeCluster().nodes())
+                                .map(n -> n.stream().filter(s -> s.id() == nodeId).findFirst().orElseThrow().host()))
+                        .map(host ->  jmxClusterUtil.getJmxMetrics(c.getJmxPort(), host))).orElseThrow();
     }
 }

+ 3 - 17
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -30,12 +30,10 @@ public class MetricsRestController implements ApiClustersApi {
     }
 
     @Override
-    public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(String clusterName, ServerWebExchange exchange) {
-        return Mono.just(
-                clusterService.getBrokersMetrics(clusterName)
+    public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(String clusterName, Integer id, ServerWebExchange exchange) {
+        return clusterService.getBrokersMetrics(clusterName, id)
                         .map(ResponseEntity::ok)
-                        .orElse(ResponseEntity.notFound().build())
-        );
+                        .onErrorReturn(ResponseEntity.notFound().build());
     }
 
     @Override
@@ -98,18 +96,6 @@ public class MetricsRestController implements ApiClustersApi {
         return clusterService.updateTopic(clusterId, topicName, topicFormData).map(ResponseEntity::ok);
     }
 
-    @Override
-    public Mono<ResponseEntity<JmxMetric>> getBrokerJmxMetric(String clusterName, Integer host, String canonicalName, ServerWebExchange exchange){
-        return
-               clusterService.getJmxMetric(clusterName, host, canonicalName)
-                        .map(ResponseEntity::ok);
-    }
-
-    @Override
-    public Mono<ResponseEntity<JmxMetric>> getClusterJmxMetric(String clusterName, String canonicalName, ServerWebExchange exchange){
-        return clusterService.getClusterJmxMetric(clusterName, canonicalName)
-                        .map(ResponseEntity::ok);
-    }
 
     private Mono<ConsumerPosition> parseConsumerPosition(SeekType seekType, List<String> seekTo) {
         return Mono.justOrEmpty(seekTo)

+ 67 - 64
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -52,7 +52,7 @@ paths:
                 items:
                   $ref: '#/components/schemas/Broker'
 
-  /api/clusters/{clusterName}/metrics/broker:
+  /api/clusters/{clusterName}/metrics/broker/{id}:
     get:
       tags:
         - /api/clusters
@@ -64,6 +64,11 @@ paths:
           required: true
           schema:
             type: string
+        - name: id
+          in: path
+          required: true
+          schema:
+            type: integer
       responses:
         200:
           description: OK
@@ -264,60 +269,60 @@ paths:
               schema:
                 $ref: '#/components/schemas/ConsumerGroupDetails'
 
-  /api/clusters/{clusterName}/brokers/{brokerId}/metrics/{canonicalName}:
-    get:
-      tags:
-        - /api/clusters
-      summary: get specific JmxMetric for broker
-      operationId: getBrokerJmxMetric
-      parameters:
-        - name: clusterName
-          in: path
-          required: true
-          schema:
-            type: string
-        - name: brokerId
-          in: path
-          required: true
-          schema:
-            type: integer
-        - name: canonicalName
-          in: path
-          required: true
-          schema:
-            type: string
-      responses:
-        200:
-          description: OK
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/JmxMetric'
+#  /api/clusters/{clusterName}/brokers/{brokerId}/metrics/{canonicalName}:
+#    get:
+#      tags:
+#        - /api/clusters
+#      summary: get specific JmxMetric for broker
+#      operationId: getBrokerJmxMetric
+#      parameters:
+#        - name: clusterName
+#          in: path
+#          required: true
+#          schema:
+#            type: string
+#        - name: brokerId
+#          in: path
+#          required: true
+#          schema:
+#            type: integer
+#        - name: canonicalName
+#          in: path
+#          required: true
+#          schema:
+#            type: string
+#      responses:
+#        200:
+#          description: OK
+#          content:
+#            application/json:
+#              schema:
+#                $ref: '#/components/schemas/JmxMetric'
 
-  /api/clusters/{clusterName}/metrics:
-    get:
-      tags:
-        - /api/clusters
-      summary: get specific JmxMetric for cluster
-      operationId: getClusterJmxMetric
-      parameters:
-        - name: clusterName
-          in: path
-          required: true
-          schema:
-            type: string
-        - name: canonicalName
-          in: path
-          required: true
-          schema:
-            type: string
-      responses:
-        200:
-          description: OK
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/JmxMetric'
+#  /api/clusters/{clusterName}/metrics:
+#    get:
+#      tags:
+#        - /api/clusters
+#      summary: get specific JmxMetric for cluster
+#      operationId: getClusterJmxMetric
+#      parameters:
+#        - name: clusterName
+#          in: path
+#          required: true
+#          schema:
+#            type: string
+#        - name: canonicalName
+#          in: path
+#          required: true
+#          schema:
+#            type: string
+#      responses:
+#        200:
+#          description: OK
+#          content:
+#            application/json:
+#              schema:
+#                $ref: '#/components/schemas/JmxMetric'
 
   /api/clusters/{clusterName}/consumerGroups:
     get:
@@ -358,10 +363,10 @@ components:
           type: integer
         topicCount:
           type: integer
-        jmxMetricsNames:
+        jmxMetrics:
           type: array
           items:
-            $ref: '#/components/schemas/JmxMetricName'
+            $ref: '#/components/schemas/JmxMetric'
       required:
         - id
         - name
@@ -392,6 +397,10 @@ components:
           type: integer
         segmentZise:
           type: integer
+        jmxMetrics:
+          type: array
+          items:
+            $ref: '#/components/schemas/JmxMetric'
 
     Topic:
       type: object
@@ -571,12 +580,6 @@ components:
         canonicalName:
           type: string
         value:
-          type: object
+          type: string
           additionalProperties:
-            type: object
-
-    JmxMetricName:
-      type: object
-      properties:
-        canonicalName:
-          type: string
+            type: object