
topic details

Zhenya Taran 5 years ago
parent
commit
9641ca3bea

+ 4 - 4
docker/kafka-clusters-only.yaml

@@ -33,8 +33,8 @@ services:
      - kafka0
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                cub kafka-ready -b kafka0:29090 1 20 && \
-                kafka-topics --create --topic users --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181 && \
-                kafka-topics --create --topic messages --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181'"
+                kafka-topics --create --topic users --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181 && \
+                kafka-topics --create --topic messages --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181'"
    environment:
      KAFKA_BROKER_ID: ignored
      KAFKA_ZOOKEEPER_CONNECT: ignored
@@ -72,8 +72,8 @@ services:
      - kafka1
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                cub kafka-ready -b kafka1:29090 1 20 && \
-                kafka-topics --create --topic users --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
-                kafka-topics --create --topic messages --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181'"
+                kafka-topics --create --topic users --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
+                kafka-topics --create --topic messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181'"
    environment:
      KAFKA_BROKER_ID: ignored
      KAFKA_ZOOKEEPER_CONNECT: ignored
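
A note on the topic bootstrap above: both clusters still create their topics through the ZooKeeper-based kafka-topics CLI. As a hedged illustration only (not part of this commit), the same provisioning could be done from Java with the AdminClient API that KafkaService already uses. The bootstrap address kafka0:29090 is taken from the compose file; the class name and everything else below are assumptions.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Properties;

public class TopicProvisioningSketch {

    public static void main(String[] args) throws Exception {
        // Bootstrap address taken from the compose file above (internal listener).
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka0:29090");

        try (AdminClient adminClient = AdminClient.create(props)) {
            // Partition counts mirror the values this commit sets for kafka0.
            // Note: unlike --if-not-exists, createTopics fails if a topic already exists.
            adminClient.createTopics(List.of(
                    new NewTopic("users", 2, (short) 1),
                    new NewTopic("messages", 3, (short) 1)
            )).all().get();
        }
    }
}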

+ 9 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ClustersStorage.java

@@ -10,6 +10,8 @@ import javax.annotation.PostConstruct;
 import java.util.ArrayList;
 import java.util.List;
 
+import static com.provectus.kafka.ui.cluster.model.MetricsConstants.CLUSTER_ID;
+
 @Component
 @RequiredArgsConstructor
 public class ClustersStorage {
@@ -30,4 +32,11 @@ public class ClustersStorage {
     public List<KafkaCluster> getKafkaClusters() {
         return kafkaClusters;
     }
+
+    public KafkaCluster getClusterById(String clusterId) {
+        return kafkaClusters.stream()
+                .filter(cltr -> cltr.getMetricsMap().get(CLUSTER_ID).equals(clusterId))
+                .findFirst()
+                .orElseThrow();
+    }
 }
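
getClusterById falls through to orElseThrow(), so an unknown cluster id surfaces as an unchecked NoSuchElementException. A hedged alternative (not part of the commit) is to expose the lookup as an Optional and let callers decide how to react; findClusterById is an illustrative name, the method would sit in ClustersStorage next to getClusterById, and it needs java.util.Optional on the import list.

public Optional<KafkaCluster> findClusterById(String clusterId) {
    // Null-safe comparison: clusters whose CLUSTER_ID metric is not yet populated are skipped.
    return kafkaClusters.stream()
            .filter(cluster -> clusterId.equals(cluster.getMetricsMap().get(CLUSTER_ID)))
            .findFirst();
}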

+ 10 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java

@@ -28,7 +28,7 @@ public class KafkaCluster {
 
     Map<String, String> metricsMap = new ConcurrentHashMap<>();
     List<Topic> topics = new ArrayList<>();
-    List<TopicDetails> topicDetails = new ArrayList<>();
+    private Map<String, TopicDetails> topicDetailsMap = new ConcurrentHashMap<>();
 
     MBeanServerConnection mBeanServerConnection;
     ZkClient zkClient;
@@ -55,4 +55,13 @@ public class KafkaCluster {
                 .map(key -> key + "=" + metricsMap.get(key))
                 .collect(Collectors.joining(", ", "{", "}"));
     }
+
+    public TopicDetails getTopicDetails(String key) {
+        var topicDetails = topicDetailsMap.get(key);
+        if(topicDetails == null) {
+            topicDetailsMap.putIfAbsent(key, new TopicDetails());
+            topicDetails = topicDetailsMap.get(key);
+        }
+        return topicDetails;
+    }
 }
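
The new getTopicDetails above guards the get / putIfAbsent / get sequence by hand. Since topicDetailsMap is a ConcurrentHashMap, the same create-if-missing behaviour can be expressed in one atomic call; the following is an equivalent sketch, not part of the commit.

public TopicDetails getTopicDetails(String key) {
    // computeIfAbsent creates the entry only when the key is missing, in a single atomic step.
    return topicDetailsMap.computeIfAbsent(key, k -> new TopicDetails());
}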

+ 4 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -48,10 +48,7 @@ public class ClusterService {
     }
 
     public Mono<ResponseEntity<BrokerMetrics>> getBrokerMetrics(String clusterId) {
-        KafkaCluster cluster = clustersStorage.getKafkaClusters().stream()
-                .filter(cltr -> cltr.getMetricsMap().get(CLUSTER_ID).equals(clusterId))
-                .findFirst()
-                .orElseThrow();
+        KafkaCluster cluster = clustersStorage.getClusterById(clusterId);
 
         BrokerMetrics brokerMetrics = new BrokerMetrics();
         brokerMetrics.setClusterId(cluster.getMetricsMap().get(CLUSTER_ID));
@@ -67,10 +64,7 @@ public class ClusterService {
     }
 
     public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterId) {
-        KafkaCluster cluster = clustersStorage.getKafkaClusters().stream()
-                .filter(cltr -> cltr.getMetricsMap().get(CLUSTER_ID).equals(clusterId))
-                .findFirst()
-                .orElseThrow();
+        KafkaCluster cluster = clustersStorage.getClusterById(clusterId);
 
         return Mono.just(ResponseEntity.ok(Flux.fromIterable(cluster.getTopics())));
     }
@@ -84,6 +78,7 @@ public class ClusterService {
     }
 
     public Mono<ResponseEntity<TopicDetails>> getTopicDetails(String clusterId, String topicName) {
-        return null;
+        KafkaCluster cluster = clustersStorage.getClusterById(clusterId);
+        return Mono.just(ResponseEntity.ok(cluster.getTopicDetails(topicName)));
     }
 }
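
With getClusterById throwing on an unknown id, getTopicDetails (like the other endpoints) would answer such requests with a server error. Below is a hedged sketch of the same endpoint mapped to a 404 instead; it assumes an Optional-returning findClusterById as outlined in the ClustersStorage note above, which does not exist in this commit.

public Mono<ResponseEntity<TopicDetails>> getTopicDetails(String clusterId, String topicName) {
    // Empty Mono when the cluster is unknown; defaultIfEmpty turns that into a 404 response.
    return Mono.justOrEmpty(clustersStorage.findClusterById(clusterId))
            .map(cluster -> ResponseEntity.ok(cluster.getTopicDetails(topicName)))
            .defaultIfEmpty(ResponseEntity.notFound().build());
}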

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java

@@ -22,7 +22,7 @@ public class MetricsUpdateService {
     public void updateMetrics(KafkaCluster kafkaCluster) {
         log.debug("Start getting metrics for kafkaCluster: " + kafkaCluster.getName());
         kafkaService.loadClusterMetrics(kafkaCluster);
-        jmxService.loadClusterMetrics(kafkaCluster);
+//        jmxService.loadClusterMetrics(kafkaCluster);
         zookeeperService.checkZookeeperStatus(kafkaCluster);
     }
 }
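
Commenting out jmxService.loadClusterMetrics disables JMX collection for every cluster. As a hedged alternative (not part of this commit), the step could be gated by a per-cluster flag instead, so it can be switched back on without a code change; isJmxEnabled() is an assumed accessor that does not exist on KafkaCluster today.

// Hypothetical flag, e.g. driven by application configuration; not an existing method.
if (kafkaCluster.isJmxEnabled()) {
    jmxService.loadClusterMetrics(kafkaCluster);
}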

+ 32 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/jmx/JmxService.java

@@ -8,12 +8,15 @@ import lombok.extern.log4j.Log4j2;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
+import javax.management.ObjectInstance;
 import javax.management.ObjectName;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 @Service
 @Log4j2
@@ -34,21 +37,47 @@ public class JmxService {
 
         if (!isConnected) {
             kafkaCluster.setJmxStatus(ServerStatus.OFFLINE);
+
             return;
         }
 
         kafkaCluster.setJmxStatus(ServerStatus.ONLINE);
-        loadJmxMetrics(kafkaCluster);
+        loadJmxClusterMetrics(kafkaCluster);
+        loadJmxTopicMetrics(kafkaCluster);
+    }
+
+    @SneakyThrows
+    private void loadJmxTopicMetrics(KafkaCluster kafkaCluster) {
+        Set<ObjectInstance> objectInstances = kafkaCluster.getMBeanServerConnection().queryMBeans(new ObjectName(
+                "kafka.cluster:type=Partition,name=UnderReplicated,topic=*,partition=*"), null);
+        Map<String, Integer> topicUrpMap = new HashMap<>();
+        for (ObjectInstance objectInstance : objectInstances) {
+            String topicName = objectInstance.getObjectName().getKeyProperty("topic");
+            if (topicName != null) {
+                topicUrpMap.putIfAbsent(topicName, 0);
+                Object attributeValue = kafkaCluster.getMBeanServerConnection().getAttribute(objectInstance.getObjectName(),"Value");
+                try {
+                    if (attributeValue != null && Integer.parseInt(attributeValue.toString()) == 1) {
+                        topicUrpMap.put(topicName, topicUrpMap.get(topicName) + 1);
+                    }
+                } catch (NumberFormatException e) {
+                    log.error(e);
+                }
+            }
+        }
+
+        for (Map.Entry<String, Integer> entry : topicUrpMap.entrySet()) {
+            kafkaCluster.getTopicDetails(entry.getKey()).setUnderReplicatedPartitions(entry.getValue());
+        }
     }
 
     @SneakyThrows
-    private void loadJmxMetrics(KafkaCluster kafkaCluster) {
+    private void loadJmxClusterMetrics(KafkaCluster kafkaCluster) {
         for (Map.Entry<MBeanInfo, String> mbeanToMetric : JmxConstants.mbeanToAttributeMap.entrySet()) {
             MBeanInfo mBeanInfo = mbeanToMetric.getKey();
             Object attributeValue = kafkaCluster.getMBeanServerConnection().getAttribute(new ObjectName(mBeanInfo.getName()), mBeanInfo.getAttribute());
             kafkaCluster.putMetric(mbeanToMetric.getValue(), attributeValue.toString());
         }
-
     }
 
     private boolean createJmxConnection(KafkaCluster kafkaCluster) {
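
Inside loadJmxTopicMetrics, the per-topic tally is built with putIfAbsent followed by get and put. The helper below is a hedged sketch (not part of the commit) that the loop body could call instead, using Map.merge for the increment; the method name is illustrative and the counting logic is the same: every topic gets an entry, and the count grows only when the MBean's Value attribute parses to 1.

private void tallyUnderReplicated(Map<String, Integer> topicUrpMap, String topicName, Object attributeValue) {
    // Ensure the topic is present even when none of its partitions are under-replicated.
    topicUrpMap.putIfAbsent(topicName, 0);
    if (attributeValue != null && Integer.parseInt(attributeValue.toString()) == 1) {
        topicUrpMap.merge(topicName, 1, Integer::sum);
    }
}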

+ 8 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/jmx/JmxTopicConstants.java

@@ -0,0 +1,8 @@
+package com.provectus.kafka.ui.jmx;
+
+public final class JmxTopicConstants {
+
+    private JmxTopicConstants() {}
+
+
+}

+ 43 - 32
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -2,10 +2,7 @@ package com.provectus.kafka.ui.kafka;
 
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.model.MetricsConstants;
-import com.provectus.kafka.ui.model.Partition;
-import com.provectus.kafka.ui.model.Replica;
-import com.provectus.kafka.ui.model.ServerStatus;
-import com.provectus.kafka.ui.model.Topic;
+import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
@@ -46,7 +43,6 @@ public class KafkaService {
         kafkaCluster.setStatus(ServerStatus.ONLINE);
         loadMetrics(kafkaCluster);
         loadTopics(kafkaCluster);
-        loadTopicsDetails(kafkaCluster);
     }
 
     private boolean createAdminClient(KafkaCluster kafkaCluster) {
@@ -78,10 +74,6 @@ public class KafkaService {
         }
     }
 
-    private void loadTopicsDetails(KafkaCluster kafkaCluster) {
-
-    }
-
     @SneakyThrows
     private void loadTopics(KafkaCluster kafkaCluster) {
         AdminClient adminClient = kafkaCluster.getAdminClient();
@@ -98,34 +90,54 @@ public class KafkaService {
             var topicDescription = getTopicDescription(entry);
             if (topicDescription == null) continue;
 
-            var topic = new Topic();
-            topic.setClusterId(clusterId);
-            topic.setInternal(topicDescription.isInternal());
-            topic.setName(topicDescription.name());
-            List<Partition> partitions = new ArrayList<>();
-
-            for (TopicPartitionInfo partition : topicDescription.partitions()) {
-                var partitionDto = new Partition();
-                partitionDto.setLeader(partition.leader().id());
-                partitionDto.setPartition(partition.partition());
-                List<Replica> replicas = new ArrayList<>();
-                for (Node replicaNode : partition.replicas()) {
-                    var replica = new Replica();
-                    replica.setBroker(replicaNode.id());
-                    replica.setLeader(partition.leader() != null && partition.leader().id() == replicaNode.id());
-                    replica.setInSync(partition.isr().contains(replicaNode));
-                    replicas.add(replica);
-                }
-                partitionDto.setReplicas(replicas);
-                partitions.add(partitionDto);
-            }
+            Topic topic = collectTopicData(clusterId, topicDescription);
+            TopicDetails topicDetails = kafkaCluster.getTopicDetails(entry.getKey());
+            collectTopicDetailsData(topicDetails, topicDescription);
 
-            topic.setPartitions(partitions);
             foundTopics.add(topic);
         }
         kafkaCluster.setTopics(foundTopics);
+    }
+
+    private void collectTopicDetailsData(TopicDetails topicDetails, TopicDescription topicDescription) {
+        int inSyncReplicas = 0, replicas = 0;
+        for (TopicPartitionInfo partition : topicDescription.partitions()) {
+            inSyncReplicas += partition.isr().size();
+            replicas += partition.replicas().size();
+        }
+
+        topicDetails.setReplicas(replicas);
+        topicDetails.setPartitionCount(topicDescription.partitions().size());
+        topicDetails.setInSyncReplicas(inSyncReplicas);
+        topicDetails.setReplicationFactor(topicDescription.partitions().size() > 0
+                ? topicDescription.partitions().get(0).replicas().size()
+                : null);
+    }
 
+    private Topic collectTopicData(String clusterId, TopicDescription topicDescription) {
+        var topic = new Topic().clusterId(clusterId);
+        topic.setInternal(topicDescription.isInternal());
+        topic.setName(topicDescription.name());
+        List<Partition> partitions = new ArrayList<>();
+
+        for (TopicPartitionInfo partition : topicDescription.partitions()) {
+            var partitionDto = new Partition();
+            partitionDto.setLeader(partition.leader().id());
+            partitionDto.setPartition(partition.partition());
+            List<Replica> replicas = new ArrayList<>();
+            for (Node replicaNode : partition.replicas()) {
+                var replica = new Replica();
+                replica.setBroker(replicaNode.id());
+                replica.setLeader(partition.leader() != null && partition.leader().id() == replicaNode.id());
+                replica.setInSync(partition.isr().contains(replicaNode));
+                replicas.add(replica);
+            }
+            partitionDto.setReplicas(replicas);
+            partitions.add(partitionDto);
+        }
+        topic.setPartitions(partitions);
 
+        return topic;
     }
 
     private TopicDescription getTopicDescription(Map.Entry<String, KafkaFuture<TopicDescription>> entry) {
@@ -141,7 +153,6 @@ public class KafkaService {
     private void loadMetrics(KafkaCluster kafkaCluster) throws InterruptedException, java.util.concurrent.ExecutionException {
         AdminClient adminClient = kafkaCluster.getAdminClient();
         kafkaCluster.putMetric(MetricsConstants.BROKERS_COUNT, String.valueOf(adminClient.describeCluster().nodes().get().size()));
-
         ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
         listTopicsOptions.listInternal(false);
         Set<String> topicNames = adminClient.listTopics(listTopicsOptions).names().get();
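
collectTopicDetailsData already walks topicDescription.partitions() to total ISR and replica counts. The following is a hedged sketch (not part of the commit) of deriving the under-replicated partition count from the same TopicDescription; it would complement the JMX-based figure that loadJmxTopicMetrics writes into TopicDetails, since a partition is under-replicated exactly when its ISR is smaller than its replica set. The method name is illustrative.

private int countUnderReplicatedPartitions(TopicDescription topicDescription) {
    // A partition counts as under-replicated when fewer replicas are in sync than assigned.
    return (int) topicDescription.partitions().stream()
            .filter(partition -> partition.isr().size() < partition.replicas().size())
            .count();
}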

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZookeeperService.java

@@ -29,6 +29,7 @@ public class ZookeeperService {
         if (!isConnected) {
             kafkaCluster.putMetric(ZOOKEEPER_STATUS, "0");
             kafkaCluster.setZookeeperStatus(ServerStatus.OFFLINE);
+
             return;
         }