Parcourir la source

updating of topic done properly

Roman Nedzvetskiy il y a 5 ans
Parent
commit
5e7602b46c

+ 4 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -8,6 +8,7 @@ import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.common.Node;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
@@ -65,7 +66,9 @@ public class ClusterService {
     public Mono<ResponseEntity<Topic>> updateTopic(String clusterName, String topicName, Mono<TopicFormData> topicFormData) {
         KafkaCluster cluster = clustersStorage.getClusterByName(clusterName);
         if (cluster == null) return null;
-        return topicFormData.flatMap(t -> kafkaService.updateTopic(cluster, topicName, t)).map(ResponseEntity::ok);
+        return ClusterUtil.toMono(cluster.getAdminClient().describeCluster().controller())
+                .map(Node::id)
+                .flatMap(id -> topicFormData.flatMap(t -> kafkaService.updateTopic(cluster, topicName, t, id)).map(ResponseEntity::ok));
     }
 
     @SneakyThrows

+ 38 - 22
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -27,6 +27,8 @@ import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_
 @Log4j2
 public class KafkaService {
 
+    private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version";
+
     @SneakyThrows
     @Async
     public void loadClusterMetrics(KafkaCluster kafkaCluster) {
@@ -75,28 +77,25 @@ public class KafkaService {
         );
     }
 
-    public Mono<Topic> updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData) {
-        return ClusterUtil.toMono(cluster.getAdminClient().describeCluster().controller()).map(Node::id)
-                .flatMap(n -> {
-                    ConfigResource cr = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
-
-                    return ClusterUtil.toMono(cluster.getAdminClient().describeConfigs(Collections.singletonList(new ConfigResource(ConfigResource.Type.BROKER, n))).all())
-                            .flatMap(c -> {
-                                String e = c.entrySet().stream().map(en -> en.getValue().entries().stream().filter(en1 -> en1.name().contains("inter.broker.protocol.version")).findFirst().orElseThrow()).findFirst().orElseThrow().value();
-                                if (Float.parseFloat(e.split("-")[0]) <= 2.3f) {
-                                    List<ConfigEntry> ce = topicFormData.getConfigs().entrySet().stream()
-                                            .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue()))).collect(Collectors.toList());
-                                    Config config = new Config(ce);
-                                    Map<ConfigResource, Config> map = Collections.singletonMap(cr, config);
-                                    cluster.getAdminClient().alterConfigs(map);
-                                } else {
-                                    List<AlterConfigOp> listOp = topicFormData.getConfigs().entrySet().stream()
-                                            .flatMap(cfg -> Stream.of(new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()), AlterConfigOp.OpType.SET))).collect(Collectors.toList());
-                                    cluster.getAdminClient().incrementalAlterConfigs(Collections.singletonMap(cr, listOp));
-                                }
-                                return ClusterUtil.toMono(cluster.getAdminClient().describeTopics(Collections.singletonList(topicName)).all()).map(t -> collectTopicData(cluster, t.get(topicName)));
-                            });
-                            });
+    public Mono<Topic> updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData, Integer id) {
+        ConfigResource topicCR = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
+        List<ConfigResource> brokerCR = Collections.singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString()));
+        return ClusterUtil.toMono(cluster.getAdminClient().describeConfigs(brokerCR).all())
+                .flatMap(c -> {
+                    if (oldClusterVersion(c, cluster)) {
+                        List<ConfigEntry> configEntries = topicFormData.getConfigs().entrySet().stream()
+                                .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue()))).collect(Collectors.toList());
+                        Config config = new Config(configEntries);
+                        Map<ConfigResource, Config> map = Collections.singletonMap(topicCR, config);
+                        cluster.getAdminClient().alterConfigs(map);
+                    } else {
+                        List<AlterConfigOp> listOp = topicFormData.getConfigs().entrySet().stream()
+                                .flatMap(cfg -> Stream.of(new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()), AlterConfigOp.OpType.SET))).collect(Collectors.toList());
+                        cluster.getAdminClient().incrementalAlterConfigs(Collections.singletonMap(topicCR, listOp));
+                    }
+                    return ClusterUtil.toMono(cluster.getAdminClient().describeTopics(Collections.singletonList(topicName)).all())
+                            .map(t -> collectTopicData(cluster, t.get(topicName)));
+                });
     }
 
     @SneakyThrows
@@ -294,4 +293,21 @@ public class KafkaService {
                 .next()
                 .get();
     }
+
+    private boolean oldClusterVersion (Map<ConfigResource, Config> configs, KafkaCluster cluster) {
+        String clusterVersion = configs.values().stream()
+                .map(en -> en.entries().stream()
+                        .filter(en1 -> en1.name().contains(CLUSTER_VERSION_PARAM_KEY))
+                        .findFirst().orElseThrow())
+                .findFirst().orElseThrow().value();
+        try {
+            return Float.parseFloat(clusterVersion.split("-")[0]) <= 2.3f;
+        } catch (NoSuchElementException el) {
+            log.error("Cluster version param not found {}", cluster.getName());
+            throw el;
+        } catch (Exception e) {
+            log.error("Conversion clusterVersion {} to float value failed", clusterVersion);
+            throw e;
+        }
+    }
 }