diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaCompatibilityException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaCompatibilityException.java new file mode 100644 index 0000000000..6ea5d12d81 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaCompatibilityException.java @@ -0,0 +1,12 @@ +package com.provectus.kafka.ui.exception; + +public class SchemaCompatibilityException extends CustomBaseException { + public SchemaCompatibilityException(String message) { + super(message); + } + + @Override + public ErrorCode getErrorCode() { + return ErrorCode.UNPROCESSABLE_ENTITY; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java index 085e6c39be..5207c0cdd1 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java @@ -1,8 +1,10 @@ package com.provectus.kafka.ui.service; +import static org.springframework.http.HttpStatus.CONFLICT; import static org.springframework.http.HttpStatus.NOT_FOUND; import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY; +import com.provectus.kafka.ui.exception.SchemaCompatibilityException; import com.provectus.kafka.ui.exception.SchemaFailedToDeleteException; import com.provectus.kafka.ui.exception.SchemaNotFoundException; import com.provectus.kafka.ui.exception.SchemaTypeNotSupportedException; @@ -65,6 +67,7 @@ public class SchemaRegistryService { private static final String LATEST = "latest"; private static final String UNRECOGNIZED_FIELD_SCHEMA_TYPE = "Unrecognized field: schemaType"; + private static final String INCOMPATIBLE_WITH_AN_EARLIER_SCHEMA = "incompatible with an earlier schema"; private final ClusterMapper mapper; private final WebClient webClient; @@ -164,18 +167,18 @@ public class SchemaRegistryService { private Mono deleteSchemaSubject(KafkaCluster cluster, String schemaName, String version) { return configuredWebClient( - cluster, - HttpMethod.DELETE, - SchemaRegistryService.URL_SUBJECT_BY_VERSION, - List.of(schemaName, version)) - .retrieve() - .onStatus(NOT_FOUND::equals, - throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version)) - ) - .toBodilessEntity() - .then() - .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(), - () -> this.deleteSchemaSubject(cluster, schemaName, version)))); + cluster, + HttpMethod.DELETE, + SchemaRegistryService.URL_SUBJECT_BY_VERSION, + List.of(schemaName, version)) + .retrieve() + .onStatus(NOT_FOUND::equals, + throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version)) + ) + .toBodilessEntity() + .then() + .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(), + () -> this.deleteSchemaSubject(cluster, schemaName, version)))); } public Mono deleteSchemaSubjectEntirely(KafkaCluster cluster, @@ -216,20 +219,29 @@ public class SchemaRegistryService { Mono newSchemaSubject, KafkaCluster cluster) { return configuredWebClient( - cluster, - HttpMethod.POST, - URL_SUBJECT_VERSIONS, subject) - .contentType(MediaType.APPLICATION_JSON) - .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class)) - .retrieve() - .onStatus(UNPROCESSABLE_ENTITY::equals, - r -> r.bodyToMono(ErrorResponse.class) - .flatMap(x -> Mono.error(isUnrecognizedFieldSchemaTypeMessage(x.getMessage()) - ? new SchemaTypeNotSupportedException() - : new UnprocessableEntityException(x.getMessage())))) - .bodyToMono(SubjectIdResponse.class) - .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(), - () -> submitNewSchema(subject, newSchemaSubject, cluster)))); + cluster, + HttpMethod.POST, + URL_SUBJECT_VERSIONS, subject) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class)) + .retrieve() + .onStatus(status -> UNPROCESSABLE_ENTITY.equals(status) || CONFLICT.equals(status), + r -> r.bodyToMono(ErrorResponse.class) + .flatMap(this::getMonoError)) + .bodyToMono(SubjectIdResponse.class) + .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(), + () -> submitNewSchema(subject, newSchemaSubject, cluster)))); + } + + @NotNull + private Mono getMonoError(ErrorResponse x) { + if (isUnrecognizedFieldSchemaTypeMessage(x.getMessage())) { + return Mono.error(new SchemaTypeNotSupportedException()); + } else if (isIncompatibleSchemaMessage(x.getMessage())) { + return Mono.error(new SchemaCompatibilityException(x.getMessage())); + } else { + return Mono.error(new UnprocessableEntityException(x.getMessage())); + } } @NotNull @@ -337,6 +349,10 @@ public class SchemaRegistryService { return errorMessage.contains(UNRECOGNIZED_FIELD_SCHEMA_TYPE); } + private boolean isIncompatibleSchemaMessage(String message) { + return message.contains(INCOMPATIBLE_WITH_AN_EARLIER_SCHEMA); + } + private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method, String uri) { return configuredWebClient(cluster, method, uri, Collections.emptyList(), diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java index 354b5eb4a5..190831da98 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java @@ -5,7 +5,9 @@ import com.provectus.kafka.ui.model.NewSchemaSubjectDTO; import com.provectus.kafka.ui.model.SchemaSubjectDTO; import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO; import com.provectus.kafka.ui.model.SchemaTypeDTO; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Objects; import java.util.UUID; import lombok.extern.slf4j.Slf4j; import lombok.val; @@ -13,10 +15,13 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.test.web.reactive.server.EntityExchangeResult; import org.springframework.test.web.reactive.server.WebTestClient; import org.springframework.web.reactive.function.BodyInserters; +import org.testcontainers.shaded.org.hamcrest.MatcherAssert; +import org.testcontainers.shaded.org.hamcrest.Matchers; import reactor.core.publisher.Mono; @Slf4j @@ -102,6 +107,61 @@ class SchemaRegistryServiceTests extends AbstractIntegrationTest { Assertions.assertEquals("1", dto.getVersion()); } + @Test + void shouldReturnCorrectMessageWhenIncompatibleSchema() { + String schema = "{\"subject\":\"%s\",\"schemaType\":\"JSON\",\"schema\":" + + "\"{\\\"type\\\": \\\"string\\\"," + "\\\"properties\\\": " + + "{\\\"f1\\\": {\\\"type\\\": \\\"integer\\\"}}}" + + "\"}"; + String schema2 = "{\"subject\":\"%s\"," + "\"schemaType\":\"JSON\",\"schema\":" + + "\"{\\\"type\\\": \\\"string\\\"," + "\\\"properties\\\": " + + "{\\\"f1\\\": {\\\"type\\\": \\\"string\\\"}," + + "\\\"f2\\\": {" + "\\\"type\\\": \\\"string\\\"}}}" + + "\"}"; + + SchemaSubjectDTO dto = + webTestClient + .post() + .uri("/api/clusters/{clusterName}/schemas", LOCAL) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromValue(String.format(schema, subject))) + .exchange() + .expectStatus() + .isOk() + .expectBody(SchemaSubjectDTO.class) + .returnResult() + .getResponseBody(); + + Assertions.assertNotNull(dto); + Assertions.assertEquals("1", dto.getVersion()); + + webTestClient + .post() + .uri("/api/clusters/{clusterName}/schemas", LOCAL) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromValue(String.format(schema2, subject))) + .exchange() + .expectStatus().isEqualTo(HttpStatus.UNPROCESSABLE_ENTITY) + .expectBody().consumeWith(body -> { + String responseBody = new String(Objects.requireNonNull(body.getResponseBody()), StandardCharsets.UTF_8); + MatcherAssert.assertThat("Must return correct message incompatible schema", + responseBody, + Matchers.containsString("Schema being registered is incompatible with an earlier schema")); + }); + + dto = webTestClient + .get() + .uri("/api/clusters/{clusterName}/schemas/{subject}/latest", LOCAL, subject) + .exchange() + .expectStatus().isOk() + .expectBody(SchemaSubjectDTO.class) + .returnResult() + .getResponseBody(); + + Assertions.assertNotNull(dto); + Assertions.assertEquals("1", dto.getVersion()); + } + @Test void shouldCreateNewProtobufSchema() { String schema =