소스 검색

Added endpoint for getting jmx metric per broker

Roman Nedzvetskiy 5 년 전
부모
커밋
f427d01ea3

+ 2 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java

@@ -1,10 +1,7 @@
 package com.provectus.kafka.ui.cluster.mapper;
 package com.provectus.kafka.ui.cluster.mapper;
 
 
 import com.provectus.kafka.ui.cluster.config.ClustersProperties;
 import com.provectus.kafka.ui.cluster.config.ClustersProperties;
-import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
-import com.provectus.kafka.ui.cluster.model.InternalTopic;
-import com.provectus.kafka.ui.cluster.model.InternalTopicConfig;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.model.*;
 import com.provectus.kafka.ui.model.*;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
 import org.mapstruct.Mapping;
@@ -26,4 +23,5 @@ public interface ClusterMapper {
     Topic toTopic(InternalTopic topic);
     Topic toTopic(InternalTopic topic);
     TopicDetails toTopicDetails(InternalTopic topic);
     TopicDetails toTopicDetails(InternalTopic topic);
     TopicConfig toTopicConfig(InternalTopicConfig topic);
     TopicConfig toTopicConfig(InternalTopicConfig topic);
+    JmxMetric toJmxMetric(InternalJmxMetric metric);
 }
 }

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java

@@ -4,6 +4,7 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.Data;
 
 
 import java.math.BigDecimal;
 import java.math.BigDecimal;
+import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
 
 
@@ -24,5 +25,6 @@ public class InternalClusterMetrics {
     private final int segmentCount;
     private final int segmentCount;
     private final long segmentSize;
     private final long segmentSize;
     private final Map<Integer, InternalBrokerMetrics> internalBrokerMetrics;
     private final Map<Integer, InternalBrokerMetrics> internalBrokerMetrics;
+    private final List<InternalJmxMetric> jmxMetricsNames;
     private final int zooKeeperStatus;
     private final int zooKeeperStatus;
 }
 }

+ 14 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalJmxMetric.java

@@ -0,0 +1,14 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder(toBuilder = true)
+public class InternalJmxMetric {
+
+    private String name;
+    private String type;
+    private String topic;
+    private String canonicalName;
+}

+ 7 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -124,6 +124,7 @@ public class ClusterService {
                     .map(n -> n.stream().map(node -> {
                     .map(n -> n.stream().map(node -> {
                         Broker broker = new Broker();
                         Broker broker = new Broker();
                         broker.setId(node.idString());
                         broker.setId(node.idString());
+                        broker.setHost(node.host());
                         return broker;
                         return broker;
                     }).collect(Collectors.toList())))
                     }).collect(Collectors.toList())))
                 .flatMapMany(Flux::fromIterable);
                 .flatMapMany(Flux::fromIterable);
@@ -154,7 +155,11 @@ public class ClusterService {
 
 
     }
     }
 
 
-    public Flux<JmxMetrics> getJmxMetricsNames() {
-
+    public Mono<JmxMetric> getJmxMetric(String clusterName, Integer nodeId, JmxMetric metric) {
+        return clustersStorage.getClusterByName(clusterName)
+                .map(c -> kafkaService.getOrCreateAdminClient(c)
+                        .flatMap(a -> ClusterUtil.toMono(a.getAdminClient().describeCluster().nodes())
+                                    .map(n -> n.stream().filter(s -> s.id() == nodeId).findFirst().orElseThrow().host()))
+                        .map(host -> kafkaService.getJmxMetric(c, c.getJmxPort(), host, metric))).orElseThrow();
     }
     }
 }
 }

+ 43 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java

@@ -1,5 +1,7 @@
 package com.provectus.kafka.ui.cluster.util;
 package com.provectus.kafka.ui.cluster.util;
 
 
+import com.provectus.kafka.ui.cluster.model.InternalJmxMetric;
+import com.provectus.kafka.ui.model.JmxMetric;
 import lombok.RequiredArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.commons.pool2.KeyedObjectPool;
@@ -8,12 +10,12 @@ import org.springframework.stereotype.Component;
 import javax.management.*;
 import javax.management.*;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnector;
 import java.io.IOException;
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.net.MalformedURLException;
 import java.net.MalformedURLException;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 
 @Component
 @Component
 @Slf4j
 @Slf4j
@@ -30,20 +32,46 @@ public class JmxClusterUtil {
     private static final String BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_IN_PER_SEC;
     private static final String BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_IN_PER_SEC;
     private static final String BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_OUT_PER_SEC;
     private static final String BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_OUT_PER_SEC;
 
 
-    private static final List<String> attrNames = Arrays.asList("OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate");
+    public List<InternalJmxMetric> getJmxMetricsNames(int jmxPort, String jmxHost) {
+        String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
+        List<InternalJmxMetric> result = new ArrayList<>();
+        JMXConnector srv = null;
+        try {
+            srv = pool.borrowObject(jmxUrl);
+            MBeanServerConnection msc = srv.getMBeanServerConnection();
+            var jmxMetrics = msc.queryNames(null, null).stream().filter(q -> q.getCanonicalName().startsWith("kafka.server")).collect(Collectors.toList());
+            jmxMetrics.forEach(j -> {
+                InternalJmxMetric.InternalJmxMetricBuilder internalMetric = InternalJmxMetric.builder();
+                internalMetric.name(j.getKeyPropertyList().computeIfAbsent("name", s -> null));
+                internalMetric.topic(j.getKeyPropertyList().computeIfAbsent("topic", s -> null));
+                internalMetric.type(j.getKeyPropertyList().computeIfAbsent("type", s -> null));
+                internalMetric.canonicalName(j.getCanonicalName());
+                result.add(internalMetric.build());
+            });
+        } catch (IOException ioe) {
+            log.error("Cannot get jmxMetricsNames, {}", jmxUrl, ioe);
+        } catch (Exception e) {
+            log.error("Cannot get JmxConnection from pool, {}", jmxUrl, e);
+        }
+        return result;
+    }
 
 
-    public Map<String, BigDecimal> getJmxTrafficMetrics(int jmxPort, String jmxHost, String metricName) {
+    public JmxMetric getJmxMetric(int jmxPort, String jmxHost, String canonicalName) {
         String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
         String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
-        Map<String, BigDecimal> result = new HashMap<>();
+
+        var result = new JmxMetric();
         JMXConnector srv = null;
         JMXConnector srv = null;
         try {
         try {
             srv = pool.borrowObject(jmxUrl);
             srv = pool.borrowObject(jmxUrl);
             MBeanServerConnection msc = srv.getMBeanServerConnection();
             MBeanServerConnection msc = srv.getMBeanServerConnection();
-            ObjectName name = metricName.equals(BYTES_IN_PER_SEC) ? new ObjectName(BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME) :
-                    new ObjectName(BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME);
-            for (String attrName : attrNames) {
-                result.put(attrName, BigDecimal.valueOf((Double) msc.getAttribute(name, attrName)));
+            Map<String, Object> resultAttr = new HashMap<>();
+            ObjectName name = new ObjectName(canonicalName);
+            var attrNames = msc.getMBeanInfo(name).getAttributes();
+            for (MBeanAttributeInfo attrName : attrNames) {
+                resultAttr.put(attrName.getName(), msc.getAttribute(name, attrName.getName()));
             }
             }
+            result.setCanonicalName(canonicalName);
+            result.setValue(resultAttr);
             pool.returnObject(jmxUrl, srv);
             pool.returnObject(jmxUrl, srv);
         } catch (MalformedURLException url) {
         } catch (MalformedURLException url) {
             log.error("Cannot create JmxServiceUrl from {}", jmxUrl);
             log.error("Cannot create JmxServiceUrl from {}", jmxUrl);
@@ -71,4 +99,10 @@ public class JmxClusterUtil {
             log.error("Cannot invalidate object in pool, {}", url);
             log.error("Cannot invalidate object in pool, {}", url);
         }
         }
     }
     }
+
+    public String getParamFromName(String param, String name) {
+        int paramValueBeginIndex = name.indexOf(param) + param.length() + 1;
+        int paramValueEndIndex = name.indexOf(',', paramValueBeginIndex);
+        return paramValueEndIndex != -1 ? name.substring(paramValueBeginIndex, paramValueEndIndex) : name.substring(paramValueBeginIndex);
+    }
 }
 }

