Handle errors when creating topics and return custom one as BAD_REQUEST (#670)
This commit is contained in:
parent
41f9cb07f2
commit
5f4de1313d
3 changed files with 80 additions and 4 deletions
|
@ -0,0 +1,13 @@
|
|||
package com.provectus.kafka.ui.exception;
|
||||
|
||||
public class TopicMetadataException extends CustomBaseException {
|
||||
|
||||
public TopicMetadataException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ErrorCode getErrorCode() {
|
||||
return ErrorCode.INVALID_ENTITY_STATE;
|
||||
}
|
||||
}
|
|
@ -1,5 +1,6 @@
|
|||
package com.provectus.kafka.ui.service;
|
||||
|
||||
import com.provectus.kafka.ui.exception.TopicMetadataException;
|
||||
import com.provectus.kafka.ui.exception.ValidationException;
|
||||
import com.provectus.kafka.ui.model.CreateTopicMessage;
|
||||
import com.provectus.kafka.ui.model.ExtendedAdminClient;
|
||||
|
@ -261,10 +262,12 @@ public class KafkaService {
|
|||
topicData.getReplicationFactor().shortValue());
|
||||
newTopic.configs(topicData.getConfigs());
|
||||
return createTopic(adminClient, newTopic).map(v -> topicData);
|
||||
}).flatMap(
|
||||
topicData ->
|
||||
getTopicsData(adminClient, Collections.singleton(topicData.getName()))
|
||||
.next()
|
||||
})
|
||||
.onErrorResume(t -> Mono.error(new TopicMetadataException(t.getMessage())))
|
||||
.flatMap(
|
||||
topicData ->
|
||||
getTopicsData(adminClient, Collections.singleton(topicData.getName()))
|
||||
.next()
|
||||
).switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")))
|
||||
.flatMap(t ->
|
||||
loadTopicsConfig(adminClient, Collections.singletonList(t.getName()))
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
package com.provectus.kafka.ui;
|
||||
|
||||
import com.provectus.kafka.ui.model.TopicCreation;
|
||||
import java.util.UUID;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.web.reactive.server.WebTestClient;
|
||||
|
||||
@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
|
||||
@Log4j2
|
||||
@AutoConfigureWebTestClient(timeout = "10000")
|
||||
public class KafkaTopicCreateTests extends AbstractBaseTest {
|
||||
@Autowired
|
||||
private WebTestClient webTestClient;
|
||||
private TopicCreation topicCreation;
|
||||
|
||||
@BeforeEach
|
||||
public void setUpBefore() {
|
||||
this.topicCreation = new TopicCreation()
|
||||
.replicationFactor(1)
|
||||
.partitions(3)
|
||||
.name(UUID.randomUUID().toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateNewTopicSuccessfully() {
|
||||
webTestClient.post()
|
||||
.uri("/api/clusters/{clusterName}/topics", LOCAL)
|
||||
.bodyValue(topicCreation)
|
||||
.exchange()
|
||||
.expectStatus()
|
||||
.isOk();
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldReturn400IfTopicAlreadyExists() {
|
||||
TopicCreation topicCreation = new TopicCreation()
|
||||
.replicationFactor(1)
|
||||
.partitions(3)
|
||||
.name(UUID.randomUUID().toString());
|
||||
|
||||
webTestClient.post()
|
||||
.uri("/api/clusters/{clusterName}/topics", LOCAL)
|
||||
.bodyValue(topicCreation)
|
||||
.exchange()
|
||||
.expectStatus()
|
||||
.isOk();
|
||||
|
||||
webTestClient.post()
|
||||
.uri("/api/clusters/{clusterName}/topics", LOCAL)
|
||||
.bodyValue(topicCreation)
|
||||
.exchange()
|
||||
.expectStatus()
|
||||
.isBadRequest();
|
||||
}
|
||||
}
|
Loading…
Add table
Reference in a new issue