Pārlūkot izejas kodu

Improve exception message about schema registry back compatibility

* [ISSUE-1250] Introduced new exception for attempt at creating new PROTOBUF/JSON schemas via a deprecated version of Schema Registry.

* [issue-1250] fixed formatting

* [ISSUE-1250] updated exception message
ValentinPrischepa 3 gadi atpakaļ
vecāks
revīzija
49285bee9b

+ 12 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/SchemaTypeIsNotSupportedException.java

@@ -0,0 +1,12 @@
+package com.provectus.kafka.ui.exception;
+
+public class SchemaTypeIsNotSupportedException extends UnprocessableEntityException {
+
+  private static final String REQUIRED_SCHEMA_REGISTRY_VERSION = "5.5.0";
+
+  public SchemaTypeIsNotSupportedException() {
+    super(String.format("Current version of Schema Registry does "
+            + "not support provided schema type,"
+            + " version %s or later is required here.", REQUIRED_SCHEMA_REGISTRY_VERSION));
+  }
+}

+ 13 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java

@@ -5,6 +5,7 @@ import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY;
 
 import com.provectus.kafka.ui.exception.DuplicateEntityException;
 import com.provectus.kafka.ui.exception.SchemaNotFoundException;
+import com.provectus.kafka.ui.exception.SchemaTypeIsNotSupportedException;
 import com.provectus.kafka.ui.exception.UnprocessableEntityException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
@@ -51,6 +52,8 @@ public class SchemaRegistryService {
   private static final String URL_SUBJECT_BY_VERSION = "/subjects/{schemaName}/versions/{version}";
   private static final String LATEST = "latest";
 
+  private static final String UNRECOGNIZED_FIELD_SCHEMA_TYPE = "Unrecognized field: schemaType";
+
   private final ClusterMapper mapper;
   private final WebClient webClient;
 
@@ -195,7 +198,9 @@ public class SchemaRegistryService {
         .retrieve()
         .onStatus(UNPROCESSABLE_ENTITY::equals,
             r -> r.bodyToMono(ErrorResponse.class)
-                .flatMap(x -> Mono.error(new UnprocessableEntityException(x.getMessage()))))
+                .flatMap(x -> Mono.error(isUnrecognizedFieldSchemaTypeMessage(x.getMessage())
+                        ? new SchemaTypeIsNotSupportedException()
+                        : new UnprocessableEntityException(x.getMessage()))))
         .bodyToMono(SubjectIdResponse.class);
   }
 
@@ -213,7 +218,9 @@ public class SchemaRegistryService {
         .onStatus(NOT_FOUND::equals, res -> Mono.empty())
         .onStatus(UNPROCESSABLE_ENTITY::equals,
             r -> r.bodyToMono(ErrorResponse.class)
-                .flatMap(x -> Mono.error(new UnprocessableEntityException(x.getMessage()))))
+                .flatMap(x -> Mono.error(isUnrecognizedFieldSchemaTypeMessage(x.getMessage())
+                        ? new SchemaTypeIsNotSupportedException()
+                        : new UnprocessableEntityException(x.getMessage()))))
         .bodyToMono(SchemaSubjectDTO.class)
         .filter(s -> Objects.isNull(s.getId()))
         .switchIfEmpty(Mono.error(new DuplicateEntityException("Such schema already exists")));
@@ -321,4 +328,8 @@ public class SchemaRegistryService {
         .uri(schemaRegistry.getFirstUrl() + uri, params)
         .headers(headers -> setBasicAuthIfEnabled(schemaRegistry, headers));
   }
+
+  private boolean isUnrecognizedFieldSchemaTypeMessage(String errorMessage) {
+    return errorMessage.contains(UNRECOGNIZED_FIELD_SCHEMA_TYPE);
+  }
 }