+ 20 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -3,10 +3,7 @@ package com.provectus.kafka.ui.kafka;
 import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.cluster.util.JmxClusterUtil;
 import com.provectus.kafka.ui.cluster.util.JmxClusterUtil;
-import com.provectus.kafka.ui.model.ConsumerGroup;
-import com.provectus.kafka.ui.model.ServerStatus;
-import com.provectus.kafka.ui.model.Topic;
-import com.provectus.kafka.ui.model.TopicFormData;
+import com.provectus.kafka.ui.model.*;
 import com.provectus.kafka.ui.zookeeper.ZookeeperService;
 import com.provectus.kafka.ui.zookeeper.ZookeeperService;
 import lombok.RequiredArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.SneakyThrows;
@@ -26,7 +23,6 @@ import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
 import reactor.util.function.Tuples;
 
 
-import java.math.BigDecimal;
 import java.util.*;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
@@ -166,12 +162,14 @@ public class KafkaService {
                         c -> {
                         c -> {
                             InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = InternalClusterMetrics.builder();
                             InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = InternalClusterMetrics.builder();
                             metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
                             metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
-                            Map<String, BigDecimal> bytesInPerSec = jmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), c.host(), JmxClusterUtil.BYTES_IN_PER_SEC);
-                            Map<String, BigDecimal> bytesOutPerSec = jmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), c.host(), JmxClusterUtil.BYTES_OUT_PER_SEC);
+                            List<InternalJmxMetric> metrics = jmxClusterUtil.getJmxMetricsNames(cluster.getJmxPort(), c.host());
+
+//                            Map<String, BigDecimal> bytesOutPerSec = jmxClusterUtil.getJmxTrafficMetrics(cluster.getJmxPort(), c.host(), JmxClusterUtil.BYTES_OUT_PER_SEC);
                             metricsBuilder
                             metricsBuilder
                                     .internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))))
                                     .internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))))
-                                    .bytesOutPerSec(bytesOutPerSec)
-                                    .bytesInPerSec(bytesInPerSec);
+//                                    .bytesOutPerSec(bytesOutPerSec)
+//                                    .bytesInPerSec(bytesInPerSec)
+                                    .jmxMetricsNames(metrics);
                             return metricsBuilder.build();
                             return metricsBuilder.build();
                         }
                         }
                     )
                     )
@@ -351,4 +349,17 @@ public class KafkaService {
                 })
                 })
             );
             );
     }
     }
+
+    public JmxMetric getJmxMetric (KafkaCluster cluster, int jmxPort, String host, JmxMetric metric) {
+        var jmxMetric = cluster.getMetrics().getJmxMetricsNames().stream().filter(c -> {
+            var foundTopic = false;
+            var found = jmxClusterUtil.getParamFromName("name", metric.getCanonicalName()).equals(c.getName())
+                        && jmxClusterUtil.getParamFromName("type", metric.getCanonicalName()).equals(c.getType());
+            if (found && c.getTopic() != null) {
+                foundTopic = c.getTopic().equals(jmxClusterUtil.getParamFromName("topic", metric.getCanonicalName()));
+            }
+            return found && foundTopic;
+        }).findFirst().orElseThrow();
+        return jmxClusterUtil.getJmxMetric(jmxPort, host, jmxMetric.getCanonicalName());
+    }
 }
 }

+ 5 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -5,7 +5,6 @@ import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
 import com.provectus.kafka.ui.cluster.service.ClusterService;
 import com.provectus.kafka.ui.cluster.service.ClusterService;
 import com.provectus.kafka.ui.model.*;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.RequiredArgsConstructor;
