From a68fa153546ec7881b14ced53d7c581d60f8f2d8 Mon Sep 17 00:00:00 2001 From: Ildar Almakaev Date: Tue, 2 Mar 2021 15:27:24 +0300 Subject: [PATCH] Refactor registerNewSchema method --- .../service/SchemaRegistryService.java | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java index 2d49eca6f0..4f7b7cae49 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java @@ -138,29 +138,29 @@ public class SchemaRegistryService { .orElse(Mono.error(new NotFoundException("No such cluster"))); } + /** + * Checks whether the provided schema duplicates the previous or not, creates a new schema + * and then returns the whole content by requesting its latest version. + */ public Mono registerNewSchema(String clusterName, Mono newSchemaSubject) { return newSchemaSubject .flatMap(schema -> { Mono newSchema = Mono.just(new InternalNewSchema(schema.getSchema())); String subject = schema.getSubject(); - return createNewSchema(clusterName, subject, newSchema); + return clustersStorage.getClusterByName(clusterName) + .map(KafkaCluster::getSchemaRegistry) + .map(schemaRegistryUrl -> checkSchemaOnDuplicate(subject, newSchema, schemaRegistryUrl) + .flatMap(s -> submitNewSchema(subject, newSchema, schemaRegistryUrl)) + .flatMap(resp -> getLatestSchemaVersionBySubject(clusterName, subject)) + ) + .orElse(Mono.error(new NotFoundException("No such cluster"))); }); } - private Mono createNewSchema(String clusterName, String subject, Mono newSchemaSubject) { - return clustersStorage.getClusterByName(clusterName) - .map(KafkaCluster::getSchemaRegistry) - .map(schemaRegistryUrl -> checkSchemaOnDuplicate(subject, newSchemaSubject, schemaRegistryUrl) - .flatMap(s -> submitNewSchema(subject, newSchemaSubject, schemaRegistryUrl)) - .flatMap(resp -> getLatestSchemaVersionBySubject(clusterName, subject)) - ) - .orElse(Mono.error(new NotFoundException("No such cluster"))); - } - @NotNull - private Mono submitNewSchema(String subject, Mono newSchemaSubject, String schemaRegistry) { + private Mono submitNewSchema(String subject, Mono newSchemaSubject, String schemaRegistryUrl) { return webClient.post() - .uri(schemaRegistry + URL_SUBJECT_VERSIONS, subject) + .uri(schemaRegistryUrl + URL_SUBJECT_VERSIONS, subject) .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class)) .retrieve() @@ -169,13 +169,14 @@ public class SchemaRegistryService { } @NotNull - private Mono checkSchemaOnDuplicate(String subject, Mono newSchemaSubject, String schemaRegistry) { + private Mono checkSchemaOnDuplicate(String subject, Mono newSchemaSubject, String schemaRegistryUrl) { return webClient.post() - .uri(schemaRegistry + URL_SUBJECT, subject) + .uri(schemaRegistryUrl + URL_SUBJECT, subject) .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class)) .retrieve() .onStatus(NOT_FOUND::equals, res -> Mono.empty()) + .onStatus(UNPROCESSABLE_ENTITY::equals, r -> Mono.error(new UnprocessableEntityException("Invalid params"))) .bodyToMono(SchemaSubject.class) .filter(s -> Objects.isNull(s.getId())) .switchIfEmpty(Mono.error(new DuplicateEntityException("Such schema already exists"))); @@ -256,6 +257,8 @@ public class SchemaRegistryService { return "AVRO"; } else if (schema.contains("proto")) { return "PROTO"; + } else if (schema.contains("json")) { + return "JSON"; } else return schema; } }