From 82f879c916d9de0146ff3f1184ae340ea9bb40f5 Mon Sep 17 00:00:00 2001 From: Ildar Almakaev Date: Fri, 26 Feb 2021 10:41:08 +0300 Subject: [PATCH] Return id, version, schema, and subject after creating a new schema --- .../schemaregistry/SubjectIdResponse.java | 8 +++ .../service/SchemaRegistryService.java | 68 ++++++++----------- .../kafka/ui/rest/MetricsRestController.java | 6 +- .../main/resources/swagger/kafka-ui-api.yaml | 12 ++-- 4 files changed, 44 insertions(+), 50 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/SubjectIdResponse.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/SubjectIdResponse.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/SubjectIdResponse.java new file mode 100644 index 0000000000..3a6eefee27 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/SubjectIdResponse.java @@ -0,0 +1,8 @@ +package com.provectus.kafka.ui.cluster.model.schemaregistry; + +import lombok.Data; + +@Data +public class SubjectIdResponse { + private Integer id; +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java index 232e25d026..5b6a22900a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java @@ -5,6 +5,7 @@ import com.provectus.kafka.ui.cluster.mapper.ClusterMapper; import com.provectus.kafka.ui.cluster.model.ClustersStorage; import com.provectus.kafka.ui.cluster.model.InternalCompatibilityCheck; import com.provectus.kafka.ui.cluster.model.InternalCompatibilityLevel; +import com.provectus.kafka.ui.cluster.model.schemaregistry.SubjectIdResponse; import com.provectus.kafka.ui.model.CompatibilityCheckResponse; import com.provectus.kafka.ui.model.CompatibilityLevel; import com.provectus.kafka.ui.model.NewSchemaSubject; @@ -12,20 +13,20 @@ import com.provectus.kafka.ui.model.SchemaSubject; import java.util.Formatter; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpStatus; +import org.jetbrains.annotations.NotNull; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.Arrays; -import java.util.List; import java.util.Objects; +import java.util.function.Function; + +import static org.springframework.http.HttpStatus.NOT_FOUND; @Service @Log4j2 @@ -69,10 +70,8 @@ public class SchemaRegistryService { .map(cluster -> webClient.get() .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName) .retrieve() - .onStatus(HttpStatus.NOT_FOUND::equals, - resp -> Mono.error( - new NotFoundException(formatted("No such schema %s")) - ) + .onStatus(NOT_FOUND::equals, + throwIfNotFoundStatus(formatted("No such schema %s")) ).bodyToFlux(Integer.class) ).orElse(Flux.error(new NotFoundException("No such cluster"))); } @@ -90,12 +89,8 @@ public class SchemaRegistryService { .map(cluster -> webClient.get() .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version) .retrieve() - .onStatus(HttpStatus.NOT_FOUND::equals, - resp -> Mono.error( - new NotFoundException( - formatted("No such schema %s with version %s", schemaName, version) - ) - ) + .onStatus(NOT_FOUND::equals, + throwIfNotFoundStatus(formatted("No such schema %s with version %s", schemaName, version)) ).bodyToMono(SchemaSubject.class) .zipWith(getSchemaCompatibilityInfoOrGlobal(clusterName, schemaName)) .map(tuple -> { @@ -105,7 +100,7 @@ public class SchemaRegistryService { return schema; }) ) - .orElseThrow(); + .orElse(Mono.error(new NotFoundException("No such cluster"))); } public Mono> deleteSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) { @@ -121,46 +116,40 @@ public class SchemaRegistryService { .map(cluster -> webClient.delete() .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version) .retrieve() - .onStatus(HttpStatus.NOT_FOUND::equals, - resp -> Mono.error( - new NotFoundException( - formatted("No such schema %s with version %s", schemaName, version) - ) - ) + .onStatus(NOT_FOUND::equals, + throwIfNotFoundStatus(formatted("No such schema %s with version %s", schemaName, version)) ).toBodilessEntity() ).orElse(Mono.error(new NotFoundException("No such cluster"))); } - public Mono> deleteSchemaSubject(String clusterName, String schemaName) { + public Mono> deleteSchemaSubjectEntirely(String clusterName, String schemaName) { return clustersStorage.getClusterByName(clusterName) .map(cluster -> webClient.delete() .uri(cluster.getSchemaRegistry() + URL_SUBJECT, schemaName) .retrieve() - .onStatus(HttpStatus.NOT_FOUND::equals, - resp -> Mono.error( - new NotFoundException( - formatted("No such schema %s", schemaName) - ) - ) + .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted("No such schema %s", schemaName)) ) .toBodilessEntity()) .orElse(Mono.error(new NotFoundException("No such cluster"))); } - public Mono> createNewSubject(String clusterName, String schemaName, Mono newSchemaSubject) { - return clustersStorage.getClusterByName(clusterName) + public Mono createNewSchema(String clusterName, String schemaName, Mono newSchemaSubject) { + var response = clustersStorage.getClusterByName(clusterName) .map(cluster -> webClient.post() .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName) .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class)) .retrieve() - .onStatus(HttpStatus.NOT_FOUND::equals, - resp -> Mono.error( - new NotFoundException(formatted("No such schema %s", schemaName))) - ) - .toEntity(SchemaSubject.class) + .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted("No such schema %s", schemaName))) + .bodyToMono(SubjectIdResponse.class) .log()) .orElse(Mono.error(new NotFoundException("No such cluster"))); + return response.then(getLatestSchemaSubject(clusterName, schemaName)); + } + + @NotNull + private Function> throwIfNotFoundStatus(String formatted) { + return resp -> Mono.error(new NotFoundException(formatted)); } /** @@ -178,8 +167,8 @@ public class SchemaRegistryService { .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevel.class)) .retrieve() - .onStatus(HttpStatus.NOT_FOUND::equals, - resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName)))) + .onStatus(NOT_FOUND::equals, + throwIfNotFoundStatus(formatted("No such schema %s", schemaName))) .bodyToMono(Void.class); }).orElse(Mono.error(new NotFoundException("No such cluster"))); } @@ -217,8 +206,7 @@ public class SchemaRegistryService { .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class)) .retrieve() - .onStatus(HttpStatus.NOT_FOUND::equals, - resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName)))) + .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted("No such schema %s", schemaName))) .bodyToMono(InternalCompatibilityCheck.class) .map(mapper::toCompatibilityCheckResponse) .log() diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index 7b150c8f43..05ba6d28d5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -138,14 +138,16 @@ public class MetricsRestController implements ApiClustersApi { @Override public Mono> deleteSchema(String clusterName, String subjectName, ServerWebExchange exchange) { - return schemaRegistryService.deleteSchemaSubject(clusterName, subjectName); + return schemaRegistryService.deleteSchemaSubjectEntirely(clusterName, subjectName); } @Override public Mono> createNewSchema(String clusterName, String subject, @Valid Mono newSchemaSubject, ServerWebExchange exchange) { - return schemaRegistryService.createNewSubject(clusterName, subject, newSchemaSubject); + return schemaRegistryService + .createNewSchema(clusterName, subject, newSchemaSubject) + .map(ResponseEntity::ok); } @Override diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 51139ebd13..bd54282ab0 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -980,6 +980,10 @@ components: type: string required: - id + - subject + - version + - schema + - compatibilityLevel NewSchemaSubject: type: object @@ -1005,14 +1009,6 @@ components: required: - compatibility -# CompatibilityLevelResponse: -# type: object -# properties: -# compatibilityLevel: -# type: string -# required: -# - compatibilityLevel - CompatibilityCheckResponse: type: object properties: