Explorar o código

Added bytesIn/OutPerSec params for clusterMetrics object

Roman Nedzvetskiy %!s(int64=5) %!d(string=hai) anos
pai
achega
328273157d

+ 25 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/KafkaJmxProperties.java

@@ -0,0 +1,25 @@
+package com.provectus.kafka.ui.cluster.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Configuration
+@ConfigurationProperties("kafka")
+@Data
+public class KafkaJmxProperties {
+    List<Jmx> jmxParams = new ArrayList<>();
+
+    @Data
+    public static class Jmx {
+        private String clusterName;
+        private int brokerId;
+        private int port;
+        private String host;
+        private String url;
+        private String serviceType;
+    }
+}

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java

@@ -19,8 +19,8 @@ public class InternalClusterMetrics {
     private final int inSyncReplicasCount;
     private final int outOfSyncReplicasCount;
     //TODO: find way to fill
-    private final int bytesInPerSec;
-    private final int bytesOutPerSec;
+    private final Map<String, String> bytesInPerSec;
+    private final Map<String, String> bytesOutPerSec;
     private final int segmentCount;
     //TODO: find way to fill
     private final long segmentSize;

+ 47 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -3,8 +3,6 @@ package com.provectus.kafka.ui.cluster.util;
 import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.model.*;
 import lombok.extern.slf4j.Slf4j;
-import com.provectus.kafka.ui.model.TopicMessage;
-
 import org.apache.kafka.clients.admin.*;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -14,13 +12,17 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Bytes;
-
 import reactor.core.publisher.Mono;
 
+import javax.management.*;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import java.io.IOException;
+import java.net.MalformedURLException;
 import java.time.Instant;
 import java.time.OffsetDateTime;
 import java.time.ZoneId;
-import java.util.HashMap;
 import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -31,13 +33,20 @@ import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_
 @Slf4j
 public class ClusterUtil {
 
+    public static final String BYTES_IN_PER_SEC = "BytesInPerSec";
+    public static final String BYTES_OUT_PER_SEC = "BytesOutPerSec";
+    private static final String BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_IN_PER_SEC;
+    private static final String BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_OUT_PER_SEC;
+
+    private static final List<String> attrNames = Arrays.asList("OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate");
+
     private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version";
 
     private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
 
-    public static <T> Mono<T> toMono(KafkaFuture<T> future){
-        return Mono.create(sink -> future.whenComplete((res, ex)->{
-            if (ex!=null) {
+    public static <T> Mono<T> toMono(KafkaFuture<T> future) {
+        return Mono.create(sink -> future.whenComplete((res, ex) -> {
+            if (ex != null) {
                 sink.error(ex);
             } else {
                 sink.success(res);
@@ -45,9 +54,9 @@ public class ClusterUtil {
         }));
     }
 
-    public static Mono<String> toMono(KafkaFuture<Void> future, String topicName){
-        return Mono.create(sink -> future.whenComplete((res, ex)->{
-            if (ex!=null) {
+    public static Mono<String> toMono(KafkaFuture<Void> future, String topicName) {
+        return Mono.create(sink -> future.whenComplete((res, ex) -> {
+            if (ex != null) {
                 sink.error(ex);
             } else {
                 sink.success(topicName);
@@ -110,7 +119,7 @@ public class ClusterUtil {
                     partitionDto.inSyncReplicasCount(partition.isr().size());
                     partitionDto.replicasCount(partition.replicas().size());
                     List<InternalReplica> replicas = partition.replicas().stream().map(
-                            r -> new InternalReplica(r.id(), partition.leader().id()!=r.id(), partition.isr().contains(r)))
+                            r -> new InternalReplica(r.id(), partition.leader().id() != r.id(), partition.isr().contains(r)))
                             .collect(Collectors.toList());
                     partitionDto.replicas(replicas);
                     return partitionDto.build();
@@ -183,6 +192,7 @@ public class ClusterUtil {
                 throw new IllegalArgumentException("Unknown timestampType: " + timestampType);
         }
     }
+
     public static Mono<Set<ExtendedAdminClient.SupportedFeature>> getSupportedFeatures(AdminClient adminClient) {
         return ClusterUtil.toMono(adminClient.describeCluster().controller())
                 .map(Node::id)
@@ -208,7 +218,7 @@ public class ClusterUtil {
         }
     }
 
-    public static Topic convertToTopic (InternalTopic internalTopic) {
+    public static Topic convertToTopic(InternalTopic internalTopic) {
         Topic topic = new Topic();
         topic.setName(internalTopic.getName());
         List<Partition> partitions = internalTopic.getPartitions().stream().flatMap(s -> {
@@ -226,6 +236,31 @@ public class ClusterUtil {
         return topic;
     }
 
+    public static Map<String, String> getJmxTrafficMetrics(String jmxUrl, String metricName) {
+        Map<String, String> result = new HashMap<>();
+        try {
+            JMXServiceURL url = new JMXServiceURL(jmxUrl);
+            JMXConnector srv = JMXConnectorFactory.connect(url);
+            MBeanServerConnection msc = srv.getMBeanServerConnection();
+            ObjectName name = metricName.equals(BYTES_IN_PER_SEC) ? new ObjectName(BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME) :
+                    new ObjectName(BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME);
+            for (String attrName : attrNames) {
+                result.put(attrName, msc.getAttribute(name, attrName).toString());
+            }
+        } catch (MalformedURLException url) {
+            log.error("Cannot create JmxServiceUrl from {}", jmxUrl);
+        } catch (IOException io) {
+            log.error("Cannot connect to KafkaJmxServer with url {}", jmxUrl);
+        } catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
+            log.error("Cannot find attribute from");
+            log.error(e.getMessage());
+        } catch (MalformedObjectNameException objectNameE) {
+            log.error("Cannot create objectName");
+            log.error(objectNameE.getMessage());
+        }
+        return result;
+    }
+
     public static <T, R> Map<T, R> toSingleMap (Stream<Map<T, R>> streamOfMaps) {
         return streamOfMaps.reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();

+ 33 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.kafka;
 
+import com.provectus.kafka.ui.cluster.config.KafkaJmxProperties;
 import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.model.ConsumerGroup;
@@ -11,20 +12,25 @@ import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.admin.*;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
-import org.springframework.beans.factory.annotation.Value;
 import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
 
+import javax.management.MBeanServer;
+import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -43,6 +49,7 @@ public class KafkaService {
     private final ZookeeperService zookeeperService;
     private final Map<String, ExtendedAdminClient> adminClientCache = new ConcurrentHashMap<>();
     private final Map<AdminClient, Map<TopicPartition, Integer>> leadersCache = new ConcurrentHashMap<>();
+    private final KafkaJmxProperties kafkaJmxDto;
 
     @SneakyThrows
     public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
@@ -64,6 +71,18 @@ public class KafkaService {
         );
     }
 
+    public static Consumer<Long, String> createConsumer() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091");
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        Consumer<Long, String> consumer = new KafkaConsumer<>(props);
+        consumer.subscribe(Collections.singletonList("users"));
+        return consumer;
+    }
+
     private KafkaCluster buildFromData(KafkaCluster currentCluster, InternalSegmentSizeDto segmentSizeDto) {
 
         var topics = segmentSizeDto.getInternalTopicWithSegmentSize();
@@ -163,9 +182,17 @@ public class KafkaService {
                         c -> {
                             InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = InternalClusterMetrics.builder();
                             metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
+                            Map<String, String> bytesInPerSec;
+                            Map<String, String> bytesOutPerSec;
+                            var jmx = kafkaJmxDto.getJmxParams().stream().filter(j -> j.getBrokerId() == c.id()).findFirst().orElseThrow();
+                            var jmxUrl = jmx.getUrl() + jmx.getHost() + ":" + jmx.getPort() + "/" + jmx.getServiceType();
+                            bytesInPerSec = ClusterUtil.getJmxTrafficMetrics(jmxUrl, ClusterUtil.BYTES_IN_PER_SEC);
+                            bytesOutPerSec = ClusterUtil.getJmxTrafficMetrics(jmxUrl, ClusterUtil.BYTES_OUT_PER_SEC);
                             // TODO: fill bytes in/out metrics
                             metricsBuilder
-                                    .internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))));
+                                    .internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))))
+                                    .bytesOutPerSec(bytesOutPerSec)
+                                    .bytesInPerSec(bytesInPerSec);
 
                             return metricsBuilder.build();
                         }
@@ -204,7 +231,11 @@ public class KafkaService {
     }
 
 
+    @SneakyThrows
     public Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
+        Consumer<Long, String> kek = createConsumer();
+        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+
         return Mono.justOrEmpty(adminClientCache.get(cluster.getName()))
                 .switchIfEmpty(createAdminClient(cluster))
                 .map(e -> adminClientCache.computeIfAbsent(cluster.getName(), key -> e));

+ 11 - 0
kafka-ui-api/src/main/resources/application-local.yml

@@ -1,4 +1,12 @@
 kafka:
+  jmxParams:
+    -
+      clusterName: local
+      brokerid: 1
+      port: 9997
+      host: localhost
+      url: service:jmx:rmi:///jndi/rmi://
+      serviceType: jmxrmi
   clusters:
     -
       name: local
@@ -15,3 +23,6 @@ kafka:
   admin-client-timeout: 5000
 zookeeper:
   connection-timeout: 1000
+spring:
+  jmx:
+    enabled: true