Sfoglia il codice sorgente

Fixed test by removing updateCluster calls, fixed add topic

Roman Nedzvetskiy 5 anni fa
parent
commit
0ba466682c

+ 14 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -61,13 +61,25 @@ public class ClusterService {
     public Mono<Topic> createTopic(String name, Mono<TopicFormData> topicFormData) {
         return clustersStorage.getClusterByName(name).map(
                 cluster -> kafkaService.createTopic(cluster, topicFormData)
+                .flatMap(t -> kafkaService.getUpdatedCluster(cluster)
+                    .map(c -> {
+                        clustersStorage.setKafkaCluster(name, c);
+                        return t;
+                    })
+                )
         ).orElse(Mono.empty()).map(clusterMapper::toTopic);
     }
 
     @SneakyThrows
     public Mono<ResponseEntity<Topic>> updateTopic(String clusterName, String topicName, Mono<TopicFormData> topicFormData) {
-        return clustersStorage.getClusterByName(clusterName).map(c ->
-                    topicFormData.flatMap(t -> kafkaService.updateTopic(c, topicName, t)).map(ResponseEntity::ok))
+        return clustersStorage.getClusterByName(clusterName).map(cl ->
+                    topicFormData.flatMap(t -> kafkaService.updateTopic(cl, topicName, t))
+                            .flatMap(t -> kafkaService.getUpdatedCluster(cl)
+                                    .map(c -> {
+                                        clustersStorage.setKafkaCluster(clusterName, c);
+                                        return t;
+                                    })
+                    .map(ResponseEntity::ok)))
                 .orElse(Mono.empty());
     }
 

+ 4 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -12,7 +12,10 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigResource;
 import reactor.core.publisher.Mono;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 

+ 3 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -22,10 +22,7 @@ import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -191,7 +188,6 @@ public class KafkaService {
     }
 
     private Mono<Topic> getUpdatedTopic (ExtendedAdminClient ac, String topicName, String clusterName) {
-        getUpdatedCluster(clustersStorage.getClusterByName(clusterName).orElseThrow());
         return getTopicsData(ac.getAdminClient())
                 .map(s -> s.stream()
                         .filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow())
@@ -267,12 +263,8 @@ public class KafkaService {
 
 
     @SneakyThrows
-    private Mono<Void> createTopic(AdminClient adminClient, NewTopic newTopic) {
-        return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic))
-                    .values()
-                    .values()
-                    .iterator()
-                    .next());
+    private Mono<KafkaFuture<Void>> createTopic(AdminClient adminClient, NewTopic newTopic) {
+        return Mono.just(adminClient.createTopics(Collections.singletonList(newTopic)).all());
     }
 
     private Mono<Void> incrementalAlterConfig(TopicFormData topicFormData, ConfigResource topicCR, ExtendedAdminClient ac) {

+ 11 - 3
kafka-ui-api/src/test/java/com/provectus/kafka/ui/UpdateTopicTest.java

@@ -22,6 +22,7 @@ import org.springframework.http.MediaType;
 import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.test.util.ReflectionTestUtils;
 import org.springframework.test.web.reactive.server.WebTestClient;
+import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.KafkaContainer;
 import reactor.core.publisher.Mono;
 
@@ -30,7 +31,7 @@ import java.util.Map;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
-@AutoConfigureWebTestClient
+@AutoConfigureWebTestClient(timeout = "10000000")
 public class UpdateTopicTest {
 
     TopicFormData topicFormData = new TopicFormData();
@@ -44,7 +45,7 @@ public class UpdateTopicTest {
     private String PARAM_TO_CHANGE_VALUE = "10485761";
 
     private final String CLUSTER_NAME = "local";
-    private final String TOPIC_NAME = "messages";
+    private final String TOPIC_NAME = "messages1";
 
     private String urlUpdate;
     private String urlSettings;
@@ -66,6 +67,9 @@ public class UpdateTopicTest {
     @ClassRule
     public static KafkaContainer kafka = new KafkaContainer();
 
+    @ClassRule
+    public static GenericContainer zk = new GenericContainer("confluentinc/cp-zookeeper:5.1.0");
+
     @Before
     public void prepareParams () {
         urlUpdate = String.format(URL_TEMPLATE_UPDATE, CLUSTER_NAME, TOPIC_NAME);
@@ -82,7 +86,6 @@ public class UpdateTopicTest {
         try {
             webTestClient.post().uri(urlCreate).accept(MediaType.APPLICATION_JSON).body(topicFormDataCreateMono, TopicFormData.class)
                     .exchange().returnResult(Topic.class).getResponseBody().blockLast();
-            clustersStorage.setKafkaCluster(CLUSTER_NAME, metricsUpdateService.updateMetrics(kafkaCluster).block());
             webTestClient.put().uri(urlUpdate).accept(MediaType.APPLICATION_JSON).body(topicFormDataMono, TopicFormData.class)
                     .exchange().returnResult(Topic.class).getResponseBody().blockLast();
             Assert.assertEquals(webTestClient.get().uri(urlSettings).accept(MediaType.APPLICATION_JSON).exchange().returnResult(TopicConfig.class)
@@ -107,6 +110,11 @@ public class UpdateTopicTest {
     }
 
     private void startTestEnvironment() {
+         zk
+                .withNetwork(kafka.getNetwork())
+                .withNetworkAliases("zookeeper")
+                .withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
+         kafka.withExternalZookeeper("zookeeper:2181");
         kafka.start();
     }