Browse Source

SupportedFeatures method moved to util

Roman Nedzvetskiy 5 years ago
parent
commit
72830b30ef

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/ExtendedAdminClient.java

@@ -1,8 +1,10 @@
 package com.provectus.kafka.ui.cluster.model;
 
+import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import lombok.Data;
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.clients.admin.AdminClient;
+import reactor.core.publisher.Mono;
 
 import java.util.List;
 
@@ -17,4 +19,9 @@ public class ExtendedAdminClient {
         INCREMENTAL_ALTER_CONFIGS,
         ALTER_CONFIGS
     }
+
+    public static Mono<ExtendedAdminClient> extendedAdminClient(AdminClient adminClient) {
+        return ClusterUtil.getSupportedFeatures(adminClient)
+                .map(s -> new ExtendedAdminClient(adminClient, s));
+    }
 }

+ 16 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -6,17 +6,13 @@ import com.provectus.kafka.ui.model.Partition;
 import com.provectus.kafka.ui.model.Replica;
 import com.provectus.kafka.ui.model.Topic;
 import lombok.extern.log4j.Log4j2;
-import org.apache.kafka.clients.admin.Config;
-import org.apache.kafka.clients.admin.ConfigEntry;
-import org.apache.kafka.clients.admin.ConsumerGroupDescription;
-import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.*;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigResource;
 import reactor.core.publisher.Mono;
 
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
+import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -48,7 +44,19 @@ public class ClusterUtil {
         return consumerGroup;
     }
 
-    public static ExtendedAdminClient.SupportedFeatures getSupportedUpdateFeature(KafkaCluster cluster, Map<ConfigResource, Config> configs) {
+    public static Mono<List<ExtendedAdminClient.SupportedFeatures>> getSupportedFeatures(AdminClient adminClient) {
+        List<ExtendedAdminClient.SupportedFeatures> supportedFeatures = new ArrayList<>();
+        return ClusterUtil.toMono(adminClient.describeCluster().controller())
+                .map(Node::id)
+                .map(id -> Collections.singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString())))
+                .flatMap(brokerCR -> ClusterUtil.toMono(adminClient.describeConfigs(brokerCR).all())
+                        .map(s -> {
+                            supportedFeatures.add(getSupportedUpdateFeature(s));
+                            return supportedFeatures;
+                        }));
+    }
+
+    private static ExtendedAdminClient.SupportedFeatures getSupportedUpdateFeature(Map<ConfigResource, Config> configs) {
         String version = configs.values().stream()
                 .map(en -> en.entries().stream()
                         .filter(en1 -> en1.name().contains(CLUSTER_VERSION_PARAM_KEY))
@@ -57,9 +65,6 @@ public class ClusterUtil {
         try {
             return Float.parseFloat(version.split("-")[0]) <= 2.3f
                     ? ExtendedAdminClient.SupportedFeatures.ALTER_CONFIGS : ExtendedAdminClient.SupportedFeatures.INCREMENTAL_ALTER_CONFIGS;
-        } catch (NoSuchElementException el) {
-            log.error("Cluster version param not found {}", cluster.getName());
-            throw el;
         } catch (Exception e) {
             log.error("Conversion clusterVersion {} to float value failed", version);
             throw e;

+ 6 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -14,6 +14,7 @@ import org.apache.kafka.clients.admin.*;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigResource;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
@@ -32,6 +33,9 @@ import java.util.stream.Stream;
 @Log4j2
 public class KafkaService {
 
+    @Value("${kafka.admin-client-timeout}")
+    private int clientTimeout;
+
     private static final ListTopicsOptions LIST_TOPICS_OPTIONS = new ListTopicsOptions().listInternal(true);
 
     private final ZookeeperService zookeeperService;
@@ -210,18 +214,9 @@ public class KafkaService {
     public Mono<ExtendedAdminClient> createAdminClient(KafkaCluster kafkaCluster) {
         Properties properties = new Properties();
         properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
-        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
+        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
         AdminClient adminClient = AdminClient.create(properties);
-        return ClusterUtil.toMono(adminClient.describeCluster().controller())
-                .map(Node::id)
-                .map(id -> Collections.singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString())))
-                .flatMap(brokerCR -> ClusterUtil.toMono(adminClient.describeConfigs(brokerCR).all())
-                    .map(cfg -> ClusterUtil.getSupportedUpdateFeature(kafkaCluster, cfg))
-                    .map(u -> {
-                        List<ExtendedAdminClient.SupportedFeatures> supportedFeatures = Collections.singletonList(u);
-                        return new ExtendedAdminClient(adminClient, supportedFeatures);
-                    })
-                );
+        return ExtendedAdminClient.extendedAdminClient(adminClient);
     }
 
     private Mono<ExtendedAdminClient> isAdminClientConnected(ExtendedAdminClient adminClient) {

+ 3 - 1
kafka-ui-api/src/main/resources/application.yml

@@ -1,2 +1,4 @@
 zookeeper:
-  connection-timeout: 1000
+  connection-timeout: 1000
+kafka:
+  admin-client-timeout: 5000