From e02dd844912ff831666649afbe665661c1ac8420 Mon Sep 17 00:00:00 2001 From: Ildar Almakaev Date: Wed, 10 Mar 2021 15:14:15 +0300 Subject: [PATCH] #200 Update schema subject object (#229) * Return id, version, schema, and subject after creating a new schema * Throw 422 error code if incoming new schema is unprocessable entity due to invalid fields * Return 409/Conflict error code if schema is duplicate. Change endpoint of createNewSchema method * Fix submitting new subject or new version if the subject already exists * Include schema type to schema objects. By default it's AVRO * [ISSUE-200] Update FE to support new version of api * Add one more schema-registry with version 5.5.0 to docker-compose files and app properties * Upgrade Confluent service versions in tests up to 5.5.0 * Set schemaType is required and ignore when submitting to Schema Registry if it's NULL Co-authored-by: Oleg Shuralev --- docker/kafka-clusters-only.yaml | 20 ++- docker/kafka-ui.yaml | 18 ++ .../exception/DuplicateEntityException.java | 15 ++ .../UnprocessableEntityException.java | 15 ++ .../ui/cluster/mapper/ClusterMapper.java | 3 +- .../InternalCompatibilityCheck.java | 2 +- .../InternalCompatibilityLevel.java | 2 +- .../schemaregistry/InternalNewSchema.java | 17 ++ .../schemaregistry/SubjectIdResponse.java | 8 + .../service/SchemaRegistryService.java | 164 +++++++++++------- .../kafka/ui/rest/MetricsRestController.java | 10 +- .../src/main/resources/application-local.yml | 2 +- .../src/main/resources/application-sdp.yml | 2 +- .../provectus/kafka/ui/AbstractBaseTest.java | 2 +- .../kafka/ui/KafkaConnectServiceTests.java | 2 +- .../kafka/ui/SchemaRegistryServiceTests.java | 82 ++++++++- .../main/resources/swagger/kafka-ui-api.yaml | 85 +++++---- .../__snapshots__/Details.spec.tsx.snap | 6 + .../Schemas/Details/__test__/fixtures.ts | 5 +- .../__snapshots__/ListItem.spec.tsx.snap | 1 + .../Schemas/List/__test__/fixtures.ts | 5 +- .../src/components/Schemas/New/New.tsx | 11 +- .../src/redux/actions/__test__/fixtures.ts | 4 +- .../src/redux/actions/__test__/thunks.spec.ts | 28 ++- .../src/redux/actions/thunks.ts | 3 - .../__snapshots__/reducer.spec.ts.snap | 6 + .../reducers/schemas/__test__/fixtures.ts | 12 +- 27 files changed, 385 insertions(+), 145 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/DuplicateEntityException.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/UnprocessableEntityException.java rename kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/{ => schemaregistry}/InternalCompatibilityCheck.java (76%) rename kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/{ => schemaregistry}/InternalCompatibilityLevel.java (64%) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalNewSchema.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/SubjectIdResponse.java diff --git a/docker/kafka-clusters-only.yaml b/docker/kafka-clusters-only.yaml index 16cb50b659..68116d9499 100644 --- a/docker/kafka-clusters-only.yaml +++ b/docker/kafka-clusters-only.yaml @@ -86,7 +86,25 @@ services: SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas ports: - - 8081:8081 + - 8085:8085 + + schemaregistry1: + image: confluentinc/cp-schema-registry:5.5.0 + ports: + - 18085:8085 + depends_on: + - zookeeper1 + - kafka1 + environment: + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092 + SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper1:2181 + SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT + SCHEMA_REGISTRY_HOST_NAME: schemaregistry1 + SCHEMA_REGISTRY_LISTENERS: http://schemaregistry1:8085 + + SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" + SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO + SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas kafka-connect0: image: confluentinc/cp-kafka-connect:5.2.4 diff --git a/docker/kafka-ui.yaml b/docker/kafka-ui.yaml index 8d8790b97e..c384a8f4b3 100644 --- a/docker/kafka-ui.yaml +++ b/docker/kafka-ui.yaml @@ -96,6 +96,24 @@ services: SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas + schemaregistry1: + image: confluentinc/cp-schema-registry:5.5.0 + ports: + - 18085:8085 + depends_on: + - zookeeper1 + - kafka1 + environment: + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092 + SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper1:2181 + SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT + SCHEMA_REGISTRY_HOST_NAME: schemaregistry1 + SCHEMA_REGISTRY_LISTENERS: http://schemaregistry1:8085 + + SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" + SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO + SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas + kafka-connect0: image: confluentinc/cp-kafka-connect:5.2.4 ports: diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/DuplicateEntityException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/DuplicateEntityException.java new file mode 100644 index 0000000000..04c6be1590 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/DuplicateEntityException.java @@ -0,0 +1,15 @@ +package com.provectus.kafka.ui.cluster.exception; + +import org.springframework.http.HttpStatus; + +public class DuplicateEntityException extends CustomBaseException{ + + public DuplicateEntityException(String message) { + super(message); + } + + @Override + public HttpStatus getResponseStatusCode() { + return HttpStatus.CONFLICT; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/UnprocessableEntityException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/UnprocessableEntityException.java new file mode 100644 index 0000000000..bafc8c8180 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/UnprocessableEntityException.java @@ -0,0 +1,15 @@ +package com.provectus.kafka.ui.cluster.exception; + +import org.springframework.http.HttpStatus; + +public class UnprocessableEntityException extends CustomBaseException{ + + public UnprocessableEntityException(String message) { + super(message); + } + + @Override + public HttpStatus getResponseStatusCode() { + return HttpStatus.UNPROCESSABLE_ENTITY; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java index a9e48bd627..befddfc328 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java @@ -2,7 +2,8 @@ package com.provectus.kafka.ui.cluster.mapper; import com.provectus.kafka.ui.cluster.config.ClustersProperties; import com.provectus.kafka.ui.cluster.model.*; -import com.provectus.kafka.ui.cluster.model.InternalCompatibilityCheck; +import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibilityCheck; +import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibilityLevel; import com.provectus.kafka.ui.model.*; import java.util.Properties; import org.mapstruct.Mapper; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalCompatibilityCheck.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalCompatibilityCheck.java similarity index 76% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalCompatibilityCheck.java rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalCompatibilityCheck.java index 001823f275..da072c46f6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalCompatibilityCheck.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalCompatibilityCheck.java @@ -1,4 +1,4 @@ -package com.provectus.kafka.ui.cluster.model; +package com.provectus.kafka.ui.cluster.model.schemaregistry; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Data; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalCompatibilityLevel.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalCompatibilityLevel.java similarity index 64% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalCompatibilityLevel.java rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalCompatibilityLevel.java index ed2bfc70d7..d66fc80c8d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalCompatibilityLevel.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalCompatibilityLevel.java @@ -1,4 +1,4 @@ -package com.provectus.kafka.ui.cluster.model; +package com.provectus.kafka.ui.cluster.model.schemaregistry; import lombok.Data; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalNewSchema.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalNewSchema.java new file mode 100644 index 0000000000..b121943fe3 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/InternalNewSchema.java @@ -0,0 +1,17 @@ +package com.provectus.kafka.ui.cluster.model.schemaregistry; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.provectus.kafka.ui.model.SchemaType; +import lombok.Data; + +@Data +public class InternalNewSchema { + private String schema; + @JsonInclude(JsonInclude.Include.NON_NULL) + private SchemaType schemaType; + + public InternalNewSchema(String schema, SchemaType schemaType) { + this.schema = schema; + this.schemaType = schemaType; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/SubjectIdResponse.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/SubjectIdResponse.java new file mode 100644 index 0000000000..3a6eefee27 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/schemaregistry/SubjectIdResponse.java @@ -0,0 +1,8 @@ +package com.provectus.kafka.ui.cluster.model.schemaregistry; + +import lombok.Data; + +@Data +public class SubjectIdResponse { + private Integer id; +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java index 232e25d026..b364cbc829 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java @@ -1,36 +1,43 @@ package com.provectus.kafka.ui.cluster.service; +import com.provectus.kafka.ui.cluster.exception.DuplicateEntityException; import com.provectus.kafka.ui.cluster.exception.NotFoundException; +import com.provectus.kafka.ui.cluster.exception.UnprocessableEntityException; import com.provectus.kafka.ui.cluster.mapper.ClusterMapper; import com.provectus.kafka.ui.cluster.model.ClustersStorage; -import com.provectus.kafka.ui.cluster.model.InternalCompatibilityCheck; -import com.provectus.kafka.ui.cluster.model.InternalCompatibilityLevel; -import com.provectus.kafka.ui.model.CompatibilityCheckResponse; -import com.provectus.kafka.ui.model.CompatibilityLevel; -import com.provectus.kafka.ui.model.NewSchemaSubject; -import com.provectus.kafka.ui.model.SchemaSubject; -import java.util.Formatter; +import com.provectus.kafka.ui.cluster.model.KafkaCluster; +import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibilityCheck; +import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibilityLevel; +import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalNewSchema; +import com.provectus.kafka.ui.cluster.model.schemaregistry.SubjectIdResponse; +import com.provectus.kafka.ui.model.*; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; -import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpStatus; +import org.jetbrains.annotations.NotNull; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.Arrays; -import java.util.List; +import java.util.Formatter; import java.util.Objects; +import java.util.function.Function; + +import static org.springframework.http.HttpStatus.NOT_FOUND; +import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY; @Service @Log4j2 @RequiredArgsConstructor public class SchemaRegistryService { + public static final String NO_SUCH_SCHEMA_VERSION = "No such schema %s with version %s"; + public static final String NO_SUCH_SCHEMA = "No such schema %s"; + public static final String NO_SUCH_CLUSTER = "No such cluster"; + private static final String URL_SUBJECTS = "/subjects"; private static final String URL_SUBJECT = "/subjects/{schemaName}"; private static final String URL_SUBJECT_VERSIONS = "/subjects/{schemaName}/versions"; @@ -45,7 +52,7 @@ public class SchemaRegistryService { var allSubjectNames = getAllSubjectNames(clusterName); return allSubjectNames .flatMapMany(Flux::fromArray) - .flatMap(subject -> getLatestSchemaSubject(clusterName, subject)); + .flatMap(subject -> getLatestSchemaVersionBySubject(clusterName, subject)); } public Mono getAllSubjectNames(String clusterName) { @@ -56,7 +63,7 @@ public class SchemaRegistryService { .bodyToMono(String[].class) .doOnError(log::error) ) - .orElse(Mono.error(new NotFoundException("No such cluster"))); + .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); } public Flux getAllVersionsBySubject(String clusterName, String subject) { @@ -69,19 +76,17 @@ public class SchemaRegistryService { .map(cluster -> webClient.get() .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName) .retrieve() - .onStatus(HttpStatus.NOT_FOUND::equals, - resp -> Mono.error( - new NotFoundException(formatted("No such schema %s")) - ) + .onStatus(NOT_FOUND::equals, + throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA)) ).bodyToFlux(Integer.class) - ).orElse(Flux.error(new NotFoundException("No such cluster"))); + ).orElse(Flux.error(new NotFoundException(NO_SUCH_CLUSTER))); } public Mono getSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) { return this.getSchemaSubject(clusterName, schemaName, String.valueOf(version)); } - public Mono getLatestSchemaSubject(String clusterName, String schemaName) { + public Mono getLatestSchemaVersionBySubject(String clusterName, String schemaName) { return this.getSchemaSubject(clusterName, schemaName, LATEST); } @@ -90,13 +95,10 @@ public class SchemaRegistryService { .map(cluster -> webClient.get() .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version) .retrieve() - .onStatus(HttpStatus.NOT_FOUND::equals, - resp -> Mono.error( - new NotFoundException( - formatted("No such schema %s with version %s", schemaName, version) - ) - ) + .onStatus(NOT_FOUND::equals, + throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version)) ).bodyToMono(SchemaSubject.class) + .map(this::withSchemaType) .zipWith(getSchemaCompatibilityInfoOrGlobal(clusterName, schemaName)) .map(tuple -> { SchemaSubject schema = tuple.getT1(); @@ -105,7 +107,21 @@ public class SchemaRegistryService { return schema; }) ) - .orElseThrow(); + .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); + } + + /** + * If {@link SchemaSubject#getSchemaType()} is null, then AVRO, otherwise, adds the schema type as is. + */ + @NotNull + private SchemaSubject withSchemaType(SchemaSubject s) { + SchemaType schemaType = Objects.nonNull(s.getSchemaType()) ? s.getSchemaType() : SchemaType.AVRO; + return new SchemaSubject() + .schema(s.getSchema()) + .subject(s.getSubject()) + .version(s.getVersion()) + .id(s.getId()) + .schemaType(schemaType); } public Mono> deleteSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) { @@ -121,46 +137,71 @@ public class SchemaRegistryService { .map(cluster -> webClient.delete() .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version) .retrieve() - .onStatus(HttpStatus.NOT_FOUND::equals, - resp -> Mono.error( - new NotFoundException( - formatted("No such schema %s with version %s", schemaName, version) - ) - ) + .onStatus(NOT_FOUND::equals, + throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version)) ).toBodilessEntity() - ).orElse(Mono.error(new NotFoundException("No such cluster"))); + ).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); } - public Mono> deleteSchemaSubject(String clusterName, String schemaName) { + public Mono> deleteSchemaSubjectEntirely(String clusterName, String schemaName) { return clustersStorage.getClusterByName(clusterName) .map(cluster -> webClient.delete() .uri(cluster.getSchemaRegistry() + URL_SUBJECT, schemaName) .retrieve() - .onStatus(HttpStatus.NOT_FOUND::equals, - resp -> Mono.error( - new NotFoundException( - formatted("No such schema %s", schemaName) - ) - ) + .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)) ) .toBodilessEntity()) - .orElse(Mono.error(new NotFoundException("No such cluster"))); + .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); } - public Mono> createNewSubject(String clusterName, String schemaName, Mono newSchemaSubject) { - return clustersStorage.getClusterByName(clusterName) - .map(cluster -> webClient.post() - .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName) - .contentType(MediaType.APPLICATION_JSON) - .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class)) - .retrieve() - .onStatus(HttpStatus.NOT_FOUND::equals, - resp -> Mono.error( - new NotFoundException(formatted("No such schema %s", schemaName))) - ) - .toEntity(SchemaSubject.class) - .log()) - .orElse(Mono.error(new NotFoundException("No such cluster"))); + /** + * Checks whether the provided schema duplicates the previous or not, creates a new schema + * and then returns the whole content by requesting its latest version. + */ + public Mono registerNewSchema(String clusterName, Mono newSchemaSubject) { + return newSchemaSubject + .flatMap(schema -> { + SchemaType schemaType = SchemaType.AVRO == schema.getSchemaType() ? null : schema.getSchemaType(); + Mono newSchema = Mono.just(new InternalNewSchema(schema.getSchema(), schemaType)); + String subject = schema.getSubject(); + return clustersStorage.getClusterByName(clusterName) + .map(KafkaCluster::getSchemaRegistry) + .map(schemaRegistryUrl -> checkSchemaOnDuplicate(subject, newSchema, schemaRegistryUrl) + .flatMap(s -> submitNewSchema(subject, newSchema, schemaRegistryUrl)) + .flatMap(resp -> getLatestSchemaVersionBySubject(clusterName, subject)) + ) + .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); + }); + } + + @NotNull + private Mono submitNewSchema(String subject, Mono newSchemaSubject, String schemaRegistryUrl) { + return webClient.post() + .uri(schemaRegistryUrl + URL_SUBJECT_VERSIONS, subject) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class)) + .retrieve() + .onStatus(UNPROCESSABLE_ENTITY::equals, r -> Mono.error(new UnprocessableEntityException("Invalid params"))) + .bodyToMono(SubjectIdResponse.class); + } + + @NotNull + private Mono checkSchemaOnDuplicate(String subject, Mono newSchemaSubject, String schemaRegistryUrl) { + return webClient.post() + .uri(schemaRegistryUrl + URL_SUBJECT, subject) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class)) + .retrieve() + .onStatus(NOT_FOUND::equals, res -> Mono.empty()) + .onStatus(UNPROCESSABLE_ENTITY::equals, r -> Mono.error(new UnprocessableEntityException("Invalid params"))) + .bodyToMono(SchemaSubject.class) + .filter(s -> Objects.isNull(s.getId())) + .switchIfEmpty(Mono.error(new DuplicateEntityException("Such schema already exists"))); + } + + @NotNull + private Function> throwIfNotFoundStatus(String formatted) { + return resp -> Mono.error(new NotFoundException(formatted)); } /** @@ -178,10 +219,10 @@ public class SchemaRegistryService { .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevel.class)) .retrieve() - .onStatus(HttpStatus.NOT_FOUND::equals, - resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName)))) + .onStatus(NOT_FOUND::equals, + throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName))) .bodyToMono(Void.class); - }).orElse(Mono.error(new NotFoundException("No such cluster"))); + }).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); } public Mono updateSchemaCompatibility(String clusterName, Mono compatibilityLevel) { @@ -217,12 +258,11 @@ public class SchemaRegistryService { .contentType(MediaType.APPLICATION_JSON) .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class)) .retrieve() - .onStatus(HttpStatus.NOT_FOUND::equals, - resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName)))) + .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName))) .bodyToMono(InternalCompatibilityCheck.class) .map(mapper::toCompatibilityCheckResponse) .log() - ).orElse(Mono.error(new NotFoundException("No such cluster"))); + ).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER))); } public String formatted(String str, Object... args) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index a2cacbba8e..9b86451b8f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -106,7 +106,7 @@ public class MetricsRestController implements ApiClustersApi { @Override public Mono> getLatestSchema(String clusterName, String subject, ServerWebExchange exchange) { - return schemaRegistryService.getLatestSchemaSubject(clusterName, subject).map(ResponseEntity::ok); + return schemaRegistryService.getLatestSchemaVersionBySubject(clusterName, subject).map(ResponseEntity::ok); } @Override @@ -138,14 +138,16 @@ public class MetricsRestController implements ApiClustersApi { @Override public Mono> deleteSchema(String clusterName, String subjectName, ServerWebExchange exchange) { - return schemaRegistryService.deleteSchemaSubject(clusterName, subjectName); + return schemaRegistryService.deleteSchemaSubjectEntirely(clusterName, subjectName); } @Override - public Mono> createNewSchema(String clusterName, String subject, + public Mono> createNewSchema(String clusterName, @Valid Mono newSchemaSubject, ServerWebExchange exchange) { - return schemaRegistryService.createNewSubject(clusterName, subject, newSchemaSubject); + return schemaRegistryService + .registerNewSchema(clusterName, newSchemaSubject) + .map(ResponseEntity::ok); } @Override diff --git a/kafka-ui-api/src/main/resources/application-local.yml b/kafka-ui-api/src/main/resources/application-local.yml index 951a1799e0..5822849109 100644 --- a/kafka-ui-api/src/main/resources/application-local.yml +++ b/kafka-ui-api/src/main/resources/application-local.yml @@ -13,7 +13,7 @@ kafka: name: secondLocal bootstrapServers: localhost:9093 zookeeper: localhost:2182 - schemaRegistry: http://localhost:8081 + schemaRegistry: http://localhost:18085 kafkaConnect: - name: first address: http://localhost:8083 diff --git a/kafka-ui-api/src/main/resources/application-sdp.yml b/kafka-ui-api/src/main/resources/application-sdp.yml index 9f6a293ec2..46a0377799 100644 --- a/kafka-ui-api/src/main/resources/application-sdp.yml +++ b/kafka-ui-api/src/main/resources/application-sdp.yml @@ -9,7 +9,7 @@ kafka: name: secondLocal zookeeper: zookeeper1:2181 bootstrapServers: kafka1:29092 - schemaRegistry: http://schemaregistry0:8085 + schemaRegistry: http://schemaregistry1:8085 admin-client-timeout: 5000 zookeeper: connection-timeout: 1000 diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java index 4065b4006d..cd00559694 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java @@ -18,7 +18,7 @@ public abstract class AbstractBaseTest { public static String LOCAL = "local"; public static String SECOND_LOCAL = "secondLocal"; - private static final String CONFLUENT_PLATFORM_VERSION = "5.2.1"; + private static final String CONFLUENT_PLATFORM_VERSION = "5.5.0"; public static final KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION)) .withNetwork(Network.SHARED); diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java index f7e6407b91..603390dbf5 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java @@ -249,7 +249,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest { .exchange() .expectStatus().isOk() .expectBodyList(ConnectorPlugin.class) - .value(plugins -> assertEquals(13, plugins.size())); + .value(plugins -> assertEquals(14, plugins.size())); } @Test diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java index c5af013e79..07d05cade6 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java @@ -1,7 +1,9 @@ package com.provectus.kafka.ui; import com.provectus.kafka.ui.model.CompatibilityLevel; +import com.provectus.kafka.ui.model.NewSchemaSubject; import com.provectus.kafka.ui.model.SchemaSubject; +import com.provectus.kafka.ui.model.SchemaType; import lombok.extern.log4j.Log4j2; import lombok.val; import org.junit.jupiter.api.Assertions; @@ -9,11 +11,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; +import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.web.reactive.server.EntityExchangeResult; import org.springframework.test.web.reactive.server.WebTestClient; import org.springframework.web.reactive.function.BodyInserters; +import reactor.core.publisher.Mono; import java.util.List; import java.util.UUID; @@ -50,6 +54,69 @@ class SchemaRegistryServiceTests extends AbstractBaseTest { .expectStatus().isNotFound(); } + /** + * It should create a new schema w/o submitting a schemaType field to Schema Registry + */ + @Test + void shouldBeBadRequestIfNoSchemaType() { + String schema = "{\"subject\":\"%s\",\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}"; + + webTestClient + .post() + .uri("/api/clusters/{clusterName}/schemas", LOCAL) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromValue(schema.formatted(subject))) + .exchange() + .expectStatus().isBadRequest(); + } + + @Test + void shouldReturn409WhenSchemaDuplicatesThePreviousVersion() { + String schema = "{\"subject\":\"%s\",\"schemaType\":\"AVRO\",\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}"; + + webTestClient + .post() + .uri("/api/clusters/{clusterName}/schemas", LOCAL) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromValue(schema.formatted(subject))) + .exchange() + .expectStatus().isEqualTo(HttpStatus.OK); + + webTestClient + .post() + .uri("/api/clusters/{clusterName}/schemas", LOCAL) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromValue(schema.formatted(subject))) + .exchange() + .expectStatus().isEqualTo(HttpStatus.CONFLICT); + } + + @Test + void shouldCreateNewProtobufSchema() { + String schema = "syntax = \"proto3\";\n\nmessage MyRecord {\n int32 id = 1;\n string name = 2;\n}\n"; + NewSchemaSubject requestBody = new NewSchemaSubject() + .schemaType(SchemaType.PROTOBUF) + .subject(subject) + .schema(schema); + SchemaSubject actual = webTestClient + .post() + .uri("/api/clusters/{clusterName}/schemas", LOCAL) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubject.class)) + .exchange() + .expectStatus() + .isOk() + .expectBody(SchemaSubject.class) + .returnResult() + .getResponseBody(); + + Assertions.assertNotNull(actual); + Assertions.assertEquals(CompatibilityLevel.CompatibilityEnum.BACKWARD.name(), actual.getCompatibilityLevel()); + Assertions.assertEquals("1", actual.getVersion()); + Assertions.assertEquals(SchemaType.PROTOBUF, actual.getSchemaType()); + Assertions.assertEquals(schema, actual.getSchema()); + } + @Test public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() { webTestClient @@ -132,9 +199,9 @@ class SchemaRegistryServiceTests extends AbstractBaseTest { private void createNewSubjectAndAssert(String subject) { webTestClient .post() - .uri("/api/clusters/{clusterName}/schemas/{subject}", LOCAL, subject) + .uri("/api/clusters/{clusterName}/schemas", LOCAL) .contentType(MediaType.APPLICATION_JSON) - .body(BodyInserters.fromValue("{\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}")) + .body(BodyInserters.fromValue("{\"subject\":\"%s\",\"schemaType\":\"AVRO\",\"schema\":\"{\\\"type\\\": \\\"string\\\"}\"}".formatted(subject))) .exchange() .expectStatus().isOk() .expectBody(SchemaSubject.class) @@ -151,16 +218,17 @@ class SchemaRegistryServiceTests extends AbstractBaseTest { Assertions.assertEquals("\"string\"", actualSchema.getSchema()); Assertions.assertNotNull(actualSchema.getCompatibilityLevel()); + Assertions.assertEquals(SchemaType.AVRO, actualSchema.getSchemaType()); Assertions.assertEquals(expectedCompatibility.name(), actualSchema.getCompatibilityLevel()); } private void assertResponseBodyWhenCreateNewSchema(EntityExchangeResult exchangeResult) { SchemaSubject responseBody = exchangeResult.getResponseBody(); Assertions.assertNotNull(responseBody); - Assertions.assertEquals(1, responseBody.getId(), "The schema ID should be non-null in the response"); - String message = "It should be null"; - Assertions.assertNull(responseBody.getSchema(), message); - Assertions.assertNull(responseBody.getSubject(), message); - Assertions.assertNull(responseBody.getVersion(), message); + Assertions.assertEquals("1", responseBody.getVersion()); + Assertions.assertNotNull(responseBody.getSchema()); + Assertions.assertNotNull(responseBody.getSubject()); + Assertions.assertNotNull(responseBody.getCompatibilityLevel()); + Assertions.assertEquals(SchemaType.AVRO, responseBody.getSchemaType()); } } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 635f5b1693..f50695abfc 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -358,6 +358,35 @@ paths: $ref: '#/components/schemas/ConsumerGroup' /api/clusters/{clusterName}/schemas: + post: + tags: + - /api/clusters + summary: create a new subject schema + operationId: createNewSchema + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/NewSchemaSubject' + responses: + 200: + description: Ok + content: + application/json: + schema: + $ref: '#/components/schemas/SchemaSubject' + 400: + description: Bad request + 409: + description: Duplicate schema + 422: + description: Invalid parameters get: tags: - /api/clusters @@ -380,36 +409,6 @@ paths: $ref: '#/components/schemas/SchemaSubject' /api/clusters/{clusterName}/schemas/{subject}: - post: - tags: - - /api/clusters - summary: create a new subject schema - operationId: createNewSchema - parameters: - - name: clusterName - in: path - required: true - schema: - type: string - - name: subject - in: path - required: true - schema: - type: string - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/NewSchemaSubject' - responses: - 200: - description: Updated - content: - application/json: - schema: - $ref: '#/components/schemas/SchemaSubject' - 400: - description: Bad request delete: tags: - /api/clusters @@ -1360,16 +1359,29 @@ components: type: string compatibilityLevel: type: string + schemaType: + $ref: '#/components/schemas/SchemaType' required: - id + - subject + - version + - schema + - compatibilityLevel + - schemaType NewSchemaSubject: type: object properties: + subject: + type: string schema: type: string + schemaType: + $ref: '#/components/schemas/SchemaType' required: + - subject - schema + - schemaType CompatibilityLevel: type: object @@ -1387,13 +1399,12 @@ components: required: - compatibility -# CompatibilityLevelResponse: -# type: object -# properties: -# compatibilityLevel: -# type: string -# required: -# - compatibilityLevel + SchemaType: + type: string + enum: + - AVRO + - JSON + - PROTOBUF CompatibilityCheckResponse: type: object diff --git a/kafka-ui-react-app/src/components/Schemas/Details/__test__/__snapshots__/Details.spec.tsx.snap b/kafka-ui-react-app/src/components/Schemas/Details/__test__/__snapshots__/Details.spec.tsx.snap index 7871bb5588..32a8b9d34a 100644 --- a/kafka-ui-react-app/src/components/Schemas/Details/__test__/__snapshots__/Details.spec.tsx.snap +++ b/kafka-ui-react-app/src/components/Schemas/Details/__test__/__snapshots__/Details.spec.tsx.snap @@ -75,6 +75,7 @@ exports[`Details View Initial state matches snapshot 1`] = ` "compatibilityLevel": "BACKWARD", "id": 1, "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord1\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}", + "schemaType": "JSON", "subject": "test", "version": "1", } @@ -181,6 +182,7 @@ exports[`Details View when page with schema versions is loading matches snapshot "compatibilityLevel": "BACKWARD", "id": 1, "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord1\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}", + "schemaType": "JSON", "subject": "test", "version": "1", } @@ -266,6 +268,7 @@ exports[`Details View when page with schema versions loaded when schema has vers "compatibilityLevel": "BACKWARD", "id": 1, "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord1\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}", + "schemaType": "JSON", "subject": "test", "version": "1", } @@ -299,6 +302,7 @@ exports[`Details View when page with schema versions loaded when schema has vers "compatibilityLevel": "BACKWARD", "id": 1, "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord1\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}", + "schemaType": "JSON", "subject": "test", "version": "1", } @@ -311,6 +315,7 @@ exports[`Details View when page with schema versions loaded when schema has vers "compatibilityLevel": "BACKWARD", "id": 2, "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord2\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}", + "schemaType": "JSON", "subject": "test", "version": "2", } @@ -397,6 +402,7 @@ exports[`Details View when page with schema versions loaded when versions are em "compatibilityLevel": "BACKWARD", "id": 1, "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord1\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}", + "schemaType": "JSON", "subject": "test", "version": "1", } diff --git a/kafka-ui-react-app/src/components/Schemas/Details/__test__/fixtures.ts b/kafka-ui-react-app/src/components/Schemas/Details/__test__/fixtures.ts index 019532251b..d53190c9f6 100644 --- a/kafka-ui-react-app/src/components/Schemas/Details/__test__/fixtures.ts +++ b/kafka-ui-react-app/src/components/Schemas/Details/__test__/fixtures.ts @@ -1,4 +1,4 @@ -import { SchemaSubject } from 'generated-sources'; +import { SchemaSubject, SchemaType } from 'generated-sources'; export const schema: SchemaSubject = { subject: 'test', @@ -7,6 +7,7 @@ export const schema: SchemaSubject = { schema: '{"type":"record","name":"MyRecord1","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', compatibilityLevel: 'BACKWARD', + schemaType: SchemaType.JSON, }; export const versions: SchemaSubject[] = [ @@ -17,6 +18,7 @@ export const versions: SchemaSubject[] = [ schema: '{"type":"record","name":"MyRecord1","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', compatibilityLevel: 'BACKWARD', + schemaType: SchemaType.JSON, }, { subject: 'test', @@ -25,5 +27,6 @@ export const versions: SchemaSubject[] = [ schema: '{"type":"record","name":"MyRecord2","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', compatibilityLevel: 'BACKWARD', + schemaType: SchemaType.JSON, }, ]; diff --git a/kafka-ui-react-app/src/components/Schemas/List/__test__/__snapshots__/ListItem.spec.tsx.snap b/kafka-ui-react-app/src/components/Schemas/List/__test__/__snapshots__/ListItem.spec.tsx.snap index 1783cdc994..e26acad840 100644 --- a/kafka-ui-react-app/src/components/Schemas/List/__test__/__snapshots__/ListItem.spec.tsx.snap +++ b/kafka-ui-react-app/src/components/Schemas/List/__test__/__snapshots__/ListItem.spec.tsx.snap @@ -32,6 +32,7 @@ exports[`ListItem matches snapshot 1`] = ` "compatibilityLevel": "BACKWARD", "id": 1, "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord1\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}", + "schemaType": "JSON", "subject": "test", "version": "1", } diff --git a/kafka-ui-react-app/src/components/Schemas/List/__test__/fixtures.ts b/kafka-ui-react-app/src/components/Schemas/List/__test__/fixtures.ts index 7b337a23ec..46a4d5a105 100644 --- a/kafka-ui-react-app/src/components/Schemas/List/__test__/fixtures.ts +++ b/kafka-ui-react-app/src/components/Schemas/List/__test__/fixtures.ts @@ -1,4 +1,4 @@ -import { SchemaSubject } from 'generated-sources'; +import { SchemaSubject, SchemaType } from 'generated-sources'; export const schemas: SchemaSubject[] = [ { @@ -8,6 +8,7 @@ export const schemas: SchemaSubject[] = [ schema: '{"type":"record","name":"MyRecord1","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', compatibilityLevel: 'BACKWARD', + schemaType: SchemaType.JSON, }, { subject: 'test2', @@ -16,6 +17,7 @@ export const schemas: SchemaSubject[] = [ schema: '{"type":"record","name":"MyRecord2","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', compatibilityLevel: 'BACKWARD', + schemaType: SchemaType.JSON, }, { subject: 'test3', @@ -24,5 +26,6 @@ export const schemas: SchemaSubject[] = [ schema: '{"type":"record","name":"MyRecord3","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', compatibilityLevel: 'BACKWARD', + schemaType: SchemaType.JSON, }, ]; diff --git a/kafka-ui-react-app/src/components/Schemas/New/New.tsx b/kafka-ui-react-app/src/components/Schemas/New/New.tsx index cc15b9709e..7a1325b924 100644 --- a/kafka-ui-react-app/src/components/Schemas/New/New.tsx +++ b/kafka-ui-react-app/src/components/Schemas/New/New.tsx @@ -1,17 +1,16 @@ import React from 'react'; -import { ClusterName, SchemaName, NewSchemaSubjectRaw } from 'redux/interfaces'; +import { ClusterName, NewSchemaSubjectRaw } from 'redux/interfaces'; import { useForm } from 'react-hook-form'; import { ErrorMessage } from '@hookform/error-message'; import Breadcrumb from 'components/common/Breadcrumb/Breadcrumb'; import { clusterSchemaPath, clusterSchemasPath } from 'lib/paths'; -import { NewSchemaSubject } from 'generated-sources'; +import { NewSchemaSubject, SchemaType } from 'generated-sources'; import { SCHEMA_NAME_VALIDATION_PATTERN } from 'lib/constants'; import { useHistory, useParams } from 'react-router'; export interface NewProps { createSchema: ( clusterName: ClusterName, - subject: SchemaName, newSchemaSubject: NewSchemaSubject ) => void; } @@ -29,7 +28,11 @@ const New: React.FC = ({ createSchema }) => { const onSubmit = React.useCallback( async ({ subject, schema }: NewSchemaSubjectRaw) => { try { - await createSchema(clusterName, subject, { schema }); + await createSchema(clusterName, { + subject, + schema, + schemaType: SchemaType.AVRO, + }); history.push(clusterSchemaPath(clusterName, subject)); } catch (e) { // Show Error diff --git a/kafka-ui-react-app/src/redux/actions/__test__/fixtures.ts b/kafka-ui-react-app/src/redux/actions/__test__/fixtures.ts index d8e4a59804..663dbaaafc 100644 --- a/kafka-ui-react-app/src/redux/actions/__test__/fixtures.ts +++ b/kafka-ui-react-app/src/redux/actions/__test__/fixtures.ts @@ -1,4 +1,4 @@ -import { ClusterStats, NewSchemaSubject } from 'generated-sources'; +import { ClusterStats, NewSchemaSubject, SchemaType } from 'generated-sources'; export const clusterStats: ClusterStats = { brokerCount: 1, @@ -13,6 +13,8 @@ export const clusterStats: ClusterStats = { }; export const schemaPayload: NewSchemaSubject = { + subject: 'NewSchema', schema: '{"type":"record","name":"MyRecord1","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', + schemaType: SchemaType.JSON, }; diff --git a/kafka-ui-react-app/src/redux/actions/__test__/thunks.spec.ts b/kafka-ui-react-app/src/redux/actions/__test__/thunks.spec.ts index 0cb4267511..611b8ad174 100644 --- a/kafka-ui-react-app/src/redux/actions/__test__/thunks.spec.ts +++ b/kafka-ui-react-app/src/redux/actions/__test__/thunks.spec.ts @@ -108,11 +108,11 @@ describe('Thunks', () => { describe('createSchema', () => { it('creates POST_SCHEMA__SUCCESS when posting new schema', async () => { - fetchMock.postOnce(`/api/clusters/${clusterName}/schemas/${subject}`, { + fetchMock.postOnce(`/api/clusters/${clusterName}/schemas`, { body: schemaFixtures.schemaVersionsPayload[0], }); await store.dispatch( - thunks.createSchema(clusterName, subject, fixtures.schemaPayload) + thunks.createSchema(clusterName, fixtures.schemaPayload) ); expect(store.getActions()).toEqual([ actions.createSchemaAction.request(), @@ -122,19 +122,15 @@ describe('Thunks', () => { ]); }); - // it('creates POST_SCHEMA__FAILURE when posting new schema', async () => { - // fetchMock.postOnce( - // `/api/clusters/${clusterName}/schemas/${subject}`, - // 404 - // ); - // await store.dispatch( - // thunks.createSchema(clusterName, subject, fixtures.schemaPayload) - // ); - // expect(store.getActions()).toEqual([ - // actions.createSchemaAction.request(), - // actions.createSchemaAction.failure(), - // ]); - // expect(store.getActions()).toThrow(); - // }); + it('creates POST_SCHEMA__FAILURE when posting new schema', async () => { + fetchMock.postOnce(`/api/clusters/${clusterName}/schemas`, 404); + await store.dispatch( + thunks.createSchema(clusterName, fixtures.schemaPayload) + ); + expect(store.getActions()).toEqual([ + actions.createSchemaAction.request(), + actions.createSchemaAction.failure(), + ]); + }); }); }); diff --git a/kafka-ui-react-app/src/redux/actions/thunks.ts b/kafka-ui-react-app/src/redux/actions/thunks.ts index f650d011bf..31222ebb09 100644 --- a/kafka-ui-react-app/src/redux/actions/thunks.ts +++ b/kafka-ui-react-app/src/redux/actions/thunks.ts @@ -285,19 +285,16 @@ export const fetchSchemaVersions = ( export const createSchema = ( clusterName: ClusterName, - subject: SchemaName, newSchemaSubject: NewSchemaSubject ): PromiseThunkResult => async (dispatch) => { dispatch(actions.createSchemaAction.request()); try { const schema: SchemaSubject = await apiClient.createNewSchema({ clusterName, - subject, newSchemaSubject, }); dispatch(actions.createSchemaAction.success(schema)); } catch (e) { dispatch(actions.createSchemaAction.failure()); - throw e; } }; diff --git a/kafka-ui-react-app/src/redux/reducers/schemas/__test__/__snapshots__/reducer.spec.ts.snap b/kafka-ui-react-app/src/redux/reducers/schemas/__test__/__snapshots__/reducer.spec.ts.snap index bf18ffa14f..61332659b7 100644 --- a/kafka-ui-react-app/src/redux/reducers/schemas/__test__/__snapshots__/reducer.spec.ts.snap +++ b/kafka-ui-react-app/src/redux/reducers/schemas/__test__/__snapshots__/reducer.spec.ts.snap @@ -12,6 +12,7 @@ Object { "compatibilityLevel": "BACKWARD", "id": 2, "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord2\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}", + "schemaType": "JSON", "subject": "test", "version": "2", }, @@ -19,6 +20,7 @@ Object { "compatibilityLevel": "BACKWARD", "id": 4, "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord4\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}", + "schemaType": "JSON", "subject": "test2", "version": "3", }, @@ -26,6 +28,7 @@ Object { "compatibilityLevel": "BACKWARD", "id": 5, "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}", + "schemaType": "JSON", "subject": "test3", "version": "1", }, @@ -43,6 +46,7 @@ Object { "compatibilityLevel": "BACKWARD", "id": 1, "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord1\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}", + "schemaType": "JSON", "subject": "test", "version": "1", }, @@ -50,6 +54,7 @@ Object { "compatibilityLevel": "BACKWARD", "id": 2, "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord2\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}", + "schemaType": "JSON", "subject": "test", "version": "2", }, @@ -67,6 +72,7 @@ Object { "compatibilityLevel": "BACKWARD", "id": 1, "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"MyRecord1\\",\\"namespace\\":\\"com.mycompany\\",\\"fields\\":[{\\"name\\":\\"id\\",\\"type\\":\\"long\\"}]}", + "schemaType": "JSON", "subject": "test", "version": "1", }, diff --git a/kafka-ui-react-app/src/redux/reducers/schemas/__test__/fixtures.ts b/kafka-ui-react-app/src/redux/reducers/schemas/__test__/fixtures.ts index 5a75ed212b..a53f1dced7 100644 --- a/kafka-ui-react-app/src/redux/reducers/schemas/__test__/fixtures.ts +++ b/kafka-ui-react-app/src/redux/reducers/schemas/__test__/fixtures.ts @@ -1,5 +1,5 @@ import { SchemasState } from 'redux/interfaces'; -import { SchemaSubject } from 'generated-sources'; +import { SchemaSubject, SchemaType } from 'generated-sources'; export const initialState: SchemasState = { byName: {}, @@ -15,6 +15,7 @@ export const clusterSchemasPayload: SchemaSubject[] = [ schema: '{"type":"record","name":"MyRecord4","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', compatibilityLevel: 'BACKWARD', + schemaType: SchemaType.JSON, }, { subject: 'test3', @@ -23,6 +24,7 @@ export const clusterSchemasPayload: SchemaSubject[] = [ schema: '{"type":"record","name":"MyRecord","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', compatibilityLevel: 'BACKWARD', + schemaType: SchemaType.JSON, }, { subject: 'test', @@ -31,6 +33,7 @@ export const clusterSchemasPayload: SchemaSubject[] = [ schema: '{"type":"record","name":"MyRecord2","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', compatibilityLevel: 'BACKWARD', + schemaType: SchemaType.JSON, }, ]; @@ -42,6 +45,7 @@ export const schemaVersionsPayload: SchemaSubject[] = [ schema: '{"type":"record","name":"MyRecord1","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', compatibilityLevel: 'BACKWARD', + schemaType: SchemaType.JSON, }, { subject: 'test', @@ -50,6 +54,7 @@ export const schemaVersionsPayload: SchemaSubject[] = [ schema: '{"type":"record","name":"MyRecord2","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', compatibilityLevel: 'BACKWARD', + schemaType: SchemaType.JSON, }, ]; @@ -60,6 +65,7 @@ export const newSchemaPayload: SchemaSubject = { schema: '{"type":"record","name":"MyRecord4","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', compatibilityLevel: 'BACKWARD', + schemaType: SchemaType.JSON, }; export const clusterSchemasPayloadWithNewSchema: SchemaSubject[] = [ @@ -70,6 +76,7 @@ export const clusterSchemasPayloadWithNewSchema: SchemaSubject[] = [ schema: '{"type":"record","name":"MyRecord4","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', compatibilityLevel: 'BACKWARD', + schemaType: SchemaType.JSON, }, { subject: 'test3', @@ -78,6 +85,7 @@ export const clusterSchemasPayloadWithNewSchema: SchemaSubject[] = [ schema: '{"type":"record","name":"MyRecord","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', compatibilityLevel: 'BACKWARD', + schemaType: SchemaType.JSON, }, { subject: 'test', @@ -86,6 +94,7 @@ export const clusterSchemasPayloadWithNewSchema: SchemaSubject[] = [ schema: '{"type":"record","name":"MyRecord2","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', compatibilityLevel: 'BACKWARD', + schemaType: SchemaType.JSON, }, { subject: 'test4', @@ -94,5 +103,6 @@ export const clusterSchemasPayloadWithNewSchema: SchemaSubject[] = [ schema: '{"type":"record","name":"MyRecord4","namespace":"com.mycompany","fields":[{"name":"id","type":"long"}]}', compatibilityLevel: 'BACKWARD', + schemaType: SchemaType.JSON, }, ];