diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaTypeIsNotSupportedException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaTypeIsNotSupportedException.java new file mode 100644 index 0000000000..eabaaf97e5 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaTypeIsNotSupportedException.java @@ -0,0 +1,12 @@ +package com.provectus.kafka.ui.exception; + +public class SchemaTypeIsNotSupportedException extends UnprocessableEntityException { + + private static final String REQUIRED_SCHEMA_REGISTRY_VERSION = "5.5.0"; + + public SchemaTypeIsNotSupportedException() { + super(String.format("Current version of Schema Registry does " + + "not support provided schema type," + + " version %s or later is required here.", REQUIRED_SCHEMA_REGISTRY_VERSION)); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java index 02db53f23b..ccf21cc125 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java @@ -5,6 +5,7 @@ import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY; import com.provectus.kafka.ui.exception.DuplicateEntityException; import com.provectus.kafka.ui.exception.SchemaNotFoundException; +import com.provectus.kafka.ui.exception.SchemaTypeIsNotSupportedException; import com.provectus.kafka.ui.exception.UnprocessableEntityException; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.mapper.ClusterMapper; @@ -51,6 +52,8 @@ public class SchemaRegistryService { private static final String URL_SUBJECT_BY_VERSION = "/subjects/{schemaName}/versions/{version}"; private static final String LATEST = "latest"; + private static final String UNRECOGNIZED_FIELD_SCHEMA_TYPE = "Unrecognized field: schemaType"; + private final ClusterMapper mapper; private final WebClient webClient; @@ -195,7 +198,9 @@ public class SchemaRegistryService { .retrieve() .onStatus(UNPROCESSABLE_ENTITY::equals, r -> r.bodyToMono(ErrorResponse.class) - .flatMap(x -> Mono.error(new UnprocessableEntityException(x.getMessage())))) + .flatMap(x -> Mono.error(isUnrecognizedFieldSchemaTypeMessage(x.getMessage()) + ? new SchemaTypeIsNotSupportedException() + : new UnprocessableEntityException(x.getMessage())))) .bodyToMono(SubjectIdResponse.class); } @@ -213,7 +218,9 @@ public class SchemaRegistryService { .onStatus(NOT_FOUND::equals, res -> Mono.empty()) .onStatus(UNPROCESSABLE_ENTITY::equals, r -> r.bodyToMono(ErrorResponse.class) - .flatMap(x -> Mono.error(new UnprocessableEntityException(x.getMessage())))) + .flatMap(x -> Mono.error(isUnrecognizedFieldSchemaTypeMessage(x.getMessage()) + ? new SchemaTypeIsNotSupportedException() + : new UnprocessableEntityException(x.getMessage())))) .bodyToMono(SchemaSubjectDTO.class) .filter(s -> Objects.isNull(s.getId())) .switchIfEmpty(Mono.error(new DuplicateEntityException("Such schema already exists"))); @@ -321,4 +328,8 @@ public class SchemaRegistryService { .uri(schemaRegistry.getFirstUrl() + uri, params) .headers(headers -> setBasicAuthIfEnabled(schemaRegistry, headers)); } + + private boolean isUnrecognizedFieldSchemaTypeMessage(String errorMessage) { + return errorMessage.contains(UNRECOGNIZED_FIELD_SCHEMA_TYPE); + } }