From 2bff27244f2d893583ea7b2c3f09151eada91379 Mon Sep 17 00:00:00 2001 From: iliax Date: Thu, 27 Apr 2023 22:45:20 +0400 Subject: [PATCH] tests added --- .../odd/SchemaReferencesResolver.java | 55 +++++++++---------- .../kafka/ui/SchemaRegistryServiceTests.java | 55 ++++++++++++++++++- .../odd/SchemaReferencesResolverTest.java | 1 + 3 files changed, 80 insertions(+), 31 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolver.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolver.java index 003c90bd56..8e6ca22922 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolver.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolver.java @@ -1,15 +1,10 @@ package com.provectus.kafka.ui.service.integration.odd; -import static com.google.common.base.Preconditions.checkNotNull; - import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.provectus.kafka.ui.sr.api.KafkaSrClientApi; import com.provectus.kafka.ui.sr.model.SchemaReference; import java.util.List; -import java.util.Optional; import reactor.core.publisher.Mono; // logic copied from AbstractSchemaProvider:resolveReferences @@ -22,41 +17,41 @@ class SchemaReferencesResolver { this.client = client; } - Mono> resolve(List references) { - return resolveReferences(references, new State(ImmutableMap.of(), ImmutableSet.of())) - .map(State::resolved); + Mono> resolve(List refs) { + return resolveReferences( + refs == null ? List.of() : refs, + new Resolving(ImmutableMap.of(), ImmutableSet.of())).map( + Resolving::resolved + ); } - private record State(ImmutableMap resolved, ImmutableSet visited) { + private record Resolving(ImmutableMap resolved, ImmutableSet visited) { - State visit(String name) { - return new State(resolved, ImmutableSet.builder().addAll(visited).add(name).build()); + Resolving visit(String name) { + return new Resolving(resolved, ImmutableSet.builder().addAll(visited).add(name).build()); } - State resolve(String ref, String schema) { - return new State(ImmutableMap.builder().putAll(resolved).put(ref, schema).build(), visited); + Resolving resolve(String ref, String schema) { + return new Resolving(ImmutableMap.builder().putAll(resolved).put(ref, schema).build(), visited); } } - private Mono resolveReferences(List references, - State initState) { - Mono result = Mono.just(initState); - for (var reference : Optional.ofNullable(references).orElse(List.of())) { + private Mono resolveReferences(List references, Resolving initState) { + Mono result = Mono.just(initState); + for (SchemaReference reference : references) { result = result.flatMap(state -> { - if (state.visited().contains(reference.getName())) { - return Mono.just(state); - } else { - final var newState = state.visit(reference.getName()); - return client.getSubjectVersion(reference.getSubject(), String.valueOf(reference.getVersion()), true) - .flatMap(subj -> - resolveReferences(subj.getReferences(), newState) - .map(withNewRefs -> withNewRefs.resolve(reference.getName(), subj.getSchema())) - ); - } - } - ); + if (state.visited().contains(reference.getName())) { + return Mono.just(state); + } else { + final var newState = state.visit(reference.getName()); + return client.getSubjectVersion(reference.getSubject(), String.valueOf(reference.getVersion()), true) + .flatMap(subj -> + resolveReferences(subj.getReferences(), newState) + .map(withNewRefs -> withNewRefs.resolve(reference.getName(), subj.getSchema())) + ); + } + }); } return result; } - } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java index 60959be049..5fa9aee766 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java @@ -2,6 +2,7 @@ package com.provectus.kafka.ui; import com.provectus.kafka.ui.model.CompatibilityLevelDTO; import com.provectus.kafka.ui.model.NewSchemaSubjectDTO; +import com.provectus.kafka.ui.model.SchemaReferenceDTO; import com.provectus.kafka.ui.model.SchemaSubjectDTO; import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO; import com.provectus.kafka.ui.model.SchemaTypeDTO; @@ -190,6 +191,58 @@ class SchemaRegistryServiceTests extends AbstractIntegrationTest { Assertions.assertEquals(schema, actual.getSchema()); } + + @Test + void shouldCreateNewProtobufSchemaWithRefs() { + NewSchemaSubjectDTO requestBody = new NewSchemaSubjectDTO() + .schemaType(SchemaTypeDTO.PROTOBUF) + .subject(subject + "-ref") + .schema(""" + syntax = "proto3"; + message MyRecord { + int32 id = 1; + string name = 2; + } + """); + + webTestClient + .post() + .uri("/api/clusters/{clusterName}/schemas", LOCAL) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class)) + .exchange() + .expectStatus() + .isOk(); + + requestBody = new NewSchemaSubjectDTO() + .schemaType(SchemaTypeDTO.PROTOBUF) + .subject(subject) + .schema(""" + syntax = "proto3"; + import "MyRecord.proto"; + message MyRecordWithRef { + int32 id = 1; + MyRecord my_ref = 2; + } + """) + .references(List.of(new SchemaReferenceDTO().name("MyRecord.proto").subject(subject + "-ref").version(1))); + + SchemaSubjectDTO actual = webTestClient + .post() + .uri("/api/clusters/{clusterName}/schemas", LOCAL) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class)) + .exchange() + .expectStatus() + .isOk() + .expectBody(SchemaSubjectDTO.class) + .returnResult() + .getResponseBody(); + + Assertions.assertNotNull(actual); + Assertions.assertEquals(requestBody.getReferences(), actual.getReferences()); + } + @Test public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() { webTestClient @@ -278,7 +331,7 @@ class SchemaRegistryServiceTests extends AbstractIntegrationTest { void shouldCreateNewSchemaWhenSubjectIncludesNonAsciiCharacters() { String schema = "{\"subject\":\"test/test\",\"schemaType\":\"JSON\",\"schema\":" - + "\"{\\\"type\\\": \\\"string\\\"}\"}"; + + "\"{\\\"type\\\": \\\"string\\\"}\"}"; webTestClient .post() diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolverTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolverTest.java index d6f02af81b..c2fd899ad7 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolverTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolverTest.java @@ -55,6 +55,7 @@ class SchemaReferencesResolverTest { assertThat(result.block()) .containsExactlyEntriesOf( + // checking map should be ordered ImmutableMap.builder() .put("ref1", "schema1") .put("ref2_1_1", "schema2_1_1")