浏览代码

Include schema type to schema objects. By default it's AVRO

Ildar Almakaev 4 年之前
父节点
当前提交
a617a09bc5

+ 8 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalNewSchema.java

@@ -1,10 +1,16 @@
 package com.provectus.kafka.ui.cluster.model.schemaregistry;
 
-import lombok.AllArgsConstructor;
+import com.provectus.kafka.ui.model.NewSchemaSubject;
+import com.provectus.kafka.ui.model.SchemaType;
 import lombok.Data;
 
 @Data
-@AllArgsConstructor
 public class InternalNewSchema {
     private String schema;
+    private SchemaType schemaType;
+
+    public InternalNewSchema(NewSchemaSubject schemaSubject) {
+        this.schema = schemaSubject.getSchema();
+        this.schemaType = schemaSubject.getSchemaType();
+    }
 }

+ 35 - 29
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java

@@ -10,10 +10,7 @@ import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibility
 import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibilityLevel;
 import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalNewSchema;
 import com.provectus.kafka.ui.cluster.model.schemaregistry.SubjectIdResponse;
-import com.provectus.kafka.ui.model.CompatibilityCheckResponse;
-import com.provectus.kafka.ui.model.CompatibilityLevel;
-import com.provectus.kafka.ui.model.NewSchemaSubject;
-import com.provectus.kafka.ui.model.SchemaSubject;
+import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.jetbrains.annotations.NotNull;
@@ -37,6 +34,10 @@ import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY;
 @Log4j2
 @RequiredArgsConstructor
 public class SchemaRegistryService {
+    public static final String NO_SUCH_SCHEMA_VERSION = "No such schema %s with version %s";
+    public static final String NO_SUCH_SCHEMA = "No such schema %s";
+    public static final String NO_SUCH_CLUSTER = "No such cluster";
+
     private static final String URL_SUBJECTS = "/subjects";
     private static final String URL_SUBJECT = "/subjects/{schemaName}";
     private static final String URL_SUBJECT_VERSIONS = "/subjects/{schemaName}/versions";
@@ -62,7 +63,7 @@ public class SchemaRegistryService {
                         .bodyToMono(String[].class)
                         .doOnError(log::error)
                 )
-                .orElse(Mono.error(new NotFoundException("No such cluster")));
+                .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER)));
     }
 
     public Flux<SchemaSubject> getAllVersionsBySubject(String clusterName, String subject) {
@@ -76,9 +77,9 @@ public class SchemaRegistryService {
                         .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
                         .retrieve()
                         .onStatus(NOT_FOUND::equals,
-                                throwIfNotFoundStatus(formatted("No such schema %s"))
+                                throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA))
                         ).bodyToFlux(Integer.class)
-                ).orElse(Flux.error(new NotFoundException("No such cluster")));
+                ).orElse(Flux.error(new NotFoundException(NO_SUCH_CLUSTER)));
     }
 
     public Mono<SchemaSubject> getSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) {
@@ -95,8 +96,9 @@ public class SchemaRegistryService {
                         .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
                         .retrieve()
                         .onStatus(NOT_FOUND::equals,
-                                throwIfNotFoundStatus(formatted("No such schema %s with version %s", schemaName, version))
+                                throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
                         ).bodyToMono(SchemaSubject.class)
+                        .map(this::withSchemaType)
                         .zipWith(getSchemaCompatibilityInfoOrGlobal(clusterName, schemaName))
                         .map(tuple -> {
                             SchemaSubject schema = tuple.getT1();
@@ -105,7 +107,21 @@ public class SchemaRegistryService {
                             return schema;
                         })
                 )
-                .orElse(Mono.error(new NotFoundException("No such cluster")));
+                .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER)));
+    }
+
+    /**
+     * If {@link SchemaSubject#getSchemaType()} is null, then AVRO, otherwise, adds the schema type as is.
+     */
+    @NotNull
+    private SchemaSubject withSchemaType(SchemaSubject s) {
+        SchemaType schemaType = Objects.nonNull(s.getSchemaType()) ? s.getSchemaType() : SchemaType.AVRO;
+        return new SchemaSubject()
+                .schema(s.getSchema())
+                .subject(s.getSubject())
+                .version(s.getVersion())
+                .id(s.getId())
+                .schemaType(schemaType);
     }
 
     public Mono<ResponseEntity<Void>> deleteSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) {
@@ -122,9 +138,9 @@ public class SchemaRegistryService {
                         .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
                         .retrieve()
                         .onStatus(NOT_FOUND::equals,
-                                throwIfNotFoundStatus(formatted("No such schema %s with version %s", schemaName, version))
+                                throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
                         ).toBodilessEntity()
-                ).orElse(Mono.error(new NotFoundException("No such cluster")));
+                ).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER)));
     }
 
     public Mono<ResponseEntity<Void>> deleteSchemaSubjectEntirely(String clusterName, String schemaName) {
@@ -132,10 +148,10 @@ public class SchemaRegistryService {
                 .map(cluster -> webClient.delete()
                         .uri(cluster.getSchemaRegistry() + URL_SUBJECT, schemaName)
                         .retrieve()
-                        .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted("No such schema %s", schemaName))
+                        .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName))
                         )
                         .toBodilessEntity())
-                .orElse(Mono.error(new NotFoundException("No such cluster")));
+                .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER)));
     }
 
     /**
@@ -145,7 +161,7 @@ public class SchemaRegistryService {
     public Mono<SchemaSubject> registerNewSchema(String clusterName, Mono<NewSchemaSubject> newSchemaSubject) {
         return newSchemaSubject
                 .flatMap(schema -> {
-                    Mono<InternalNewSchema> newSchema = Mono.just(new InternalNewSchema(schema.getSchema()));
+                    Mono<InternalNewSchema> newSchema = Mono.just(new InternalNewSchema(schema));
                     String subject = schema.getSubject();
                     return clustersStorage.getClusterByName(clusterName)
                             .map(KafkaCluster::getSchemaRegistry)
@@ -153,7 +169,7 @@ public class SchemaRegistryService {
                                     .flatMap(s -> submitNewSchema(subject, newSchema, schemaRegistryUrl))
                                     .flatMap(resp -> getLatestSchemaVersionBySubject(clusterName, subject))
                             )
-                            .orElse(Mono.error(new NotFoundException("No such cluster")));
+                            .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER)));
                 });
     }
 
@@ -203,9 +219,9 @@ public class SchemaRegistryService {
                             .body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevel.class))
                             .retrieve()
                             .onStatus(NOT_FOUND::equals,
-                                    throwIfNotFoundStatus(formatted("No such schema %s", schemaName)))
+                                    throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
                             .bodyToMono(Void.class);
-                }).orElse(Mono.error(new NotFoundException("No such cluster")));
+                }).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER)));
     }
 
     public Mono<Void> updateSchemaCompatibility(String clusterName, Mono<CompatibilityLevel> compatibilityLevel) {
@@ -241,24 +257,14 @@ public class SchemaRegistryService {
                         .contentType(MediaType.APPLICATION_JSON)
                         .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
                         .retrieve()
-                        .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted("No such schema %s", schemaName)))
+                        .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
                         .bodyToMono(InternalCompatibilityCheck.class)
                         .map(mapper::toCompatibilityCheckResponse)
                         .log()
-                ).orElse(Mono.error(new NotFoundException("No such cluster")));
+                ).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER)));
     }
 
     public String formatted(String str, Object... args) {
         return new Formatter().format(str, args).toString();
     }
-
-    public static String extractRecordType(String schema) {
-        if (schema.contains("record")) {
-            return "AVRO";
-        } else if (schema.contains("proto")) {
-            return "PROTO";
-        } else if (schema.contains("json")) {
-            return "JSON";
-        } else return schema;
-    }
 }

+ 13 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -1338,12 +1338,15 @@ components:
           type: string
         compatibilityLevel:
           type: string
+        schemaType:
+          $ref: '#/components/schemas/SchemaType'
       required:
         - id
         - subject
         - version
         - schema
         - compatibilityLevel
+        - schemaType
 
     NewSchemaSubject:
       type: object
@@ -1352,9 +1355,12 @@ components:
           type: string
         schema:
           type: string
+        schemaType:
+          $ref: '#/components/schemas/SchemaType'
       required:
         - subject
         - schema
+        - schemaType
 
     CompatibilityLevel:
       type: object
@@ -1372,6 +1378,13 @@ components:
       required:
         - compatibility
 
+    SchemaType:
+      type: string
+      enum:
+        - AVRO
+        - JSON
+        - PROTOBUF
+
     CompatibilityCheckResponse:
       type: object
       properties: