Quellcode durchsuchen

Get subject schema by version from schema-registry

Ildar Almakaev vor 4 Jahren
Ursprung
Commit
34b5bb4a3d

+ 4 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -188,4 +188,8 @@ public class ClusterService {
     public Flux<Integer> getSchemaSubjectVersions(String clusterName, String subjectName) {
         return schemaRegistryService.getSchemaSubjectVersions(clusterName, subjectName);
     }
+
+    public Flux<SubjectSchema> getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) {
+        return schemaRegistryService.getSchemaSubjectByVersion(clusterName, subjectName, version);
+    }
 }

+ 15 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.cluster.service;
 
 import com.provectus.kafka.ui.cluster.model.ClustersStorage;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.model.SubjectSchema;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.springframework.stereotype.Service;
@@ -14,9 +15,12 @@ import reactor.core.publisher.Flux;
 public class SchemaRegistryService {
     private final ClustersStorage clustersStorage;
     public static final String URL_SUBJECTS = "/subjects";
+    public static final String URL_SUBJECT_VERSIONS = "/subjects/{subjectName}/versions";
+    public static final String URL_SUBJECT = "/subjects/{subjectName}/versions/{version}";
 
     public Flux<String> getAllSchemaSubjects(String clusterName) {
         KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow();
+//        todo: use it as a bean
         WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry());
         return webClient.get()
                 .uri(URL_SUBJECTS)
@@ -26,11 +30,20 @@ public class SchemaRegistryService {
 
     public Flux<Integer> getSchemaSubjectVersions(String clusterName, String subjectName) {
         KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow();
+//        todo: use it as a bean
         WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry());
-        String url = "%s/%s/versions".formatted(URL_SUBJECTS, subjectName);
         return webClient.get()
-                .uri(url)
+                .uri(URL_SUBJECT_VERSIONS, subjectName)
                 .retrieve()
                 .bodyToFlux(Integer.class);
     }
+
+    public Flux<SubjectSchema> getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) {
+        KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow();
+        WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry());
+        return webClient.get()
+                .uri(URL_SUBJECT, subjectName, version)
+                .retrieve()
+                .bodyToFlux(SubjectSchema.class);
+    }
 }

+ 6 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -100,6 +100,12 @@ public class MetricsRestController implements ApiClustersApi {
                 .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); // TODO: check behaviour on cluster not found and empty groups list
     }
 
+    @Override
+    public Mono<ResponseEntity<Flux<SubjectSchema>>> getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
+        Flux<SubjectSchema> flux = clusterService.getSchemaSubjectByVersion(clusterName, subjectName, version);
+        return Mono.just(ResponseEntity.ok(flux));
+    }
+
     @Override
     public Mono<ResponseEntity<Flux<String>>> getSchemaSubjects(String clusterName, ServerWebExchange exchange) {
         Flux<String> subjects = clusterService.getSchemaSubjects(clusterName);

+ 50 - 1
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -384,6 +384,38 @@ paths:
                 items:
                   type: integer
 
+  /api/clusters/{clusterName}/schema/subjects/{subjectName}/versions/{version}:
+    get:
+      tags:
+        - /api/clusters
+      summary: get schema of subject by version from schema registry
+      operationId: getSchemaSubjectByVersion
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: subjectName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: version
+          in: path
+          required: true
+          schema:
+            type: integer
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/SubjectSchema'
+
 components:
   schemas:
     Cluster:
@@ -706,4 +738,21 @@ components:
         value:
           type: string
           additionalProperties:
-            type: number
+            type: number
+
+    SubjectSchema:
+      type: object
+      properties:
+        subject:
+          type: string
+        version:
+          type: string
+        id:
+          type: integer
+        schema:
+          type: string
+      required:
+        - subject
+        - version
+        - id
+        - schema