瀏覽代碼

More fixes

German Osin 5 年之前
父節點
當前提交
7b7c016b3c

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -151,6 +151,7 @@ public class ClusterService {
         return clustersStorage.getClusterByName(clusterName).map(cl ->
                 topicFormData
                         .flatMap(t -> kafkaService.updateTopic(cl, topicName, t))
+                        .map(clusterMapper::toTopic)
                         .flatMap(t -> updateCluster(t, clusterName, cl))
         )
                 .orElse(Mono.empty());

+ 0 - 18
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -213,24 +213,6 @@ public class ClusterUtil {
         }
     }
 
-    public static Topic convertToTopic(InternalTopic internalTopic) {
-        Topic topic = new Topic();
-        topic.setName(internalTopic.getName());
-//        List<Partition> partitions = internalTopic.getPartitions().stream().flatMap(s -> {
-//            Partition partition = new Partition();
-//            partition.setPartition(s.getPartition());
-//            partition.setLeader(s.getLeader());
-//            partition.setReplicas(s.getReplicas().stream().flatMap(r -> {
-//                Replica replica = new Replica();
-//                replica.setBroker(r.getBroker());
-//                return Stream.of(replica);
-//            }).collect(Collectors.toList()));
-//            return Stream.of(partition);
-//        }).collect(Collectors.toList());
-//        topic.setPartitions(partitions);
-        return topic;
-    }
-
     public static <T, R> Map<T, R> toSingleMap (Stream<Map<T, R>> streamOfMaps) {
         return streamOfMaps.reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();

+ 3 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -265,7 +265,7 @@ public class KafkaService {
     }
 
     @SneakyThrows
-    public Mono<Topic> updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData) {
+    public Mono<InternalTopic> updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData) {
         ConfigResource topicCR = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
         return getOrCreateAdminClient(cluster)
                 .flatMap(ac -> {
@@ -281,11 +281,10 @@ public class KafkaService {
 
 
 
-    private Mono<Topic> getUpdatedTopic (ExtendedAdminClient ac, String topicName) {
+    private Mono<InternalTopic> getUpdatedTopic (ExtendedAdminClient ac, String topicName) {
         return getTopicsData(ac.getAdminClient())
                 .map(s -> s.stream()
-                        .filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow())
-                .map(ClusterUtil::convertToTopic);
+                        .filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow());
     }
 
     private Mono<String> incrementalAlterConfig(TopicFormData topicFormData, ConfigResource topicCR, ExtendedAdminClient ac) {