
Added a SupportedCommands enum to the cluster model so topic settings updates use the correct alter-configs method, and changed the updateTopic request method from PATCH to PUT. A short sketch of the version check follows the commit metadata below.

Roman Nedzvetskiy 5 years ago
parent
commit
84c905a910
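
The heart of this change is the version check in ClusterUtil.setSupportedCommands: the broker's inter.broker.protocol.version is read via describeConfigs, and clusters at or below 2.3 keep the legacy alterConfigs call while newer clusters use incrementalAlterConfigs. Below is a minimal standalone sketch of that selection logic; the class name, select method, and main method are illustrative only and not part of the commit.

// Standalone sketch of the selection logic added in ClusterUtil.setSupportedCommands.
// The enum mirrors SupportedCommands from this commit.
public class SupportedCommandSelection {

    enum SupportedCommands { INCREMENTAL_ALTER_CONFIGS, ALTER_CONFIGS }

    static SupportedCommands select(String interBrokerProtocolVersion) {
        // "2.3-IV1" -> "2.3"; a NumberFormatException propagates, as in the original code
        float version = Float.parseFloat(interBrokerProtocolVersion.split("-")[0]);
        return version <= 2.3f
                ? SupportedCommands.ALTER_CONFIGS
                : SupportedCommands.INCREMENTAL_ALTER_CONFIGS;
    }

    public static void main(String[] args) {
        System.out.println(select("2.2-IV1"));  // ALTER_CONFIGS
        System.out.println(select("2.4-IV1"));  // INCREMENTAL_ALTER_CONFIGS
    }
}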

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.cluster.model;
 
+import com.provectus.kafka.ui.cluster.util.SupportedCommands;
 import com.provectus.kafka.ui.model.*;
 import lombok.AccessLevel;
 import lombok.Data;
@@ -33,6 +34,7 @@ public class KafkaCluster {
 
     ZkClient zkClient;
     AdminClient adminClient;
+    List<SupportedCommands> supportedCommands;
     ServerStatus zookeeperStatus = ServerStatus.OFFLINE;
 
     Exception lastKafkaException;

+ 3 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -8,7 +8,6 @@ import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
-import org.apache.kafka.common.Node;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
@@ -63,12 +62,10 @@ public class ClusterService {
         return kafkaService.createTopic(cluster, topicFormData);
     }
 
-    public Mono<ResponseEntity<Topic>> updateTopic(String clusterName, String topicName, Mono<TopicFormData> topicFormData) {
+    public Mono<ResponseEntity<Topic>> updateTopic(String clusterName, String topicName, Mono<TopicFormData> topicFormData, Integer id) {
         KafkaCluster cluster = clustersStorage.getClusterByName(clusterName);
-        if (cluster == null) return null;
-        return ClusterUtil.toMono(cluster.getAdminClient().describeCluster().controller())
-                .map(Node::id)
-                .flatMap(id -> topicFormData.flatMap(t -> kafkaService.updateTopic(cluster, topicName, t, id)).map(ResponseEntity::ok));
+        if (cluster == null) return Mono.error(new Throwable("Cluster " + clusterName + " not found"));
+        return topicFormData.flatMap(t -> kafkaService.updateTopic(cluster, topicName, t, id)).map(ResponseEntity::ok);
     }
 
     @SneakyThrows

+ 26 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -2,15 +2,23 @@ package com.provectus.kafka.ui.cluster.util;
 
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.model.ConsumerGroup;
+import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.config.ConfigResource;
 import reactor.core.publisher.Mono;
 
 import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 
+@Log4j2
 public class ClusterUtil {
 
+    private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version";
+
     public static <T> Mono<T> toMono(KafkaFuture<T> future){
         return Mono.create(sink -> future.whenComplete((res, ex)->{
             if (ex!=null) {
@@ -31,4 +39,22 @@ public class ClusterUtil {
         consumerGroup.setNumTopics(topics.size());
         return consumerGroup;
     }
+
+    public static void setSupportedCommands(KafkaCluster cluster, Map<ConfigResource, Config> configs) {
+        String version = configs.values().stream()
+                .map(en -> en.entries().stream()
+                        .filter(en1 -> en1.name().contains(CLUSTER_VERSION_PARAM_KEY))
+                        .findFirst().orElseThrow())
+                .findFirst().orElseThrow().value();
+        try {
+            cluster.getSupportedCommands().add(Float.parseFloat(version.split("-")[0]) <= 2.3f
+                    ? SupportedCommands.ALTER_CONFIGS : SupportedCommands.INCREMENTAL_ALTER_CONFIGS);
+        } catch (NoSuchElementException el) {
+            log.error("Cluster version param not found {}", cluster.getName());
+            throw el;
+        } catch (Exception e) {
+            log.error("Conversion clusterVersion {} to float value failed", version);
+            throw e;
+        }
+    }
 }

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/SupportedCommands.java

@@ -0,0 +1,7 @@
+package com.provectus.kafka.ui.cluster.util;
+
+public enum SupportedCommands {
+
+    INCREMENTAL_ALTER_CONFIGS,
+    ALTER_CONFIGS
+}

+ 40 - 53
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.kafka;
 
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
+import com.provectus.kafka.ui.cluster.util.SupportedCommands;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
@@ -27,8 +28,6 @@ import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_
 @Log4j2
 public class KafkaService {
 
-    private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version";
-
     @SneakyThrows
     @Async
     public void loadClusterMetrics(KafkaCluster kafkaCluster) {
@@ -82,57 +81,26 @@ public class KafkaService {
         List<ConfigResource> brokerCR = Collections.singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString()));
         return ClusterUtil.toMono(cluster.getAdminClient().describeConfigs(brokerCR).all())
                 .flatMap(c -> {
-                    if (oldClusterVersion(c, cluster)) {
+                    if (cluster.getSupportedCommands().isEmpty()) {
+                        ClusterUtil.setSupportedCommands(cluster, c);
+                    }
+                    if (cluster.getSupportedCommands().contains(SupportedCommands.INCREMENTAL_ALTER_CONFIGS)) {
+                        List<AlterConfigOp> listOp = topicFormData.getConfigs().entrySet().stream()
+                                .flatMap(cfg -> Stream.of(new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()), AlterConfigOp.OpType.SET))).collect(Collectors.toList());
+                        cluster.getAdminClient().incrementalAlterConfigs(Collections.singletonMap(topicCR, listOp));
+                    } else {
                         List<ConfigEntry> configEntries = topicFormData.getConfigs().entrySet().stream()
                                 .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue()))).collect(Collectors.toList());
                         Config config = new Config(configEntries);
                         Map<ConfigResource, Config> map = Collections.singletonMap(topicCR, config);
                         cluster.getAdminClient().alterConfigs(map);
-                    } else {
-                        List<AlterConfigOp> listOp = topicFormData.getConfigs().entrySet().stream()
-                                .flatMap(cfg -> Stream.of(new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()), AlterConfigOp.OpType.SET))).collect(Collectors.toList());
-                        cluster.getAdminClient().incrementalAlterConfigs(Collections.singletonMap(topicCR, listOp));
                     }
                     return ClusterUtil.toMono(cluster.getAdminClient().describeTopics(Collections.singletonList(topicName)).all())
                             .map(t -> collectTopicData(cluster, t.get(topicName)));
                 });
     }
 
-    @SneakyThrows
-    private String getClusterId(KafkaCluster kafkaCluster) {
-        return kafkaCluster.getAdminClient().describeCluster().clusterId().get();
-    }
-
-    private boolean createAdminClient(KafkaCluster kafkaCluster) {
-        try {
-            Properties properties = new Properties();
-            properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
-            properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
-            kafkaCluster.setAdminClient(AdminClient.create(properties));
-            kafkaCluster.setId(getClusterId(kafkaCluster));
-            kafkaCluster.getCluster().setId(kafkaCluster.getId());
-
-            return true;
-        } catch (Exception e) {
-            log.error(e);
-            kafkaCluster.setLastKafkaException(e);
-
-            return false;
-        }
-    }
 
-    private boolean isAdminClientConnected(KafkaCluster kafkaCluster) {
-        try {
-            getClusterId(kafkaCluster);
-
-            return true;
-        } catch (Exception e) {
-            log.error(e);
-            kafkaCluster.setLastKafkaException(e);
-
-            return false;
-        }
-    }
 
     @SneakyThrows
     private void loadTopicsData(KafkaCluster kafkaCluster) {
@@ -294,20 +262,39 @@ public class KafkaService {
                 .get();
     }
 
-    private boolean oldClusterVersion (Map<ConfigResource, Config> configs, KafkaCluster cluster) {
-        String clusterVersion = configs.values().stream()
-                .map(en -> en.entries().stream()
-                        .filter(en1 -> en1.name().contains(CLUSTER_VERSION_PARAM_KEY))
-                        .findFirst().orElseThrow())
-                .findFirst().orElseThrow().value();
+    private boolean createAdminClient(KafkaCluster kafkaCluster) {
         try {
-            return Float.parseFloat(clusterVersion.split("-")[0]) <= 2.3f;
-        } catch (NoSuchElementException el) {
-            log.error("Cluster version param not found {}", cluster.getName());
-            throw el;
+            Properties properties = new Properties();
+            properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
+            properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
+            kafkaCluster.setAdminClient(AdminClient.create(properties));
+            kafkaCluster.setId(getClusterId(kafkaCluster));
+            kafkaCluster.getCluster().setId(kafkaCluster.getId());
+
+            return true;
         } catch (Exception e) {
-            log.error("Conversion clusterVersion {} to float value failed", clusterVersion);
-            throw e;
+            log.error(e.getMessage());
+            kafkaCluster.setLastKafkaException(e);
+
+            return false;
+        }
+    }
+
+    @SneakyThrows
+    private String getClusterId(KafkaCluster kafkaCluster) {
+        return kafkaCluster.getAdminClient().describeCluster().clusterId().get();
+    }
+
+    private boolean isAdminClientConnected(KafkaCluster kafkaCluster) {
+        try {
+            getClusterId(kafkaCluster);
+
+            return true;
+        } catch (Exception e) {
+            log.error(e.getMessage());
+            kafkaCluster.setLastKafkaException(e);
+
+            return false;
         }
     }
 }

+ 9 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -1,9 +1,12 @@
 package com.provectus.kafka.ui.rest;
 
 import com.provectus.kafka.ui.api.ApiClustersApi;
+import com.provectus.kafka.ui.cluster.model.ClustersStorage;
 import com.provectus.kafka.ui.cluster.service.ClusterService;
+import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.Node;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
@@ -19,6 +22,8 @@ public class MetricsRestController implements ApiClustersApi {
 
     private final ClusterService clusterService;
 
+    private final ClustersStorage clustersStorage;
+
     @Override
     public Mono<ResponseEntity<Flux<Cluster>>> getClusters(ServerWebExchange exchange) {
         return clusterService.getClusters();
@@ -41,7 +46,10 @@ public class MetricsRestController implements ApiClustersApi {
 
     @Override
     public Mono<ResponseEntity<Topic>> updateTopic(String clusterId, String topicName, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
-        return clusterService.updateTopic(clusterId, topicName, topicFormData);
+        var cluster = clustersStorage.getClusterByName(clusterId);
+        return ClusterUtil.toMono(cluster.getAdminClient().describeCluster().controller())
+                .map(Node::id)
+                .flatMap(id -> clusterService.updateTopic(clusterId, topicName, topicFormData, id));
     }
 
     @Override

+ 0 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZooKeeperConstants.java

@@ -7,7 +7,4 @@ public final class ZooKeeperConstants {
     public static int ONLINE = 1;
     public static int OFFLINE = 0;
 
-    public static int CONNECTION_TIMEOUT_MS = 1000;
-    public static int SESSION_TIMEOUT_MS = 3000;
-
 }

+ 5 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/zookeeper/ZookeeperService.java

@@ -4,6 +4,7 @@ import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.I0Itec.zkclient.ZkClient;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
@@ -12,6 +13,9 @@ import org.springframework.stereotype.Service;
 @Log4j2
 public class ZookeeperService {
 
+    @Value("${zookeeper.connection-timeout}")
+    private Integer sessionTimeout;
+
     @Async
     public void checkZookeeperStatus(KafkaCluster kafkaCluster) {
         log.debug("Start getting Zookeeper metrics for kafkaCluster: " + kafkaCluster.getName());
@@ -34,7 +38,7 @@ public class ZookeeperService {
 
     private boolean createZookeeperConnection(KafkaCluster kafkaCluster) {
         try {
-            kafkaCluster.setZkClient(new ZkClient(kafkaCluster.getZookeeper(), ZooKeeperConstants.CONNECTION_TIMEOUT_MS));
+            kafkaCluster.setZkClient(new ZkClient(kafkaCluster.getZookeeper(), sessionTimeout));
 
             return true;
         } catch (Exception e) {

+ 2 - 0
kafka-ui-api/src/main/resources/application.yml

@@ -0,0 +1,2 @@
+zookeeper:
+  connection-timeout: 1000

+ 1 - 1
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -141,7 +141,7 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/TopicDetails'
-    patch:
+    put:
       tags:
         - /api/clusters
       summary: updateTopic