-
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.http.ResponseEntity;
@@ -14,12 +13,11 @@ import org.springframework.web.server.ServerWebExchange;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
+import javax.validation.Valid;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.List;
 import java.util.List;
 import java.util.function.Function;
 import java.util.function.Function;
 
 
-import javax.validation.Valid;
-
 @RestController
 @RestController
 @RequiredArgsConstructor
 @RequiredArgsConstructor
 public class MetricsRestController implements ApiClustersApi {
 public class MetricsRestController implements ApiClustersApi {
@@ -101,8 +99,10 @@ public class MetricsRestController implements ApiClustersApi {
     }
     }
 
 
     @Override
     @Override
-    public Mono<ResponseEntity<Flux<JmxMetrics>>> getJmxMetricsNames(String clusterName, String brokerId, ServerWebExchange exchange){
-        return clusterService;
+    public Mono<ResponseEntity<JmxMetric>> getJmxMetric(String clusterName, Integer host, Mono<JmxMetric> metric, ServerWebExchange exchange){
+        return metric
+                .flatMap(m -> clusterService.getJmxMetric(clusterName, host, m)
+                        .map(ResponseEntity::ok));
     }
     }
 
 
     private Mono<ConsumerPosition> parseConsumerPosition(SeekType seekType, List<String> seekTo) {
     private Mono<ConsumerPosition> parseConsumerPosition(SeekType seekType, List<String> seekTo) {

+ 24 - 13
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -264,12 +264,12 @@ paths:
               schema:
               schema:
                 $ref: '#/components/schemas/ConsumerGroupDetails'
                 $ref: '#/components/schemas/ConsumerGroupDetails'
 
 
-  /api/clusters/{clusterName}/brokers/{brokerId}:
-    get:
+  /api/clusters/{clusterName}/metrics/brokers/{brokerId}:
+    post:
       tags:
       tags:
         - /api/clusters
         - /api/clusters
-      summary: get all JmxMetricsNames
-      operationId: getJmxMetricsNames
+      summary: get specific JmxMetric
+      operationId: getJmxMetric
       parameters:
       parameters:
         - name: clusterName
         - name: clusterName
           in: path
           in: path
@@ -280,16 +280,19 @@ paths:
           in: path
           in: path
           required: true
           required: true
           schema:
           schema:
-            type: string
+            type: integer
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/JmxMetric'
       responses:
       responses:
         200:
         200:
           description: OK
           description: OK
           content:
           content:
             application/json:
             application/json:
               schema:
               schema:
-                type: array
-                items:
-                  $ref: '#/components/schemas/JmxMetrics'
+                $ref: '#/components/schemas/JmxMetric'
 
 
   /api/clusters/{clusterName}/consumerGroups:
   /api/clusters/{clusterName}/consumerGroups:
     get:
     get:
@@ -338,6 +341,10 @@ components:
           type: object
           type: object
           additionalProperties:
           additionalProperties:
             type: number
             type: number
+        jmxMetricsNames:
+          type: array
+          items:
+            $ref: '#/components/schemas/JmxMetric'
       required:
       required:
         - id
         - id
         - name
         - name
@@ -452,6 +459,8 @@ components:
       properties:
       properties:
         id:
         id:
           type: string
           type: string
+        host:
+          type: string
 
 
     ConsumerGroup:
     ConsumerGroup:
       type: object
       type: object
@@ -539,10 +548,12 @@ components:
           items:
           items:
             $ref: '#/components/schemas/ConsumerTopicPartitionDetail'
             $ref: '#/components/schemas/ConsumerTopicPartitionDetail'
 
 
-    JmxMetrics:
+    JmxMetric:
       type: object
       type: object
       properties:
       properties:
-        metricName:
-          type: array
-          items:
-            type: string
+        canonicalName:
+          type: string
+        value:
+          type: object
+          additionalProperties:
+            type: object