
AdminClient now boxed into ExtendedAdminClient

Roman Nedzvetskiy 5 years ago
parent
commit
9d4fb8007c

+ 20 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ExtendedAdminClient.java

@@ -0,0 +1,20 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.admin.AdminClient;
+
+import java.util.List;
+
+@Data
+@RequiredArgsConstructor
+public class ExtendedAdminClient {
+
+    private final AdminClient adminClient;
+    private final List<SupportedFeatures> supportedFeatures;
+
+    public enum SupportedFeatures {
+        INCREMENTAL_ALTER_CONFIGS,
+        ALTER_CONFIGS
+    }
+}
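
For orientation (illustrative only, not part of the commit): a minimal sketch of how the new wrapper is meant to be consumed, based on the hunks below. Callers unwrap the raw client via getAdminClient() and branch on getSupportedFeatures(); the topic name and config key/value here are hypothetical.

// Illustrative sketch, not code from this commit. The topic and config values are made up.
import java.util.Collections;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import com.provectus.kafka.ui.cluster.model.ExtendedAdminClient;

class ExtendedAdminClientUsageSketch {

    void alterRetention(ExtendedAdminClient ac) {
        AdminClient delegate = ac.getAdminClient();
        ConfigResource topicCR = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
        ConfigEntry entry = new ConfigEntry("retention.ms", "60000");

        // The feature list computed once when the client is created replaces per-call version checks.
        if (ac.getSupportedFeatures()
                .contains(ExtendedAdminClient.SupportedFeatures.INCREMENTAL_ALTER_CONFIGS)) {
            delegate.incrementalAlterConfigs(Collections.singletonMap(topicCR,
                    Collections.singletonList(new AlterConfigOp(entry, AlterConfigOp.OpType.SET))));
        } else {
            // Older brokers only support the legacy (whole-config) alterConfigs call.
            delegate.alterConfigs(Collections.singletonMap(topicCR,
                    new Config(Collections.singletonList(entry))));
        }
    }
}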

+ 6 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -7,6 +7,7 @@ import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
+import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 
@@ -63,10 +64,11 @@ public class ClusterService {
         ).orElse(Mono.empty()).map(clusterMapper::toTopic);
     }
 
-    public Mono<ResponseEntity<Topic>> updateTopic(String clusterName, String topicName, Mono<TopicFormData> topicFormData, Integer id) {
-        KafkaCluster cluster = clustersStorage.getClusterByName(clusterName);
-        if (cluster == null) return Mono.error(new Throwable("Cluster " + clusterName + " not found"));
-        return topicFormData.flatMap(t -> kafkaService.updateTopic(cluster, topicName, t, id)).map(ResponseEntity::ok);
+    @SneakyThrows
+    public Mono<ResponseEntity<Topic>> updateTopic(String clusterName, String topicName, Mono<TopicFormData> topicFormData) {
+        return clustersStorage.getClusterByName(clusterName).map(c ->
+                    topicFormData.flatMap(t -> kafkaService.updateTopic(c, topicName, t)).map(ResponseEntity::ok))
+                .orElse(Mono.empty());
     }
 
     @SneakyThrows

+ 25 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -2,6 +2,9 @@ package com.provectus.kafka.ui.cluster.util;
 
 import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.model.ConsumerGroup;
+import com.provectus.kafka.ui.model.Partition;
+import com.provectus.kafka.ui.model.Replica;
+import com.provectus.kafka.ui.model.Topic;
 import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
@@ -17,6 +20,7 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static com.provectus.kafka.ui.kafka.KafkaConstants.TOPIC_DEFAULT_CONFIGS;
 import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
@@ -46,15 +50,15 @@ public class ClusterUtil {
         return consumerGroup;
     }
 
-    public static void setSupportedCommands(KafkaCluster cluster, Map<ConfigResource, Config> configs) {
+    public static ExtendedAdminClient.SupportedFeatures getSupportedUpdateFeature(KafkaCluster cluster, Map<ConfigResource, Config> configs) {
         String version = configs.values().stream()
                 .map(en -> en.entries().stream()
                         .filter(en1 -> en1.name().contains(CLUSTER_VERSION_PARAM_KEY))
                         .findFirst().orElseThrow())
                 .findFirst().orElseThrow().value();
         try {
-            cluster.getSupportedCommands().add(Float.parseFloat(version.split("-")[0]) <= 2.3f
-                    ? SupportedCommands.ALTER_CONFIGS : SupportedCommands.INCREMENTAL_ALTER_CONFIGS);
+            return Float.parseFloat(version.split("-")[0]) <= 2.3f
+                    ? ExtendedAdminClient.SupportedFeatures.ALTER_CONFIGS : ExtendedAdminClient.SupportedFeatures.INCREMENTAL_ALTER_CONFIGS;
         } catch (NoSuchElementException el) {
             log.error("Cluster version param not found {}", cluster.getName());
             throw el;
@@ -124,4 +128,22 @@ public class ClusterUtil {
         return topic.build();
     }
 
+    public static Topic convertToTopic (InternalTopic internalTopic) {
+        Topic topic = new Topic();
+        topic.setName(internalTopic.getName());
+        List<Partition> partitions = internalTopic.getPartitions().stream().flatMap(s -> {
+            Partition partition = new Partition();
+            partition.setPartition(s.getPartition());
+            partition.setLeader(s.getLeader());
+            partition.setReplicas(s.getReplicas().stream().flatMap(r -> {
+                Replica replica = new Replica();
+                replica.setBroker(r.getBroker());
+                return Stream.of(replica);
+            }).collect(Collectors.toList()));
+            return Stream.of(partition);
+        }).collect(Collectors.toList());
+        topic.setPartitions(partitions);
+        return topic;
+    }
+
 }

+ 0 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/SupportedCommands.java

@@ -1,7 +0,0 @@
-package com.provectus.kafka.ui.cluster.util;
-
-public enum SupportedCommands {
-
-    INCREMENTAL_ALTER_CONFIGS,
-    ALTER_CONFIGS
-}

+ 41 - 40
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -1,11 +1,7 @@
 package com.provectus.kafka.ui.kafka;
 
-import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
-import com.provectus.kafka.ui.cluster.model.InternalTopic;
-import com.provectus.kafka.ui.cluster.model.InternalTopicConfig;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
-import com.provectus.kafka.ui.cluster.util.SupportedCommands;
 import com.provectus.kafka.ui.model.ConsumerGroup;
 import com.provectus.kafka.ui.model.ServerStatus;
 import com.provectus.kafka.ui.model.Topic;
@@ -23,10 +19,7 @@ import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -39,14 +32,14 @@ public class KafkaService {
     private static final ListTopicsOptions LIST_TOPICS_OPTIONS = new ListTopicsOptions().listInternal(true);
 
     private final ZookeeperService zookeeperService;
-    private final Map<String, AdminClient> adminClientCache = new ConcurrentHashMap<>();
+    private final Map<String, Mono<ExtendedAdminClient>> adminClientCache = new ConcurrentHashMap<>();
 
     @SneakyThrows
     public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
         return getOrCreateAdminClient(cluster).flatMap(
-                ac -> getClusterMetrics(ac).flatMap( clusterMetrics ->
-                            getTopicsData(ac).flatMap( topics ->
-                                loadTopicsConfig(ac, topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
+                ac -> getClusterMetrics(ac.getAdminClient()).flatMap( clusterMetrics ->
+                            getTopicsData(ac.getAdminClient()).flatMap( topics ->
+                                loadTopicsConfig(ac.getAdminClient(), topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
                                         .map( configs -> mergeWithConfigs(topics, configs) )
                             ).map( topics -> buildFromData(cluster, clusterMetrics, topics))
                         )
@@ -150,8 +143,7 @@ public class KafkaService {
 
 
     public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
-        AdminClient adminClient = this.createAdminClient(cluster);
-        return this.createTopic(adminClient, topicFormData);
+        return this.getOrCreateAdminClient(cluster).flatMap(t -> this.createTopic(t.getAdminClient(), topicFormData));
     }
 
     @SneakyThrows
@@ -174,54 +166,62 @@ public class KafkaService {
                 );
     }
 
-    public Mono<Topic> updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData, Integer id) {
+    public Mono<Topic> updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData) {
         ConfigResource topicCR = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
-        List<ConfigResource> brokerCR = Collections.singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString()));
-        return ClusterUtil.toMono(cluster.getAdminClient().describeConfigs(brokerCR).all())
-                .flatMap(c -> {
-                    if (cluster.getSupportedCommands().isEmpty()) {
-                        ClusterUtil.setSupportedCommands(cluster, c);
-                    }
-                    if (cluster.getSupportedCommands().contains(SupportedCommands.INCREMENTAL_ALTER_CONFIGS)) {
+        return getOrCreateAdminClient(cluster)
+                .flatMap(ac -> {
+                    if (ac.getSupportedFeatures().contains(ExtendedAdminClient.SupportedFeatures.INCREMENTAL_ALTER_CONFIGS)) {
                         List<AlterConfigOp> listOp = topicFormData.getConfigs().entrySet().stream()
                                 .flatMap(cfg -> Stream.of(new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()), AlterConfigOp.OpType.SET))).collect(Collectors.toList());
-                        cluster.getAdminClient().incrementalAlterConfigs(Collections.singletonMap(topicCR, listOp));
+                        ac.getAdminClient().incrementalAlterConfigs(Collections.singletonMap(topicCR, listOp));
                     } else {
+
                         List<ConfigEntry> configEntries = topicFormData.getConfigs().entrySet().stream()
                                 .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue()))).collect(Collectors.toList());
                         Config config = new Config(configEntries);
                         Map<ConfigResource, Config> map = Collections.singletonMap(topicCR, config);
-                        cluster.getAdminClient().alterConfigs(map);
+                        ac.getAdminClient().alterConfigs(map);
                     }
-                    return ClusterUtil.toMono(cluster.getAdminClient().describeTopics(Collections.singletonList(topicName)).all())
-                            .map(t -> collectTopicData(cluster, t.get(topicName)));
+
+                    return getTopicsData(ac.getAdminClient())
+                            .map(s -> s.stream()
+                            .filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow())
+                            .map(ClusterUtil::convertToTopic);
                 });
     }
 
 
     @SneakyThrows
-    private Mono<String> getClusterId(AdminClient adminClient) {
-        return ClusterUtil.toMono(adminClient.describeCluster().clusterId());
+    private Mono<String> getClusterId(ExtendedAdminClient adminClient) {
+        return ClusterUtil.toMono(adminClient.getAdminClient().describeCluster().clusterId());
     }
 
 
-    public Mono<AdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
-        AdminClient adminClient = adminClientCache.computeIfAbsent(
+    public Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
+        return adminClientCache.computeIfAbsent(
                 cluster.getId(),
                 (id) -> createAdminClient(cluster)
-        );
-
-        return isAdminClientConnected(adminClient);
+        ).flatMap(this::isAdminClientConnected);
     }
 
-    public AdminClient createAdminClient(KafkaCluster kafkaCluster) {
+    public Mono<ExtendedAdminClient> createAdminClient(KafkaCluster kafkaCluster) {
         Properties properties = new Properties();
         properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
         properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
-        return AdminClient.create(properties);
+        AdminClient adminClient = AdminClient.create(properties);
+        return ClusterUtil.toMono(adminClient.describeCluster().controller())
+                .map(Node::id)
+                .map(id -> Collections.singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString())))
+                .flatMap(brokerCR -> ClusterUtil.toMono(adminClient.describeConfigs(brokerCR).all())
+                    .map(cfg -> ClusterUtil.getSupportedUpdateFeature(kafkaCluster, cfg))
+                    .map(u -> {
+                        List<ExtendedAdminClient.SupportedFeatures> supportedFeatures = Collections.singletonList(u);
+                        return new ExtendedAdminClient(adminClient, supportedFeatures);
+                    })
+                );
     }
 
-    private Mono<AdminClient> isAdminClientConnected(AdminClient adminClient) {
+    private Mono<ExtendedAdminClient> isAdminClientConnected(ExtendedAdminClient adminClient) {
         return getClusterId(adminClient).map( r -> adminClient);
     }
 
@@ -256,11 +256,12 @@ public class KafkaService {
     }
 
     public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) {
-        var adminClient =  this.createAdminClient(cluster);
+        var extendedAdminClient =  this.createAdminClient(cluster);
 
-        return ClusterUtil.toMono(adminClient.listConsumerGroups().all())
-                .flatMap(s -> ClusterUtil.toMono(adminClient
+        return extendedAdminClient.flatMap(ac -> ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all()))
+                .flatMap(s -> extendedAdminClient.map(ac -> ac.getAdminClient()
                         .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
+                .flatMap(ClusterUtil::toMono)
                 .map(s -> s.values().stream()
                         .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()));
     }

+ 1 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -55,10 +55,7 @@ public class MetricsRestController implements ApiClustersApi {
 
     @Override
     public Mono<ResponseEntity<Topic>> updateTopic(String clusterId, String topicName, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
-        var cluster = clustersStorage.getClusterByName(clusterId);
-        return ClusterUtil.toMono(cluster.getAdminClient().describeCluster().controller())
-                .map(Node::id)
-                .flatMap(id -> clusterService.updateTopic(clusterId, topicName, topicFormData, id));
+        return clusterService.updateTopic(clusterId, topicName, topicFormData);
     }
 
     @Override

+ 1 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZookeeperService.java

@@ -20,16 +20,6 @@ public class ZookeeperService {
     @Value("${zookeeper.connection-timeout}")
     private Integer sessionTimeout;
 
-    @Async
-    public void checkZookeeperStatus(KafkaCluster kafkaCluster) {
-        log.debug("Start getting Zookeeper metrics for kafkaCluster: " + kafkaCluster.getName());
-        boolean isConnected = false;
-        if (kafkaCluster.getZkClient() != null) {
-            isConnected = isZkClientConnected(kafkaCluster);
-        }
-        if (kafkaCluster.getZkClient() == null || !isConnected) {
-            isConnected = createZookeeperConnection(kafkaCluster);
-        }
     private final Map<String, ZkClient> cachedZkClient = new HashMap<>();
 
     public boolean isZookeeperOnline(KafkaCluster kafkaCluster) {
@@ -49,7 +39,7 @@ public class ZookeeperService {
 
     private ZkClient getOrCreateZkClient (KafkaCluster cluster) {
         try {
-            return cachedZkClient.getOrDefault(cluster.getName(), new ZkClient(cluster.getZookeeper(), 1000));
+            return cachedZkClient.getOrDefault(cluster.getName(), new ZkClient(cluster.getZookeeper(), sessionTimeout));
         } catch (Exception e) {
             log.error("Error while creating zookeeper client for cluster {}", cluster.getName());
             return null;