diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicMetadataException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicMetadataException.java new file mode 100644 index 0000000000..7ccceefe61 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicMetadataException.java @@ -0,0 +1,13 @@ +package com.provectus.kafka.ui.exception; + +public class TopicMetadataException extends CustomBaseException { + + public TopicMetadataException(String message) { + super(message); + } + + @Override + public ErrorCode getErrorCode() { + return ErrorCode.INVALID_ENTITY_STATE; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java index cc978d7a37..b3a2d92d56 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java @@ -1,5 +1,6 @@ package com.provectus.kafka.ui.service; +import com.provectus.kafka.ui.exception.TopicMetadataException; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.CreateTopicMessage; import com.provectus.kafka.ui.model.ExtendedAdminClient; @@ -261,10 +262,12 @@ public class KafkaService { topicData.getReplicationFactor().shortValue()); newTopic.configs(topicData.getConfigs()); return createTopic(adminClient, newTopic).map(v -> topicData); - }).flatMap( - topicData -> - getTopicsData(adminClient, Collections.singleton(topicData.getName())) - .next() + }) + .onErrorResume(t -> Mono.error(new TopicMetadataException(t.getMessage()))) + .flatMap( + topicData -> + getTopicsData(adminClient, Collections.singleton(topicData.getName())) + .next() ).switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic"))) .flatMap(t -> loadTopicsConfig(adminClient, Collections.singletonList(t.getName())) diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaTopicCreateTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaTopicCreateTests.java new file mode 100644 index 0000000000..51b11425c1 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaTopicCreateTests.java @@ -0,0 +1,60 @@ +package com.provectus.kafka.ui; + +import com.provectus.kafka.ui.model.TopicCreation; +import java.util.UUID; +import lombok.extern.log4j.Log4j2; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.web.reactive.server.WebTestClient; + +@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class}) +@Log4j2 +@AutoConfigureWebTestClient(timeout = "10000") +public class KafkaTopicCreateTests extends AbstractBaseTest { + @Autowired + private WebTestClient webTestClient; + private TopicCreation topicCreation; + + @BeforeEach + public void setUpBefore() { + this.topicCreation = new TopicCreation() + .replicationFactor(1) + .partitions(3) + .name(UUID.randomUUID().toString()); + } + + @Test + void shouldCreateNewTopicSuccessfully() { + webTestClient.post() + .uri("/api/clusters/{clusterName}/topics", LOCAL) + .bodyValue(topicCreation) + .exchange() + .expectStatus() + .isOk(); + } + + @Test + void shouldReturn400IfTopicAlreadyExists() { + TopicCreation topicCreation = new TopicCreation() + .replicationFactor(1) + .partitions(3) + .name(UUID.randomUUID().toString()); + + webTestClient.post() + .uri("/api/clusters/{clusterName}/topics", LOCAL) + .bodyValue(topicCreation) + .exchange() + .expectStatus() + .isOk(); + + webTestClient.post() + .uri("/api/clusters/{clusterName}/topics", LOCAL) + .bodyValue(topicCreation) + .exchange() + .expectStatus() + .isBadRequest(); + } +}