ソースを参照

CreateTopic method updated -- now returns topicName, update topic method changed -- removed redundant param clusterName

Roman Nedzvetskiy 5 年 前
コミット
cc9b773beb

+ 10 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -37,6 +37,16 @@ public class ClusterUtil {
         }));
     }
 
+    public static Mono<String> toMono(KafkaFuture<Void> future, String topicName){
+        return Mono.create(sink -> future.whenComplete((res, ex)->{
+            if (ex!=null) {
+                sink.error(ex);
+            } else {
+                sink.success(topicName);
+            }
+        }));
+    }
+
     public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, KafkaCluster cluster) {
         ConsumerGroup consumerGroup = new ConsumerGroup();
         consumerGroup.setClusterId(cluster.getId());

+ 5 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -165,7 +165,6 @@ public class KafkaService {
                     var tdw = adminClient.describeTopics(Collections.singletonList(topicData.getName()));
                     return getTopicDescription(tdw.values().get(topicData.getName()), topicData.getName());
                 })
-                .switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")))
                 .map(ClusterUtil::mapToInternalTopic)
                 .flatMap( t ->
                         loadTopicsConfig(adminClient, Collections.singletonList(t.getName()))
@@ -180,15 +179,15 @@ public class KafkaService {
                 .flatMap(ac -> {
                     if (ac.getSupportedFeatures().contains(ExtendedAdminClient.SupportedFeatures.INCREMENTAL_ALTER_CONFIGS)) {
                         return incrementalAlterConfig(topicFormData, topicCR, ac)
-                                .flatMap(c -> getUpdatedTopic(ac, topicName, cluster.getName()));
+                                .flatMap(c -> getUpdatedTopic(ac, topicName));
                     } else {
                         return alterConfig(topicFormData, topicCR, ac)
-                                .flatMap(c -> getUpdatedTopic(ac, topicName, cluster.getName()));
+                                .flatMap(c -> getUpdatedTopic(ac, topicName));
                     }
                 });
     }
 
-    private Mono<Topic> getUpdatedTopic (ExtendedAdminClient ac, String topicName, String clusterName) {
+    private Mono<Topic> getUpdatedTopic (ExtendedAdminClient ac, String topicName) {
         return getTopicsData(ac.getAdminClient())
                 .map(s -> s.stream()
                         .filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow())
@@ -264,8 +263,8 @@ public class KafkaService {
 
 
     @SneakyThrows
-    private Mono<KafkaFuture<Void>> createTopic(AdminClient adminClient, NewTopic newTopic) {
-        return Mono.just(adminClient.createTopics(Collections.singletonList(newTopic)).values().get(newTopic.name()))
+    private Mono<String> createTopic(AdminClient adminClient, NewTopic newTopic) {
+        return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic)).all(), newTopic.name())
                 .onErrorResume(e -> {
                     log.error(new Exception(e));
                     return Mono.empty();