From 34b5bb4a3d2fb559fe75e6a7a059b4a53d58943c Mon Sep 17 00:00:00 2001 From: Ildar Almakaev Date: Tue, 26 Jan 2021 15:07:31 +0300 Subject: [PATCH] Get subject schema by version from schema-registry --- .../ui/cluster/service/ClusterService.java | 4 ++ .../service/SchemaRegistryService.java | 17 ++++++- .../kafka/ui/rest/MetricsRestController.java | 6 +++ .../main/resources/swagger/kafka-ui-api.yaml | 51 ++++++++++++++++++- 4 files changed, 75 insertions(+), 3 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index 2caef507d3..cd26f9b448 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -188,4 +188,8 @@ public class ClusterService { public Flux getSchemaSubjectVersions(String clusterName, String subjectName) { return schemaRegistryService.getSchemaSubjectVersions(clusterName, subjectName); } + + public Flux getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) { + return schemaRegistryService.getSchemaSubjectByVersion(clusterName, subjectName, version); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java index 2e39dc3804..92da1dfab2 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java @@ -2,6 +2,7 @@ package com.provectus.kafka.ui.cluster.service; import com.provectus.kafka.ui.cluster.model.ClustersStorage; import com.provectus.kafka.ui.cluster.model.KafkaCluster; +import com.provectus.kafka.ui.model.SubjectSchema; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.springframework.stereotype.Service; @@ -14,9 +15,12 @@ import reactor.core.publisher.Flux; public class SchemaRegistryService { private final ClustersStorage clustersStorage; public static final String URL_SUBJECTS = "/subjects"; + public static final String URL_SUBJECT_VERSIONS = "/subjects/{subjectName}/versions"; + public static final String URL_SUBJECT = "/subjects/{subjectName}/versions/{version}"; public Flux getAllSchemaSubjects(String clusterName) { KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow(); +// todo: use it as a bean WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry()); return webClient.get() .uri(URL_SUBJECTS) @@ -26,11 +30,20 @@ public class SchemaRegistryService { public Flux getSchemaSubjectVersions(String clusterName, String subjectName) { KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow(); +// todo: use it as a bean WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry()); - String url = "%s/%s/versions".formatted(URL_SUBJECTS, subjectName); return webClient.get() - .uri(url) + .uri(URL_SUBJECT_VERSIONS, subjectName) .retrieve() .bodyToFlux(Integer.class); } + + public Flux getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) { + KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow(); + WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry()); + return webClient.get() + .uri(URL_SUBJECT, subjectName, version) + .retrieve() + .bodyToFlux(SubjectSchema.class); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index 0e1fc9fb6a..975c797d55 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -100,6 +100,12 @@ public class MetricsRestController implements ApiClustersApi { .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); // TODO: check behaviour on cluster not found and empty groups list } + @Override + public Mono>> getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version, ServerWebExchange exchange) { + Flux flux = clusterService.getSchemaSubjectByVersion(clusterName, subjectName, version); + return Mono.just(ResponseEntity.ok(flux)); + } + @Override public Mono>> getSchemaSubjects(String clusterName, ServerWebExchange exchange) { Flux subjects = clusterService.getSchemaSubjects(clusterName); diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index c9f5007762..f053fa8c20 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -384,6 +384,38 @@ paths: items: type: integer + /api/clusters/{clusterName}/schema/subjects/{subjectName}/versions/{version}: + get: + tags: + - /api/clusters + summary: get schema of subject by version from schema registry + operationId: getSchemaSubjectByVersion + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: subjectName + in: path + required: true + schema: + type: string + - name: version + in: path + required: true + schema: + type: integer + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/SubjectSchema' + components: schemas: Cluster: @@ -706,4 +738,21 @@ components: value: type: string additionalProperties: - type: number \ No newline at end of file + type: number + + SubjectSchema: + type: object + properties: + subject: + type: string + version: + type: string + id: + type: integer + schema: + type: string + required: + - subject + - version + - id + - schema \ No newline at end of file