From 864bff081a82e35be7ca393ec41778471b019b25 Mon Sep 17 00:00:00 2001
From: iliax
Date: Thu, 27 Apr 2023 21:05:54 +0400
Subject: [PATCH] refs resolving

---
 .../ui/service/SchemaRegistryService.java      |  2 +-
 .../odd/SchemaReferencesResolver.java          | 58 +++++++++++++++
 .../integration/odd/TopicsExporter.java        | 17 ++++-
 .../integration/odd/schema/AvroExtractor.java  |  6 +-
 .../odd/schema/DataSetFieldsExtractors.java    | 18 ++++-
 .../odd/schema/JsonSchemaExtractor.java        |  4 +-
 .../odd/schema/ProtoExtractor.java             |  5 +-
 .../odd/SchemaReferencesResolverTest.java      | 74 +++++++++++++++++++
 .../integration/odd/TopicsExporterTest.java    |  7 +-
 .../odd/schema/AvroExtractorTest.java          |  5 +-
 .../odd/schema/JsonSchemaExtractorTest.java    |  4 +-
 .../odd/schema/ProtoExtractorTest.java         |  5 +-
 .../main/resources/swagger/kafka-sr-api.yaml   |  4 +
 13 files changed, 182 insertions(+), 27 deletions(-)
 create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolver.java
 create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolverTest.java

diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java
index 99f8af588f..cae29ba93d 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java
@@ -91,7 +91,7 @@ public class SchemaRegistryService {
   private Mono<SubjectWithCompatibilityLevel> getSchemaSubject(KafkaCluster cluster, String schemaName,
                                                                String version) {
     return api(cluster)
-        .mono(c -> c.getSubjectVersion(schemaName, version))
+        .mono(c -> c.getSubjectVersion(schemaName, version, false))
         .zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
         .map(t -> new SubjectWithCompatibilityLevel(t.getT1(), t.getT2()))
         .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.error(new SchemaNotFoundException()));
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolver.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolver.java
new file mode 100644
index 0000000000..cba4720dda
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolver.java
@@ -0,0 +1,58 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
+import com.provectus.kafka.ui.sr.model.SchemaReference;
+import java.util.List;
+import reactor.core.publisher.Mono;
+
+// logic copied from AbstractSchemaProvider:resolveReferences
+// https://github.com/confluentinc/schema-registry/blob/fd59613e2c5adf62e36705307f420712e4c8c1ea/client/src/main/java/io/confluent/kafka/schemaregistry/AbstractSchemaProvider.java#L54
+class SchemaReferencesResolver {
+
+  private final KafkaSrClientApi client;
+
+  SchemaReferencesResolver(KafkaSrClientApi client) {
+    this.client = client;
+  }
+
+  Mono<ImmutableMap<String, String>> resolve(List<SchemaReference> references) {
+    ImmutableMap<String, String> result = ImmutableMap.of();
+    ImmutableSet<String> visited = ImmutableSet.of();
+    return resolveReferences(references, result, visited);
+  }
+
+  private Mono<ImmutableMap<String, String>> resolveReferences(List<SchemaReference> references,
+                                                               ImmutableMap<String, String> inputSchemas,
+                                                               ImmutableSet<String> visited) {
+    if (references == null || references.isEmpty()) {
+      return Mono.just(inputSchemas);
+    }
+
+    Mono<ImmutableMap<String, String>> result = Mono.just(inputSchemas);
+    for (SchemaReference reference : references) {
+      if (!visited.contains(reference.getName())) {
+        visited = ImmutableSet.<String>builder().addAll(visited).add(reference.getName()).build();
+        final ImmutableSet<String> finalVisited = visited;
+        result = result.flatMap(registeredSchemas ->
+            client.getSubjectVersion(reference.getSubject(), String.valueOf(reference.getVersion()), true)
+                .flatMap(subj -> {
+                  checkNotNull(subj.getSchema(), "Subject '%s' schema is null", subj.getSubject());
+                  if (registeredSchemas.containsKey(reference.getName())) {
+                    return Mono.just(registeredSchemas);
+                  }
+                  return resolveReferences(subj.getReferences(), registeredSchemas, finalVisited)
+                      .map(updated -> ImmutableMap.<String, String>builder()
+                          .putAll(updated)
+                          .put(reference.getName(), subj.getSchema())
+                          .build());
+                }));
+      }
+    }
+    return result;
+  }
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java
index ad72e6f1dc..e05dafb36c 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java
@@ -5,6 +5,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.Statistics;
 import com.provectus.kafka.ui.service.StatisticsCache;
 import com.provectus.kafka.ui.service.integration.odd.schema.DataSetFieldsExtractors;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
@@ -24,6 +25,8 @@ import org.opendatadiscovery.oddrn.model.KafkaPath;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
 
 @Slf4j
 @RequiredArgsConstructor
@@ -98,9 +101,8 @@ class TopicsExporter {
       return Mono.just(List.of());
     }
     String subject = topic + (isKey ? "-key" : "-value");
"-key" : "-value"); - return cluster.getSchemaRegistryClient() - .mono(client -> client.getSubjectVersion(subject, "latest")) - .map(subj -> DataSetFieldsExtractors.extract(subj, topicOddrn, isKey)) + return getSubjWithResolvedRefs(cluster, subject) + .map(t -> DataSetFieldsExtractors.extract(t.getT1(), t.getT2(), topicOddrn, isKey)) .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.just(List.of())) .onErrorResume(th -> true, th -> { log.warn("Error retrieving subject {} for cluster {}", subject, cluster.getName(), th); @@ -108,4 +110,13 @@ class TopicsExporter { }); } + private Mono>> getSubjWithResolvedRefs(KafkaCluster cluster, + String subjectName) { + return cluster.getSchemaRegistryClient() + .mono(client -> + client.getSubjectVersion(subjectName, "latest", false) + .flatMap(subj -> new SchemaReferencesResolver(client).resolve(subj.getReferences()) + .map(resolvedRefs -> Tuples.of(subj, resolvedRefs)))); + } + } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractor.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractor.java index cc799a9e10..f942396293 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractor.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractor.java @@ -1,7 +1,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema; import com.google.common.collect.ImmutableSet; -import com.provectus.kafka.ui.sr.model.SchemaSubject; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; import java.util.ArrayList; import java.util.List; import org.apache.avro.Schema; @@ -14,8 +14,8 @@ final class AvroExtractor { private AvroExtractor() { } - static List extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) { - var schema = new Schema.Parser().parse(subject.getSchema()); + static List extract(AvroSchema avroSchema, KafkaPath topicOddrn, boolean isKey) { + var schema = avroSchema.rawSchema(); List result = new ArrayList<>(); result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey)); extract( diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/DataSetFieldsExtractors.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/DataSetFieldsExtractors.java index e357db3079..b9093262bd 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/DataSetFieldsExtractors.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/DataSetFieldsExtractors.java @@ -2,7 +2,11 @@ package com.provectus.kafka.ui.service.integration.odd.schema; import com.provectus.kafka.ui.sr.model.SchemaSubject; import com.provectus.kafka.ui.sr.model.SchemaType; +import io.confluent.kafka.schemaregistry.avro.AvroSchema; +import io.confluent.kafka.schemaregistry.json.JsonSchema; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import java.util.List; +import java.util.Map; import java.util.Optional; import org.opendatadiscovery.client.model.DataSetField; import org.opendatadiscovery.client.model.DataSetFieldType; @@ -10,12 +14,18 @@ import org.opendatadiscovery.oddrn.model.KafkaPath; public final class DataSetFieldsExtractors { - public static List extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) { + public static List extract(SchemaSubject subject, + Map resolvedRefs, + KafkaPath topicOddrn, 
+                                           boolean isKey) {
     SchemaType schemaType = Optional.ofNullable(subject.getSchemaType()).orElse(SchemaType.AVRO);
     return switch (schemaType) {
-      case AVRO -> AvroExtractor.extract(subject, topicOddrn, isKey);
-      case JSON -> JsonSchemaExtractor.extract(subject, topicOddrn, isKey);
-      case PROTOBUF -> ProtoExtractor.extract(subject, topicOddrn, isKey);
+      case AVRO -> AvroExtractor.extract(
+          new AvroSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
+      case JSON -> JsonSchemaExtractor.extract(
+          new JsonSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
+      case PROTOBUF -> ProtoExtractor.extract(
+          new ProtobufSchema(subject.getSchema(), List.of(), resolvedRefs, null, null), topicOddrn, isKey);
     };
   }
 
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractor.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractor.java
index 06201b1ce7..93adbdbe0c 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractor.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractor.java
@@ -30,8 +30,8 @@ final class JsonSchemaExtractor {
   private JsonSchemaExtractor() {
   }
 
-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    Schema schema = new JsonSchema(subject.getSchema()).rawSchema();
+  static List<DataSetField> extract(JsonSchema jsonSchema, KafkaPath topicOddrn, boolean isKey) {
+    Schema schema = jsonSchema.rawSchema();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     extract(
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractor.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractor.java
index c1316172f3..01b25ff48d 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractor.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractor.java
@@ -15,7 +15,6 @@ import com.google.protobuf.Timestamp;
 import com.google.protobuf.UInt32Value;
 import com.google.protobuf.UInt64Value;
 import com.google.protobuf.Value;
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.util.ArrayList;
 import java.util.List;
@@ -42,8 +41,8 @@ final class ProtoExtractor {
   private ProtoExtractor() {
   }
 
-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    Descriptor schema = new ProtobufSchema(subject.getSchema()).toDescriptor();
+  static List<DataSetField> extract(ProtobufSchema protobufSchema, KafkaPath topicOddrn, boolean isKey) {
+    Descriptor schema = protobufSchema.toDescriptor();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     var rootOddrn = topicOddrn.oddrn() + "/columns/" + (isKey ? "key" : "value");
"key" : "value"); diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolverTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolverTest.java new file mode 100644 index 0000000000..d6f02af81b --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolverTest.java @@ -0,0 +1,74 @@ +package com.provectus.kafka.ui.service.integration.odd; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import com.provectus.kafka.ui.sr.api.KafkaSrClientApi; +import com.provectus.kafka.ui.sr.model.SchemaReference; +import com.provectus.kafka.ui.sr.model.SchemaSubject; +import java.util.List; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + +class SchemaReferencesResolverTest { + + private final KafkaSrClientApi srClientMock = mock(KafkaSrClientApi.class); + + private final SchemaReferencesResolver schemaReferencesResolver = new SchemaReferencesResolver(srClientMock); + + @Test + void test() { + mockSrCall("sub1", 1, + new SchemaSubject() + .schema("schema1")); + + mockSrCall("sub2", 1, + new SchemaSubject() + .schema("schema2") + .references( + List.of( + new SchemaReference().name("ref2_1").subject("sub2_1").version(2), + new SchemaReference().name("ref2_2").subject("sub1").version(1)))); + + mockSrCall("sub2_1", 2, + new SchemaSubject() + .schema("schema2_1") + .references( + List.of( + new SchemaReference().name("ref2_1_1").subject("sub2_1_1").version(3), + new SchemaReference().name("ref1").subject("should_not_be_called").version(1) + )) + ); + + mockSrCall("sub2_1_1", 3, + new SchemaSubject() + .schema("schema2_1_1")); + + var result = schemaReferencesResolver.resolve( + List.of( + new SchemaReference().name("ref1").subject("sub1").version(1), + new SchemaReference().name("ref2").subject("sub2").version(1) + ) + ); + + assertThat(result.block()) + .containsExactlyEntriesOf( + ImmutableMap.builder() + .put("ref1", "schema1") + .put("ref2_1_1", "schema2_1_1") + .put("ref2_1", "schema2_1") + .put("ref2_2", "schema1") + .put("ref2", "schema2") + .build() + ); + } + + + private void mockSrCall(String subject, int version, SchemaSubject subjectToReturn) { + when(srClientMock.getSubjectVersion(subject, version + "", true)) + .thenReturn(Mono.just(subjectToReturn)); + } + +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java index 4d512612a6..34e6caf263 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.service.integration.odd; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -52,7 +53,7 @@ class TopicsExporterTest { @Test void doesNotExportTopicsWhichDontFitFiltrationRule() { - when(schemaRegistryClientMock.getSubjectVersion(anyString(), anyString())) + when(schemaRegistryClientMock.getSubjectVersion(anyString(), 
         .thenReturn(Mono.error(new RuntimeException("Not found")));
 
     stats = Statistics.empty()
@@ -83,14 +84,14 @@ class TopicsExporterTest {
 
   @Test
   void doesExportTopicData() {
-    when(schemaRegistryClientMock.getSubjectVersion("testTopic-value", "latest"))
+    when(schemaRegistryClientMock.getSubjectVersion("testTopic-value", "latest", false))
         .thenReturn(Mono.just(
             new SchemaSubject()
                 .schema("\"string\"")
                 .schemaType(SchemaType.AVRO)
         ));
 
-    when(schemaRegistryClientMock.getSubjectVersion("testTopic-key", "latest"))
+    when(schemaRegistryClientMock.getSubjectVersion("testTopic-key", "latest", false))
         .thenReturn(Mono.just(
             new SchemaSubject()
                 .schema("\"int\"")
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractorTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractorTest.java
index d523d7cd41..cd1baf7798 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractorTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractorTest.java
@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.opendatadiscovery.client.model.DataSetField;
@@ -15,8 +15,7 @@ class AvroExtractorTest {
   @ValueSource(booleans = {true, false})
   void test(boolean isKey) {
     var list = AvroExtractor.extract(
-        new SchemaSubject()
-            .schema("""
+        new AvroSchema("""
                 {
                     "type": "record",
                     "name": "Message",
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractorTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractorTest.java
index 7968e52e6d..30a1e6229c 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractorTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractorTest.java
@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +40,7 @@
     }
     """;
     var fields = JsonSchemaExtractor.extract(
-        new SchemaSubject().schema(jsonSchema),
+        new JsonSchema(jsonSchema),
         KafkaPath.builder()
             .cluster("localhost:9092")
             .topic("someTopic")
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractorTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractorTest.java
index cbb97a859c..8d6344d7cc 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractorTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractorTest.java
@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.opendatadiscovery.client.model.DataSetField;
@@ -54,8 +54,7 @@
         }""";
 
     var list = ProtoExtractor.extract(
-        new SchemaSubject()
-            .schema(protoSchema),
+        new ProtobufSchema(protoSchema),
         KafkaPath.builder()
             .cluster("localhost:9092")
             .topic("someTopic")
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-sr-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-sr-api.yaml
index 354b19996e..0320e891ec 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-sr-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-sr-api.yaml
@@ -77,6 +77,10 @@ paths:
           required: true
           schema:
             type: string
+        - name: deleted
+          in: query
+          schema:
+            type: boolean
       responses:
         200:
           description: OK