From a617a09bc59aff9ebe3e916085883dbacf29cc2e Mon Sep 17 00:00:00 2001 From: Ildar Almakaev Date: Thu, 4 Mar 2021 15:02:09 +0300 Subject: [PATCH] Include schema type to schema objects. By default it's AVRO --- .../schemaregistry/InternalNewSchema.java | 10 ++- .../service/SchemaRegistryService.java | 64 ++++++++++--------- .../main/resources/swagger/kafka-ui-api.yaml | 13 ++++ 3 files changed, 56 insertions(+), 31 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalNewSchema.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalNewSchema.java index 2becd8a011..00a41217cc 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalNewSchema.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalNewSchema.java @@ -1,10 +1,16 @@ package com.provectus.kafka.ui.cluster.model.schemaregistry; -import lombok.AllArgsConstructor; +import com.provectus.kafka.ui.model.NewSchemaSubject; +import com.provectus.kafka.ui.model.SchemaType; import lombok.Data; @Data -@AllArgsConstructor public class InternalNewSchema { private String schema; + private SchemaType schemaType; + + public InternalNewSchema(NewSchemaSubject schemaSubject) { + this.schema = schemaSubject.getSchema(); + this.schemaType = schemaSubject.getSchemaType(); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java index 4f7b7cae49..c3acfa2bf9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java @@ -10,10 +10,7 @@ import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibility import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibilityLevel; import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalNewSchema; import com.provectus.kafka.ui.cluster.model.schemaregistry.SubjectIdResponse; -import com.provectus.kafka.ui.model.CompatibilityCheckResponse; -import com.provectus.kafka.ui.model.CompatibilityLevel; -import com.provectus.kafka.ui.model.NewSchemaSubject; -import com.provectus.kafka.ui.model.SchemaSubject; +import com.provectus.kafka.ui.model.*; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.jetbrains.annotations.NotNull; @@ -37,6 +34,10 @@ import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY; @Log4j2 @RequiredArgsConstructor public class SchemaRegistryService { + public static final String NO_SUCH_SCHEMA_VERSION = "No such schema %s with version %s"; + public static final String NO_SUCH_SCHEMA = "No such schema %s"; + public static final String NO_SUCH_CLUSTER = "No such cluster"; + private static final String URL_SUBJECTS = "/subjects"; private static final String URL_SUBJECT = "/subjects/{schemaName}"; private static final String URL_SUBJECT_VERSIONS = "/subjects/{schemaName}/versions"; @@ -62,7 +63,7 @@ public class SchemaRegistryService { .bodyToMono(String[].class) .doOnError(log::error) ) - .orElse(Mono.error(new NotFoundException("No such cluster"))); + .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); } public Flux getAllVersionsBySubject(String clusterName, String subject) { @@ -76,9 +77,9 @@ public class SchemaRegistryService { .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName) .retrieve() .onStatus(NOT_FOUND::equals, - throwIfNotFoundStatus(formatted("No such schema %s")) + throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA)) ).bodyToFlux(Integer.class) - ).orElse(Flux.error(new NotFoundException("No such cluster"))); + ).orElse(Flux.error(new NotFoundException(NO_SUCH_CLUSTER))); } public Mono getSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) { @@ -95,8 +96,9 @@ public class SchemaRegistryService { .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version) .retrieve() .onStatus(NOT_FOUND::equals, - throwIfNotFoundStatus(formatted("No such schema %s with version %s", schemaName, version)) + throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version)) ).bodyToMono(SchemaSubject.class) + .map(this::withSchemaType) .zipWith(getSchemaCompatibilityInfoOrGlobal(clusterName, schemaName)) .map(tuple -> { SchemaSubject schema = tuple.getT1(); @@ -105,7 +107,21 @@ public class SchemaRegistryService { return schema; }) ) - .orElse(Mono.error(new NotFoundException("No such cluster"))); + .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); + } + + /** + * If {@link SchemaSubject#getSchemaType()} is null, then AVRO, otherwise, adds the schema type as is. + */ + @NotNull + private SchemaSubject withSchemaType(SchemaSubject s) { + SchemaType schemaType = Objects.nonNull(s.getSchemaType()) ? s.getSchemaType() : SchemaType.AVRO; + return new SchemaSubject() + .schema(s.getSchema()) + .subject(s.getSubject()) + .version(s.getVersion()) + .id(s.getId()) + .schemaType(schemaType); } public Mono> deleteSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) { @@ -122,9 +138,9 @@ public class SchemaRegistryService { .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version) .retrieve() .onStatus(NOT_FOUND::equals, - throwIfNotFoundStatus(formatted("No such schema %s with version %s", schemaName, version)) + throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version)) ).toBodilessEntity() - ).orElse(Mono.error(new NotFoundException("No such cluster"))); + ).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); } public Mono> deleteSchemaSubjectEntirely(String clusterName, String schemaName) { @@ -132,10 +148,10 @@ public class SchemaRegistryService { .map(cluster -> webClient.delete() .uri(cluster.getSchemaRegistry() + URL_SUBJECT, schemaName) .retrieve() - .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted("No such schema %s", schemaName)) + .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)) ) .toBodilessEntity()) - .orElse(Mono.error(new NotFoundException("No such cluster"))); + .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); } /** @@ -145,7 +161,7 @@ public class SchemaRegistryService { public Mono registerNewSchema(String clusterName, Mono newSchemaSubject) { return newSchemaSubject .flatMap(schema -> { - Mono newSchema = Mono.just(new InternalNewSchema(schema.getSchema())); + Mono newSchema = Mono.just(new InternalNewSchema(schema)); String subject = schema.getSubject(); return clustersStorage.getClusterByName(clusterName) .map(KafkaCluster::getSchemaRegistry) @@ -153,7 +169,7 @@ public class SchemaRegistryService { .flatMap(s -> submitNewSchema(subject, newSchema, schemaRegistryUrl)) .flatMap(resp -> getLatestSchemaVersionBySubject(clusterName, subject)) ) - .orElse(Mono.error(new NotFoundException("No such cluster"))); + .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); }); } @@ -203,9 +219,9 @@ public class SchemaRegistryService { .body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevel.class)) .retrieve() .onStatus(NOT_FOUND::equals, - throwIfNotFoundStatus(formatted("No such schema %s", schemaName))) + throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName))) .bodyToMono(Void.class); - }).orElse(Mono.error(new NotFoundException("No such cluster"))); + }).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); } public Mono updateSchemaCompatibility(String clusterName, Mono compatibilityLevel) { @@ -241,24 +257,14 @@ public class SchemaRegistryService { .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class)) .retrieve() - .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted("No such schema %s", schemaName))) + .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName))) .bodyToMono(InternalCompatibilityCheck.class) .map(mapper::toCompatibilityCheckResponse) .log() - ).orElse(Mono.error(new NotFoundException("No such cluster"))); + ).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); } public String formatted(String str, Object... args) { return new Formatter().format(str, args).toString(); } - - public static String extractRecordType(String schema) { - if (schema.contains("record")) { - return "AVRO"; - } else if (schema.contains("proto")) { - return "PROTO"; - } else if (schema.contains("json")) { - return "JSON"; - } else return schema; - } } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index a5e98b3152..0999c16869 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1338,12 +1338,15 @@ components: type: string compatibilityLevel: type: string + schemaType: + $ref: '#/components/schemas/SchemaType' required: - id - subject - version - schema - compatibilityLevel + - schemaType NewSchemaSubject: type: object @@ -1352,9 +1355,12 @@ components: type: string schema: type: string + schemaType: + $ref: '#/components/schemas/SchemaType' required: - subject - schema + - schemaType CompatibilityLevel: type: object @@ -1372,6 +1378,13 @@ components: required: - compatibility + SchemaType: + type: string + enum: + - AVRO + - JSON + - PROTOBUF + CompatibilityCheckResponse: type: object properties: