|
@@ -1,6 +1,7 @@
|
|
|
package com.provectus.kafka.ui.kafka;
|
|
|
|
|
|
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
|
|
|
+import com.provectus.kafka.ui.cluster.util.ClusterUtil;
|
|
|
import com.provectus.kafka.ui.model.*;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.SneakyThrows;
|
|
@@ -15,6 +16,8 @@ import org.springframework.stereotype.Service;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
import java.util.*;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+import java.util.stream.Stream;
|
|
|
|
|
|
import static com.provectus.kafka.ui.kafka.KafkaConstants.*;
|
|
|
import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
|
|
@@ -72,6 +75,30 @@ public class KafkaService {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+ public Mono<Topic> updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData) {
|
|
|
+ return ClusterUtil.toMono(cluster.getAdminClient().describeCluster().controller()).map(Node::id)
|
|
|
+ .flatMap(n -> {
|
|
|
+ ConfigResource cr = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
|
|
|
+
|
|
|
+ return ClusterUtil.toMono(cluster.getAdminClient().describeConfigs(Collections.singletonList(new ConfigResource(ConfigResource.Type.BROKER, n))).all())
|
|
|
+ .flatMap(c -> {
|
|
|
+ String e = c.entrySet().stream().map(en -> en.getValue().entries().stream().filter(en1 -> en1.name().contains("inter.broker.protocol.version")).findFirst().orElseThrow()).findFirst().orElseThrow().value();
|
|
|
+ if (Float.parseFloat(e.split("-")[0]) <= 2.3f) {
|
|
|
+ List<ConfigEntry> ce = topicFormData.getConfigs().entrySet().stream()
|
|
|
+ .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue()))).collect(Collectors.toList());
|
|
|
+ Config config = new Config(ce);
|
|
|
+ Map<ConfigResource, Config> map = Collections.singletonMap(cr, config);
|
|
|
+ cluster.getAdminClient().alterConfigs(map);
|
|
|
+ } else {
|
|
|
+ List<AlterConfigOp> listOp = topicFormData.getConfigs().entrySet().stream()
|
|
|
+ .flatMap(cfg -> Stream.of(new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()), AlterConfigOp.OpType.SET))).collect(Collectors.toList());
|
|
|
+ cluster.getAdminClient().incrementalAlterConfigs(Collections.singletonMap(cr, listOp));
|
|
|
+ }
|
|
|
+ return ClusterUtil.toMono(cluster.getAdminClient().describeTopics(Collections.singletonList(topicName)).all()).map(t -> collectTopicData(cluster, t.get(topicName)));
|
|
|
+ });
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
@SneakyThrows
|
|
|
private String getClusterId(KafkaCluster kafkaCluster) {
|
|
|
return kafkaCluster.getAdminClient().describeCluster().clusterId().get();
|