Roman Nedzvetskiy 5 years ago
parent
commit
7d73096ad6

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/ClustersMetricsScheduler.java

@@ -25,7 +25,7 @@ public class ClustersMetricsScheduler {
                 .subscribeOn(Schedulers.parallel())
                 .map(s -> new ClusterWithId(s.getKey(), s.getValue()))
                 .flatMap(metricsUpdateService::updateMetrics)
-                .doOnNext(s ->clustersStorage.setKafkaCluster(s.getId(), s.getKafkaCluster()))
+                .doOnNext(s -> clustersStorage.setKafkaCluster(s.getId(), s.getKafkaCluster()))
                 .subscribe();
     }
 }
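Note: this hunk only adds the missing space after the lambda arrow in doOnNext; the pipeline around it is unchanged. For context, it follows a common Reactor pattern: fan the configured clusters out on the parallel scheduler, update each one, and write the result back to storage as a side effect. A minimal sketch with illustrative types (not the project's real classes):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Minimal sketch of the scheduler's Reactor pattern; types and names are illustrative.
public class SchedulerSketch {

    private final Map<String, String> storage = new ConcurrentHashMap<>();

    public void updateAll(Map<String, String> clusters) {
        Flux.fromIterable(clusters.entrySet())
                .subscribeOn(Schedulers.parallel())                    // run off the caller thread
                .flatMap(e -> updateMetrics(e.getKey(), e.getValue())) // async update per cluster
                .doOnNext(updated -> storage.put(updated.getKey(), updated.getValue())) // side effect: persist
                .subscribe();
    }

    // Stand-in for MetricsUpdateService#updateMetrics.
    private Mono<Map.Entry<String, String>> updateMetrics(String id, String cluster) {
        return Mono.just(Map.entry(id, cluster + ":updated"));
    }
}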

+ 35 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/BrokersMetricsMapper.java

@@ -0,0 +1,35 @@
+package com.provectus.kafka.ui.cluster.mapper;
+
+import com.provectus.kafka.ui.cluster.model.BrokersMetricsDto;
+import com.provectus.kafka.ui.model.BrokersMetrics;
+import org.mapstruct.Mapper;
+import org.mapstruct.Mapping;
+import org.mapstruct.factory.Mappers;
+
+@Mapper
+public interface BrokersMetricsMapper {
+
+    BrokersMetricsMapper instance = Mappers.getMapper(BrokersMetricsMapper.class);
+
+    @Mapping(target = "brokerCount")
+    @Mapping(target = "zooKeeperStatus")
+    @Mapping(target = "activeControllers")
+    @Mapping(target = "uncleanLeaderElectionCount")
+    @Mapping(target = "onlinePartitionCount")
+    @Mapping(target = "underReplicatedPartitionCount")
+    @Mapping(target = "offlinePartitionCount")
+    @Mapping(target = "inSyncReplicasCount")
+    @Mapping(target = "outOfSyncReplicasCount")
+    BrokersMetricsDto toBrokersMetricsDto (BrokersMetrics brokersMetrics);
+
+    @Mapping(target = "brokerCount")
+    @Mapping(target = "zooKeeperStatus")
+    @Mapping(target = "activeControllers")
+    @Mapping(target = "uncleanLeaderElectionCount")
+    @Mapping(target = "onlinePartitionCount")
+    @Mapping(target = "underReplicatedPartitionCount")
+    @Mapping(target = "offlinePartitionCount")
+    @Mapping(target = "inSyncReplicasCount")
+    @Mapping(target = "outOfSyncReplicasCount")
+    BrokersMetrics toBrokersMetrics (BrokersMetricsDto brokersMetrics);
+}
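Both new mappers support a copy-edit-copy-back pattern that replaces the serialization-based ClusterUtil.clone removed further down. A hedged usage sketch (the wrapper class and method are illustrative, and the setters assume the generated BrokersMetrics model and the DTO expose matching accessors, as the KafkaService hunk below suggests):

import com.provectus.kafka.ui.cluster.mapper.BrokersMetricsMapper;
import com.provectus.kafka.ui.cluster.model.BrokersMetricsDto;
import com.provectus.kafka.ui.model.BrokersMetrics;

// Illustrative helper, not part of the commit.
public class BrokersMetricsCopyExample {

    public static BrokersMetrics withResetPartitionCounters(BrokersMetrics source) {
        // MapStruct generates BrokersMetricsMapperImpl at compile time;
        // Mappers.getMapper(...) behind the `instance` field loads that implementation.
        BrokersMetricsDto copy = BrokersMetricsMapper.instance.toBrokersMetricsDto(source);

        // The DTO is a plain mutable copy, so edits never touch the original model object.
        copy.setOnlinePartitionCount(0);
        copy.setOfflinePartitionCount(0);
        copy.setUnderReplicatedPartitionCount(0);

        // Map back into the generated API model once the edits are done.
        return BrokersMetricsMapper.instance.toBrokersMetrics(copy);
    }
}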

+ 35 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterDtoMapper.java

@@ -0,0 +1,35 @@
+package com.provectus.kafka.ui.cluster.mapper;
+
+import com.provectus.kafka.ui.cluster.model.ClusterDto;
+import com.provectus.kafka.ui.model.Cluster;
+import org.mapstruct.Mapper;
+import org.mapstruct.Mapping;
+import org.mapstruct.factory.Mappers;
+
+@Mapper
+public interface ClusterDtoMapper {
+
+    ClusterDtoMapper instance = Mappers.getMapper(ClusterDtoMapper.class);
+
+    @Mapping(target = "name")
+    @Mapping(target = "id")
+    @Mapping(target = "defaultCluster")
+    @Mapping(target = "status")
+    @Mapping(target = "brokerCount")
+    @Mapping(target = "onlinePartitionCount")
+    @Mapping(target = "topicCount")
+    @Mapping(target = "bytesInPerSec")
+    @Mapping(target = "bytesOutPerSec")
+    ClusterDto toClusterDto(Cluster cluster);
+
+    @Mapping(target = "name")
+    @Mapping(target = "id")
+    @Mapping(target = "defaultCluster")
+    @Mapping(target = "status")
+    @Mapping(target = "brokerCount")
+    @Mapping(target = "onlinePartitionCount")
+    @Mapping(target = "topicCount")
+    @Mapping(target = "bytesInPerSec")
+    @Mapping(target = "bytesOutPerSec")
+    Cluster toCluster(ClusterDto cluster);
+}
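A possible simplification, not part of this commit: MapStruct maps same-named properties implicitly, so when Cluster and ClusterDto share their property names (as the field lists above suggest) the per-field @Mapping(target = ...) annotations can be dropped. A hedged sketch with a hypothetical interface name:

import com.provectus.kafka.ui.cluster.model.ClusterDto;
import com.provectus.kafka.ui.model.Cluster;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;

// Equivalent, terser mapper relying on MapStruct's implicit name-based mapping.
@Mapper
public interface ClusterDtoMapperCompact {

    ClusterDtoMapperCompact INSTANCE = Mappers.getMapper(ClusterDtoMapperCompact.class);

    ClusterDto toClusterDto(Cluster cluster);

    Cluster toCluster(ClusterDto cluster);
}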

+ 17 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/BrokersMetricsDto.java

@@ -0,0 +1,17 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Data;
+
+@Data
+public class BrokersMetricsDto {
+
+    private Integer brokerCount;
+    private Integer zooKeeperStatus;
+    private Integer activeControllers;
+    private Integer uncleanLeaderElectionCount;
+    private Integer onlinePartitionCount;
+    private Integer underReplicatedPartitionCount;
+    private Integer offlinePartitionCount;
+    private Integer inSyncReplicasCount;
+    private Integer outOfSyncReplicasCount;
+}

+ 18 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ClusterDto.java

@@ -0,0 +1,18 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import com.provectus.kafka.ui.model.ServerStatus;
+import lombok.Data;
+
+@Data
+public class ClusterDto {
+
+    private String name;
+    private String id;
+    private boolean defaultCluster;
+    private ServerStatus status;
+    private Integer brokerCount;
+    private Integer onlinePartitionCount;
+    private Integer topicCount;
+    private Integer bytesInPerSec;
+    private Integer bytesOutPerSec;
+}

+ 1 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java

@@ -18,10 +18,6 @@ public class MetricsUpdateService {
 
     public Mono<ClusterWithId> updateMetrics(ClusterWithId clusterWithId) {
         log.debug("Start getting metrics for kafkaCluster: {}", clusterWithId.getKafkaCluster());
-        return kafkaService.getUpdatedCluster(clusterWithId)
-                .map(s -> {
-                    zookeeperService.checkZookeeperStatus(s.getKafkaCluster());
-                    return s;
-                });
+        return kafkaService.getUpdatedCluster(clusterWithId);
     }
 }
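With the Zookeeper check moved into KafkaService (see the hunk below), updateMetrics becomes a pure delegation and the reactive chain no longer hides a mutation inside map(...). If a side effect is ever needed here again, doOnNext is the idiomatic operator for it; a hedged sketch of the method body inside MetricsUpdateService (the extra logging line is illustrative):

public Mono<ClusterWithId> updateMetrics(ClusterWithId clusterWithId) {
    log.debug("Start getting metrics for kafkaCluster: {}", clusterWithId.getKafkaCluster());
    return kafkaService.getUpdatedCluster(clusterWithId)
            // Side effects belong in doOnNext, not in a map(...) that returns its input.
            .doOnNext(updated -> log.debug("Metrics updated for cluster: {}", updated.getId()));
}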

+ 0 - 18
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -6,15 +6,10 @@ import com.provectus.kafka.ui.model.ConsumerGroup;
 import com.provectus.kafka.ui.model.Partition;
 import com.provectus.kafka.ui.model.Replica;
 import com.provectus.kafka.ui.model.Topic;
-import lombok.SneakyThrows;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.common.KafkaFuture;
 import reactor.core.publisher.Mono;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -66,17 +61,4 @@ public class ClusterUtil {
         }).collect(Collectors.toList()));
         return topic;
     }
-
-    @SneakyThrows
-    public static <T> T clone(T subject) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutputStream ous = new ObjectOutputStream(baos);
-        ous.writeObject(subject);
-        ous.close();
-        ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-        ObjectInputStream ois = new ObjectInputStream(bais);
-        return (T) ois.readObject();
-
-
-    }
 }
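The serialization round-trip clone is removed together with its implicit requirement that every cloned model be Serializable; copies are now produced through the MapStruct mappers above or through a builder copy, as KafkaService does with getKafkaCluster().toBuilder(). A self-contained sketch of the toBuilder pattern (ExampleCluster is illustrative; the project's KafkaCluster exposes a similar toBuilder() copy, as the hunks above and below show):

import lombok.Builder;
import lombok.Value;

// Illustrative stand-in: @Builder(toBuilder = true) generates a builder pre-filled
// with the current field values, so a modified copy needs no I/O and no Serializable.
@Value
@Builder(toBuilder = true)
class ExampleCluster {
    String name;
    int brokerCount;
}

class ToBuilderDemo {
    static ExampleCluster withMoreBrokers(ExampleCluster original) {
        // Copies every field of `original`, overrides one, leaves the original untouched.
        return original.toBuilder()
                .brokerCount(original.getBrokerCount() + 1)
                .build();
    }
}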

+ 39 - 32
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -1,9 +1,12 @@
 package com.provectus.kafka.ui.kafka;
 
+import com.provectus.kafka.ui.cluster.mapper.BrokersMetricsMapper;
+import com.provectus.kafka.ui.cluster.mapper.ClusterDtoMapper;
 import com.provectus.kafka.ui.cluster.model.Metrics;
 import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.model.*;
+import com.provectus.kafka.ui.zookeeper.ZookeeperService;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
@@ -29,56 +32,65 @@ import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_
 @Log4j2
 public class KafkaService {
 
+    private final ZookeeperService zookeeperService;
+
     private Map<String, AdminClient> adminClientCache = new ConcurrentHashMap<>();
 
     @SneakyThrows
     public Mono<ClusterWithId> getUpdatedCluster(ClusterWithId clusterWithId) {
-        var tempCluster = ClusterUtil.clone(clusterWithId.getKafkaCluster());
-        var internalCluster = clusterWithId.getKafkaCluster().toBuilder();
+        var internalCluster = clusterWithId.getKafkaCluster();
+        var clusterBuilder = clusterWithId.getKafkaCluster().toBuilder();
         return getOrCreateAdminClient(clusterWithId).flatMap(
                     ac ->
                         getClusterMetrics(ac).flatMap(
                             metrics -> {
-                                Cluster cluster = ClusterUtil.clone(tempCluster.getCluster());
+                                clusterBuilder.zookeeperStatus(zookeeperService.isZookeeperOnline(internalCluster) ? ServerStatus.ONLINE : ServerStatus.OFFLINE);
+                                ClusterDto cluster = ClusterDtoMapper.instance.toClusterDto(internalCluster.getCluster());
                                 cluster.setStatus(ServerStatus.ONLINE);
                                 cluster.setBytesInPerSec(metrics.getBytesInPerSec());
                                 cluster.setBytesOutPerSec(metrics.getBytesOutPerSec());
-                                BrokersMetrics brokersMetrics = tempCluster.getBrokersMetrics() != null
-                                        ? ClusterUtil.clone(tempCluster.getBrokersMetrics()) : new BrokersMetrics();
+                                BrokersMetricsDto brokersMetrics = internalCluster.getBrokersMetrics() != null
+                                        ? BrokersMetricsMapper.instance.toBrokersMetricsDto(internalCluster.getBrokersMetrics()) : new BrokersMetricsDto();
+                                brokersMetrics.setBrokerCount(metrics.getBrokerCount());
+                                brokersMetrics.setActiveControllers(metrics.getActiveControllers());
                                 brokersMetrics.setBrokerCount(metrics.getBrokerCount());
-                                brokersMetrics.activeControllers(metrics.getActiveControllers());
-                                brokersMetrics.brokerCount(metrics.getBrokerCount());
-                                resetMetrics(brokersMetrics);
+                                resetPartitionMetrics(brokersMetrics);
                                 cluster.setBrokerCount(metrics.getBrokerCount());
-                                return getTopicsData(ac, internalCluster, cluster, brokersMetrics, tempCluster)
+                                Map<String, TopicDetails> topicDetails;
+                                if (internalCluster.getTopicDetailsMap() == null) {
+                                    topicDetails = new HashMap<>();
+                                    clusterBuilder.topicDetailsMap(topicDetails);
+                                } else {
+                                    topicDetails = new HashMap<>(internalCluster.getTopicDetailsMap());
+                                }
+                                return getTopicsData(ac, cluster, brokersMetrics, topicDetails)
                                         .map(topics -> {
-                                            internalCluster.topics(ClusterUtil.convertToExternalTopicList(topics));
+                                            clusterBuilder.topics(ClusterUtil.convertToExternalTopicList(topics));
                                             cluster.setTopicCount(topics.size());
                                             return topics;
                                         })
                                         .flatMap(topics ->
                                             loadTopicConfig(ac, topics.stream().map(InternalTopic::getName).collect(Collectors.toList())).collectList()
-                                                    .map(s -> s.stream().collect(Collectors.toMap(map -> new ArrayList<>(map.entrySet()).get(0).getKey(),
-                                                            e -> new ArrayList<>(e.entrySet()).get(0).getValue())))
+                                                    .map(s -> s.stream().collect(HashMap<String, List<TopicConfig>>::new, HashMap::putAll, HashMap::putAll))
                                                     .map(topicsConfig -> {
-                                                        internalCluster.topicConfigsMap(topicsConfig);
-                                                        return internalCluster;
+                                                        clusterBuilder.topicConfigsMap(topicsConfig);
+                                                        return clusterBuilder;
                                                     })
                                         ).map(kc -> clusterWithId.toBuilder().kafkaCluster(
                                                         kc
-                                                        .cluster(cluster)
-                                                        .brokersMetrics(brokersMetrics)
+                                                        .cluster(ClusterDtoMapper.instance.toCluster(cluster))
+                                                        .brokersMetrics(BrokersMetricsMapper.instance.toBrokersMetrics(brokersMetrics))
                                                         .build()
                                         ).build());
                             })
             ).onErrorResume(
                     e -> {
-                        Cluster cluster = ClusterUtil.clone(tempCluster.getCluster());
+                        ClusterDto cluster = ClusterDtoMapper.instance.toClusterDto(internalCluster.getCluster());
                         cluster.setStatus(ServerStatus.OFFLINE);
                         return Mono.just(clusterWithId.toBuilder().kafkaCluster(
-                                tempCluster.toBuilder()
+                                internalCluster.toBuilder()
                                         .lastKafkaException(e)
-                                        .cluster(cluster)
+                                        .cluster(ClusterDtoMapper.instance.toCluster(cluster))
                                         .build()
                         ).build());
                     }
@@ -86,20 +98,21 @@ public class KafkaService {
     }
 
     @SneakyThrows
-    private Mono<List<InternalTopic>> getTopicsData(AdminClient adminClient, KafkaCluster.KafkaClusterBuilder kafkaCluster,
-                                                    Cluster cluster, BrokersMetrics brokersMetrics, KafkaCluster tempCluster) {
+    private Mono<List<InternalTopic>> getTopicsData(AdminClient adminClient, ClusterDto cluster,
+                                                    BrokersMetricsDto brokersMetrics, Map<String, TopicDetails> topicDetails) {
         ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
         listTopicsOptions.listInternal(true);
         return ClusterUtil.toMono(adminClient.listTopics(listTopicsOptions).names())
                     .map(tl -> {
-                    cluster.setTopicCount(tl.size());
+                        cluster.setTopicCount(tl.size());
                         DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(tl);
                         Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
+                        topicDetails.putAll(tl.stream().collect(Collectors.toMap(String::new, v -> new TopicDetails())));
                         return topicDescriptionFuturesMap.entrySet();
                     })
                     .flatMapMany(Flux::fromIterable)
                     .flatMap(s -> ClusterUtil.toMono(s.getValue()))
-                    .map(e -> collectTopicData(kafkaCluster, e, cluster, brokersMetrics, tempCluster))
+                    .map(e -> collectTopicData(e, cluster, brokersMetrics, topicDetails.get(e.name())))
                     .collectList();
     }
 
@@ -177,7 +190,7 @@ public class KafkaService {
         return getClusterId(adminClient).map( r -> adminClient);
     }
 
-    private void resetMetrics(BrokersMetrics brokersMetrics) {
+    private void resetPartitionMetrics(BrokersMetricsDto brokersMetrics) {
         brokersMetrics.setOnlinePartitionCount(0);
         brokersMetrics.setOfflinePartitionCount(0);
         brokersMetrics.setUnderReplicatedPartitionCount(0);
@@ -185,8 +198,8 @@ public class KafkaService {
         brokersMetrics.setOutOfSyncReplicasCount(0);
     }
 
-    private InternalTopic collectTopicData(KafkaCluster.KafkaClusterBuilder kafkaClusterBuilder, TopicDescription topicDescription,
-                                           Cluster cluster, BrokersMetrics brokersMetrics, KafkaCluster kafkaCluster) {
+    private InternalTopic collectTopicData(TopicDescription topicDescription, ClusterDto cluster, BrokersMetricsDto brokersMetrics,
+                                           TopicDetails topicDetails) {
         var topic = InternalTopic.builder();
         topic.internal(topicDescription.isInternal());
         topic.name(topicDescription.name());
@@ -214,12 +227,6 @@ public class KafkaService {
 
         topic.partitions(partitions);
 
-        if (kafkaCluster.getTopicDetailsMap() == null) {
-            kafkaClusterBuilder.topicDetailsMap(new HashMap<>());
-        }
-
-        var topicDetails = kafkaClusterBuilder.build().getOrCreateTopicDetails(topicDescription.name());
-
         topicDetails.setReplicas(replicasCount);
         topicDetails.setPartitionCount(topicDescription.partitions().size());
         topicDetails.setInSyncReplicas(inSyncReplicasCount);

+ 2 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZookeeperService.java

@@ -19,18 +19,14 @@ public class ZookeeperService {
 
     private final Map<String, ZkClient> cachedZkClient = new HashMap<>();
 
-    public void checkZookeeperStatus(KafkaCluster kafkaCluster) {
+    public boolean isZookeeperOnline(KafkaCluster kafkaCluster) {
         var isConnected = false;
         var zkClient = getOrCreateZkClient(kafkaCluster.getName());
         log.debug("Start getting Zookeeper metrics for kafkaCluster: {}", kafkaCluster.getName());
         if (zkClient != null) {
             isConnected = isZkClientConnected(zkClient);
         }
-        if (!isConnected) {
-            kafkaCluster.getBrokersMetrics().setZooKeeperStatus(ZooKeeperConstants.OFFLINE);
-            return;
-        }
-        kafkaCluster.getBrokersMetrics().setZooKeeperStatus(ZooKeeperConstants.ONLINE);
+        return isConnected;
     }
 
     private boolean isZkClientConnected(ZkClient zkClient) {
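Returning a boolean turns the service into a plain query: the caller decides how to record the result instead of this method mutating BrokersMetrics through ZooKeeperConstants. The caller side now reads as in the KafkaService hunk above, repeated here as a small sketch for clarity (clusterBuilder, internalCluster and ServerStatus come from that hunk):

// Caller-side sketch, mirroring KafkaService.getUpdatedCluster(...):
clusterBuilder.zookeeperStatus(
        zookeeperService.isZookeeperOnline(internalCluster)
                ? ServerStatus.ONLINE
                : ServerStatus.OFFLINE);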