From 15f48e7649db5775ff0aa2719a0f3e33bc068c0c Mon Sep 17 00:00:00 2001 From: Ildar Almakaev Date: Mon, 1 Feb 2021 21:11:53 +0300 Subject: [PATCH] Add GET/DELETE actions for the latest schema version --- .../service/SchemaRegistryService.java | 39 ++++++++++----- .../kafka/ui/rest/MetricsRestController.java | 12 +++++ .../main/resources/swagger/kafka-ui-api.yaml | 49 +++++++++++++++++++ 3 files changed, 89 insertions(+), 11 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java index da8e6fe075..f6551829d1 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java @@ -28,6 +28,7 @@ public class SchemaRegistryService { private static final String URL_SUBJECT = "/subjects/{subjectName}"; private static final String URL_SUBJECT_VERSIONS = "/subjects/{subjectName}/versions"; private static final String URL_SUBJECT_BY_VERSION = "/subjects/{subjectName}/versions/{version}"; + private static final String LATEST = "latest"; private final ClustersStorage clustersStorage; private final WebClient webClient; @@ -37,7 +38,9 @@ public class SchemaRegistryService { .map(cluster -> webClient.get() .uri(cluster.getSchemaRegistry() + URL_SUBJECTS) .retrieve() - .bodyToFlux(String.class)) + .onStatus(HttpStatus::is5xxServerError, ClientResponse::createException) + .bodyToFlux(String.class) + .doOnError(log::error)) .orElse(Flux.error(new NotFoundException("No such cluster"))); } @@ -52,6 +55,14 @@ public class SchemaRegistryService { } public Flux getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) { + return this.getSchemaSubject(clusterName, subjectName, String.valueOf(version)); + } + + public Flux getLatestSchemaSubject(String clusterName, String subjectName) { + return this.getSchemaSubject(clusterName, subjectName, LATEST); + } + + private Flux getSchemaSubject(String clusterName, String subjectName, String version) { return clustersStorage.getClusterByName(clusterName) .map(cluster -> webClient.get() .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, subjectName, version) @@ -62,6 +73,14 @@ public class SchemaRegistryService { } public Mono> deleteSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) { + return this.deleteSchemaSubject(clusterName, subjectName, String.valueOf(version)); + } + + public Mono> deleteLatestSchemaSubject(String clusterName, String subjectName) { + return this.deleteSchemaSubject(clusterName, subjectName, LATEST); + } + + private Mono> deleteSchemaSubject(String clusterName, String subjectName, String version) { return clustersStorage.getClusterByName(clusterName) .map(cluster -> webClient.delete() .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, subjectName, version) @@ -83,16 +102,14 @@ public class SchemaRegistryService { public Mono> createNewSubject(String clusterName, String subjectSchema, Mono newSchemaSubject) { return clustersStorage.getClusterByName(clusterName) - .map(cluster -> { - log.info("Submitting a new subject: {} to Schema Registry: {}", subjectSchema, cluster.getSchemaRegistry()); - return webClient.post() - .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, subjectSchema) - .contentType(MediaType.APPLICATION_JSON) - .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class)) - .retrieve() - .onStatus(HttpStatus.INTERNAL_SERVER_ERROR::equals, ClientResponse::createException) - .toEntity(SchemaSubject.class); - }) + .map(cluster -> webClient.post() + .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, subjectSchema) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class)) + .retrieve() + .onStatus(HttpStatus::isError, ClientResponse::createException) + .toEntity(SchemaSubject.class) + .doOnError(log::error)) .orElse(Mono.error(new NotFoundException("No such cluster"))); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index 91fc0dd459..f7b4ac632f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -104,6 +104,12 @@ public class MetricsRestController implements ApiClustersApi { .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); // TODO: check behaviour on cluster not found and empty groups list } + @Override + public Mono>> getLatestSchemaByVersion(String clusterName, String schemaName, ServerWebExchange exchange) { + Flux flux = schemaRegistryService.getLatestSchemaSubject(clusterName, schemaName); + return Mono.just(ResponseEntity.ok(flux)).onErrorReturn(ResponseEntity.notFound().build()); + } + @Override public Mono>> getSchemaByVersion(String clusterName, String schemaName, Integer version, ServerWebExchange exchange) { Flux flux = schemaRegistryService.getSchemaSubjectByVersion(clusterName, schemaName, version); @@ -122,6 +128,12 @@ public class MetricsRestController implements ApiClustersApi { .onErrorReturn(ResponseEntity.notFound().build()); } + @Override + public Mono> deleteLatestSchema(String clusterName, String schemaName, ServerWebExchange exchange) { + return schemaRegistryService.deleteLatestSchemaSubject(clusterName, schemaName) + .onErrorReturn(ResponseEntity.notFound().build()); + } + @Override public Mono> deleteSchemaByVersion(String clusterName, String subjectName, Integer version, ServerWebExchange exchange) { return schemaRegistryService.deleteSchemaSubjectByVersion(clusterName, subjectName, version) diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 7829b34c8c..45fb60aea5 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -437,6 +437,55 @@ paths: 400: description: Bad request + /api/clusters/{clusterName}/schemas/{schemaName}/latest: + get: + tags: + - /api/clusters + summary: get the latest schema from Schema Registry service + operationId: getLatestSchemaByVersion + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: schemaName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/SchemaSubject' + delete: + tags: + - /api/clusters + summary: delete the latest schema from schema registry + operationId: deleteLatestSchema + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: schemaName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + 404: + description: Not found + + /api/clusters/{clusterName}/schemas/{schemaName}/versions/{version}: get: tags: