Sfoglia il codice sorgente

topics and cluster refactored into mono way instead of get

Roman Nedzvetskiy 5 anni fa
parent
commit
3f7b68ff03

+ 8 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/ClustersMetricsScheduler.java

@@ -1,12 +1,15 @@
 package com.provectus.kafka.ui.cluster;
 
 import com.provectus.kafka.ui.cluster.model.ClustersStorage;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.service.MetricsUpdateService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
+
+import java.util.ArrayList;
 
 @Component
 @RequiredArgsConstructor
@@ -19,8 +22,9 @@ public class ClustersMetricsScheduler {
 
     @Scheduled(fixedRate = 30000)
     public void updateMetrics() {
-        for (KafkaCluster kafkaCluster : clustersStorage.getKafkaClusters()) {
-            metricsUpdateService.updateMetrics(kafkaCluster);
-        }
+        Flux.range(0, clustersStorage.getKafkaClusters().size())
+                .subscribeOn(Schedulers.parallel())
+                .doOnNext(s -> metricsUpdateService.updateMetrics(new ArrayList<>(clustersStorage.getKafkaClusters()).get(s)))
+                .subscribe();
     }
 }

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -59,7 +59,7 @@ public class ClusterService {
     public Mono<ResponseEntity<Topic>> createTopic(String name, Mono<TopicFormData> topicFormData) {
         KafkaCluster cluster = clustersStorage.getClusterByName(name);
         if (cluster == null) return null;
-        return kafkaService.createTopic(cluster, topicFormData);
+        return kafkaService.createTopic(cluster.getAdminClient(), cluster, topicFormData);
     }
 
     @SneakyThrows

+ 8 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java

@@ -5,8 +5,10 @@ import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.zookeeper.ZookeeperService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
-import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
+
+import java.util.logging.Level;
 
 @Service
 @RequiredArgsConstructor
@@ -16,10 +18,11 @@ public class MetricsUpdateService {
     private final KafkaService kafkaService;
     private final ZookeeperService zookeeperService;
 
-    @Async
     public void updateMetrics(KafkaCluster kafkaCluster) {
-        log.debug("Start getting metrics for kafkaCluster: " + kafkaCluster.getName());
-        kafkaService.loadClusterMetrics(kafkaCluster);
-        zookeeperService.checkZookeeperStatus(kafkaCluster);
+        Mono.just(true)
+                .doOnNext(s -> {
+                    log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster.getName());
+                    kafkaService.loadClusterMetrics(kafkaCluster);
+                }).doOnNext(s -> zookeeperService.checkZookeeperStatus(kafkaCluster)).subscribe();
     }
 }

+ 148 - 147
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -11,8 +11,8 @@ import org.apache.kafka.common.*;
 import org.apache.kafka.common.config.ConfigResource;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
-import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.util.*;
@@ -26,51 +26,50 @@ import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_
 public class KafkaService {
 
     @SneakyThrows
-    @Async
     public void loadClusterMetrics(KafkaCluster kafkaCluster) {
-        log.debug("Start getting Kafka metrics for cluster: " + kafkaCluster.getName());
-        boolean isConnected = false;
-        if (kafkaCluster.getAdminClient() != null) {
-            isConnected = isAdminClientConnected(kafkaCluster);
-        }
-        if (kafkaCluster.getAdminClient() == null || !isConnected) {
-            isConnected = createAdminClient(kafkaCluster);
-        }
-
-        if (!isConnected) {
-            kafkaCluster.getCluster().setStatus(ServerStatus.OFFLINE);
-
-            return;
-        }
-
-        kafkaCluster.getCluster().setId(kafkaCluster.getId());
-        kafkaCluster.getCluster().setStatus(ServerStatus.ONLINE);
-        loadMetrics(kafkaCluster);
-        loadTopicsData(kafkaCluster);
+        Mono.just(false)
+            .map(b -> {
+                log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster.getName());
+                if (kafkaCluster.getAdminClient() != null) {
+                    b = isAdminClientConnected(kafkaCluster);
+                }
+                if (kafkaCluster.getAdminClient() == null || !b) {
+                    b = createAdminClient(kafkaCluster);
+                }
+                if (!b) {
+                    kafkaCluster.getCluster().setStatus(ServerStatus.OFFLINE);
+                }
+                return b;
+            }).filter(s -> s)
+            .doOnNext(s -> {
+                kafkaCluster.getCluster().setId(kafkaCluster.getId());
+                kafkaCluster.getCluster().setStatus(ServerStatus.ONLINE);
+            }).doOnNext(s -> loadMetrics(kafkaCluster))
+            .doOnNext(s -> loadTopicsData(kafkaCluster)).subscribe();
     }
 
 
     @SneakyThrows
-    public Mono<ResponseEntity<Topic>> createTopic(KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
+    public Mono<ResponseEntity<Topic>> createTopic(AdminClient adminClient, KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
         return topicFormData.flatMap(
                 topicData -> {
-                    AdminClient adminClient = cluster.getAdminClient();
                     NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(), topicData.getReplicationFactor().shortValue());
                     newTopic.configs(topicData.getConfigs());
-
                     createTopic(adminClient, newTopic);
-
+                    return topicFormData;
+                }).flatMap(topicData -> {
                     DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(Collections.singletonList(topicData.getName()));
                     Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
                     var entry = topicDescriptionFuturesMap.entrySet().iterator().next();
                     var topicDescription = getTopicDescription(entry);
                     if (topicDescription == null) return Mono.error(new RuntimeException("Can't find created topic"));
-
-                    Topic topic = collectTopicData(cluster, topicDescription);
+                    return topicDescription;
+                }).flatMap(td ->
+                    collectTopicData(cluster, td))
+                .map(topic -> {
                     cluster.getTopics().add(topic);
-                    return Mono.just(new ResponseEntity<>(topic, HttpStatus.CREATED));
-                }
-        );
+                    return new ResponseEntity<>(topic, HttpStatus.CREATED);
+                });
     }
 
     @SneakyThrows
@@ -114,22 +113,20 @@ public class KafkaService {
         AdminClient adminClient = kafkaCluster.getAdminClient();
         ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
         listTopicsOptions.listInternal(true);
-        var topicListings = ClusterUtil.toMono(adminClient.listTopics(listTopicsOptions).names()).map(tl -> {
+        ClusterUtil.toMono(adminClient.listTopics(listTopicsOptions).names())
+                .map(tl -> {
             kafkaCluster.getCluster().setTopicCount(tl.size());
-
             DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(tl);
             Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
-            List<Topic> foundTopics = new ArrayList<>();
             resetMetrics(kafkaCluster);
-
-            for (var entry : topicDescriptionFuturesMap.entrySet()) {
-                var topicDescription = getTopicDescription(entry);
-                if (topicDescription == null) continue;
-                Topic topic = collectTopicData(kafkaCluster, topicDescription);
-                foundTopics.add(topic);
-            }
-            kafkaCluster.setTopics(foundTopics);
-        }
+            return topicDescriptionFuturesMap.entrySet();
+        })
+        .flatMapMany(Flux::fromIterable)
+        .flatMap(s -> ClusterUtil.toMono(s.getValue()))
+        .flatMap(e -> collectTopicData(kafkaCluster, e))
+        .collectList()
+        .doOnNext(kafkaCluster::setTopics)
+        .subscribe();
     }
 
     private void resetMetrics(KafkaCluster kafkaCluster) {
@@ -140,133 +137,137 @@ public class KafkaService {
         kafkaCluster.getBrokersMetrics().setOutOfSyncReplicasCount(0);
     }
 
-    private Topic collectTopicData(KafkaCluster kafkaCluster, TopicDescription topicDescription) {
-        var topic = new Topic();
-        topic.setInternal(topicDescription.isInternal());
-        topic.setName(topicDescription.name());
-
-        int inSyncReplicasCount = 0, replicasCount = 0;
-        List<Partition> partitions = new ArrayList<>();
-
-        int urpCount = 0;
-        for (TopicPartitionInfo partition : topicDescription.partitions()) {
-            var partitionDto = new Partition();
-            partitionDto.setLeader(partition.leader().id());
-            partitionDto.setPartition(partition.partition());
-            List<Replica> replicas = new ArrayList<>();
-
-            boolean isUrp = false;
-            for (Node replicaNode : partition.replicas()) {
-                var replica = new Replica();
-                replica.setBroker(replicaNode.id());
-                replica.setLeader(partition.leader() != null && partition.leader().id() == replicaNode.id());
-                replica.setInSync(partition.isr().contains(replicaNode));
-                if (!replica.getInSync()) {
-                    isUrp = true;
+    private Mono<Topic> collectTopicData(KafkaCluster kafkaCluster, TopicDescription topicDescription) {
+        return Mono.just(topicDescription).map(td -> {
+            var topic = new Topic();
+            topic.setInternal(td.isInternal());
+            topic.setName(td.name());
+
+            int inSyncReplicasCount = 0, replicasCount = 0;
+            List<Partition> partitions = new ArrayList<>();
+
+            int urpCount = 0;
+            for (TopicPartitionInfo partition : td.partitions()) {
+                var partitionDto = new Partition();
+                partitionDto.setLeader(partition.leader().id());
+                partitionDto.setPartition(partition.partition());
+                List<Replica> replicas = new ArrayList<>();
+
+                boolean isUrp = false;
+                for (Node replicaNode : partition.replicas()) {
+                    var replica = new Replica();
+                    replica.setBroker(replicaNode.id());
+                    replica.setLeader(partition.leader() != null && partition.leader().id() == replicaNode.id());
+                    replica.setInSync(partition.isr().contains(replicaNode));
+                    if (!replica.getInSync()) {
+                        isUrp = true;
+                    }
+                    replicas.add(replica);
+
+                    inSyncReplicasCount += partition.isr().size();
+                    replicasCount += partition.replicas().size();
                 }
-                replicas.add(replica);
+                if (isUrp) {
+                    urpCount++;
+                }
+                partitionDto.setReplicas(replicas);
+                partitions.add(partitionDto);
 
-                inSyncReplicasCount += partition.isr().size();
-                replicasCount += partition.replicas().size();
-            }
-            if (isUrp) {
-                urpCount++;
+                if (partition.leader() != null) {
+                    kafkaCluster.getBrokersMetrics().setOnlinePartitionCount(kafkaCluster.getBrokersMetrics().getOnlinePartitionCount() + 1);
+                } else {
+                    kafkaCluster.getBrokersMetrics().setOfflinePartitionCount(kafkaCluster.getBrokersMetrics().getOfflinePartitionCount() + 1);
+                }
             }
-            partitionDto.setReplicas(replicas);
-            partitions.add(partitionDto);
 
-            if (partition.leader() != null) {
-                kafkaCluster.getBrokersMetrics().setOnlinePartitionCount(kafkaCluster.getBrokersMetrics().getOnlinePartitionCount() + 1);
-            } else {
-                kafkaCluster.getBrokersMetrics().setOfflinePartitionCount(kafkaCluster.getBrokersMetrics().getOfflinePartitionCount() + 1);
-            }
-        }
-
-        kafkaCluster.getCluster().setOnlinePartitionCount(kafkaCluster.getBrokersMetrics().getOnlinePartitionCount());
-        kafkaCluster.getBrokersMetrics().setUnderReplicatedPartitionCount(
-                kafkaCluster.getBrokersMetrics().getUnderReplicatedPartitionCount() + urpCount);
-        kafkaCluster.getBrokersMetrics().setInSyncReplicasCount(
-                kafkaCluster.getBrokersMetrics().getInSyncReplicasCount() + inSyncReplicasCount);
-        kafkaCluster.getBrokersMetrics().setOutOfSyncReplicasCount(
-                kafkaCluster.getBrokersMetrics().getOutOfSyncReplicasCount() + (replicasCount - inSyncReplicasCount));
-
-        topic.setPartitions(partitions);
-        TopicDetails topicDetails = kafkaCluster.getOrCreateTopicDetails(topicDescription.name());
-        topicDetails.setReplicas(replicasCount);
-        topicDetails.setPartitionCount(topicDescription.partitions().size());
-        topicDetails.setInSyncReplicas(inSyncReplicasCount);
-        topicDetails.setReplicationFactor(topicDescription.partitions().size() > 0
-                ? topicDescription.partitions().get(0).replicas().size()
-                : null);
-        topicDetails.setUnderReplicatedPartitions(urpCount);
-
-        loadTopicConfig(kafkaCluster, topicDescription.name());
-
-        return topic;
+            kafkaCluster.getCluster().setOnlinePartitionCount(kafkaCluster.getBrokersMetrics().getOnlinePartitionCount());
+            kafkaCluster.getBrokersMetrics().setUnderReplicatedPartitionCount(
+                    kafkaCluster.getBrokersMetrics().getUnderReplicatedPartitionCount() + urpCount);
+            kafkaCluster.getBrokersMetrics().setInSyncReplicasCount(
+                    kafkaCluster.getBrokersMetrics().getInSyncReplicasCount() + inSyncReplicasCount);
+            kafkaCluster.getBrokersMetrics().setOutOfSyncReplicasCount(
+                    kafkaCluster.getBrokersMetrics().getOutOfSyncReplicasCount() + (replicasCount - inSyncReplicasCount));
+
+            topic.setPartitions(partitions);
+            TopicDetails topicDetails = kafkaCluster.getOrCreateTopicDetails(td.name());
+            topicDetails.setReplicas(replicasCount);
+            topicDetails.setPartitionCount(topicDescription.partitions().size());
+            topicDetails.setInSyncReplicas(inSyncReplicasCount);
+            topicDetails.setReplicationFactor(topicDescription.partitions().size() > 0
+                    ? topicDescription.partitions().get(0).replicas().size()
+                    : null);
+            topicDetails.setUnderReplicatedPartitions(urpCount);
+            return topic;
+        }).flatMap(topic -> {
+                    loadTopicConfig(kafkaCluster, topic.getName());
+                    return Mono.just(topic);
+                });
     }
 
-    private TopicDescription getTopicDescription(Map.Entry<String, KafkaFuture<TopicDescription>> entry) {
-        try {
-            return entry.getValue().get();
-        } catch (Exception e) {
-            log.error("Can't get topic with name: " + entry.getKey(), e);
-
-            return null;
-        }
+    private Mono<TopicDescription> getTopicDescription(Map.Entry<String, KafkaFuture<TopicDescription>> entry) {
+        return ClusterUtil.toMono(entry.getValue())
+                    .onErrorResume(e -> {
+                        log.error("Can't get topic with name: " + entry.getKey());
+                        return Mono.empty();
+                    });
     }
 
-    private void loadMetrics(KafkaCluster kafkaCluster) throws InterruptedException, java.util.concurrent.ExecutionException {
+    private void loadMetrics(KafkaCluster kafkaCluster) {
         AdminClient adminClient = kafkaCluster.getAdminClient();
-        int brokerCount = adminClient.describeCluster().nodes().get().size();
-        kafkaCluster.getCluster().setBrokerCount(brokerCount);
-        kafkaCluster.getBrokersMetrics().setBrokerCount(brokerCount);
-        kafkaCluster.getBrokersMetrics().setActiveControllers(adminClient.describeCluster().controller().get() != null ? 1 : 0);
-
-        for (Map.Entry<MetricName, ? extends Metric> metricNameEntry : adminClient.metrics().entrySet()) {
-            if (metricNameEntry.getKey().name().equals(IN_BYTE_PER_SEC_METRIC)
-                    && metricNameEntry.getKey().description().equals(IN_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
-                kafkaCluster.getCluster().setBytesInPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
-            }
-            if (metricNameEntry.getKey().name().equals(OUT_BYTE_PER_SEC_METRIC)
-                    && metricNameEntry.getKey().description().equals(OUT_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
-                kafkaCluster.getCluster().setBytesOutPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
+        ClusterUtil.toMono(adminClient.describeCluster().nodes()).flatMap(brokers -> {
+            var brokerCount = brokers.size();
+            kafkaCluster.getCluster().setBrokerCount(brokerCount);
+            kafkaCluster.getBrokersMetrics().setBrokerCount(brokerCount);
+            return ClusterUtil.toMono(adminClient.describeCluster().controller());
+        }).doOnNext(c -> {
+            kafkaCluster.getBrokersMetrics().setActiveControllers(c != null ? 1 : 0);
+            for (Map.Entry<MetricName, ? extends Metric> metricNameEntry : adminClient.metrics().entrySet()) {
+                if (metricNameEntry.getKey().name().equals(IN_BYTE_PER_SEC_METRIC)
+                        && metricNameEntry.getKey().description().equals(IN_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
+                    kafkaCluster.getCluster().setBytesInPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
+                }
+                if (metricNameEntry.getKey().name().equals(OUT_BYTE_PER_SEC_METRIC)
+                        && metricNameEntry.getKey().description().equals(OUT_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
+                    kafkaCluster.getCluster().setBytesOutPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
+                }
             }
-        }
+        }).subscribe();
     }
 
     @SneakyThrows
-    private void loadTopicConfig(KafkaCluster kafkaCluster, String topicName) {
+    private Mono<List<TopicConfig>> loadTopicConfig(KafkaCluster kafkaCluster, String topicName) {
         AdminClient adminClient = kafkaCluster.getAdminClient();
 
         Set<ConfigResource> resources = Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topicName));
-        final Map<ConfigResource, Config> configs = adminClient.describeConfigs(resources).all().get();
-
-        if (configs.isEmpty()) return;
-
-        Collection<ConfigEntry> entries = configs.values().iterator().next().entries();
-        List<TopicConfig> topicConfigs = new ArrayList<>();
-        for (ConfigEntry entry : entries) {
-            TopicConfig topicConfig = new TopicConfig();
-            topicConfig.setName(entry.name());
-            topicConfig.setValue(entry.value());
-            if (topicConfig.getName().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
-                topicConfig.setDefaultValue(topicConfig.getValue());
-            } else {
-                topicConfig.setDefaultValue(TOPIC_DEFAULT_CONFIGS.get(entry.name()));
-            }
-            topicConfigs.add(topicConfig);
-        }
-
-        kafkaCluster.getTopicConfigsMap().put(topicName, topicConfigs);
+        return ClusterUtil.toMono(adminClient.describeConfigs(resources).all())
+                .map(configs -> {
+
+                if (!configs.isEmpty()) return Collections.emptyList();
+
+                    Collection<ConfigEntry> entries = configs.values().iterator().next().entries();
+                    List<TopicConfig> topicConfigs = new ArrayList<>();
+                    for (ConfigEntry entry : entries) {
+                        TopicConfig topicConfig = new TopicConfig();
+                        topicConfig.setName(entry.name());
+                        topicConfig.setValue(entry.value());
+                        if (topicConfig.getName().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
+                            topicConfig.setDefaultValue(topicConfig.getValue());
+                        } else {
+                            topicConfig.setDefaultValue(TOPIC_DEFAULT_CONFIGS.get(entry.name()));
+                        }
+                        topicConfigs.add(topicConfig);
+                    }
+
+                    return kafkaCluster.getTopicConfigsMap().put(topicName, topicConfigs);
+            });
     }
 
     @SneakyThrows
     private void createTopic(AdminClient adminClient, NewTopic newTopic) {
-        adminClient.createTopics(Collections.singletonList(newTopic))
+        ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic))
                 .values()
                 .values()
                 .iterator()
-                .next()
-                .get();
+                .next()).map(s -> Mono.just(true));
     }
 }

+ 16 - 18
kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZookeeperService.java

@@ -4,32 +4,30 @@ import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.I0Itec.zkclient.ZkClient;
-import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
 
 @Service
 @RequiredArgsConstructor
 @Log4j2
 public class ZookeeperService {
 
-    @Async
     public void checkZookeeperStatus(KafkaCluster kafkaCluster) {
-        log.debug("Start getting Zookeeper metrics for kafkaCluster: " + kafkaCluster.getName());
-        boolean isConnected = false;
-        if (kafkaCluster.getZkClient() != null) {
-            isConnected = isZkClientConnected(kafkaCluster);
-        }
-        if (kafkaCluster.getZkClient() == null || !isConnected) {
-            isConnected = createZookeeperConnection(kafkaCluster);
-        }
-
-        if (!isConnected) {
-            kafkaCluster.getBrokersMetrics().setZooKeeperStatus(ZooKeeperConstants.OFFLINE);
-
-            return;
-        }
-
-        kafkaCluster.getBrokersMetrics().setZooKeeperStatus(ZooKeeperConstants.ONLINE);
+        Mono.just(false)
+                .doOnNext(isConnected -> {
+                    log.debug("Start getting Zookeeper metrics for kafkaCluster: {}", kafkaCluster.getName());
+                    if (kafkaCluster.getZkClient() != null) {
+                        isConnected = isZkClientConnected(kafkaCluster);
+                    }
+                    if (kafkaCluster.getZkClient() == null || !isConnected) {
+                        isConnected = createZookeeperConnection(kafkaCluster);
+                    }
+                    if (!isConnected) {
+                        kafkaCluster.getBrokersMetrics().setZooKeeperStatus(ZooKeeperConstants.OFFLINE);
+                        return;
+                    }
+                    kafkaCluster.getBrokersMetrics().setZooKeeperStatus(ZooKeeperConstants.ONLINE);
+                }).subscribe();
     }
 
     private boolean createZookeeperConnection(KafkaCluster kafkaCluster) {