refs resolving

iliax 2023-04-27 21:05:54 +04:00
parent 5429e0f9b3
commit 864bff081a
13 changed files with 182 additions and 27 deletions

View file

@@ -91,7 +91,7 @@ public class SchemaRegistryService {
   private Mono<SubjectWithCompatibilityLevel> getSchemaSubject(KafkaCluster cluster, String schemaName,
                                                                String version) {
     return api(cluster)
-        .mono(c -> c.getSubjectVersion(schemaName, version))
+        .mono(c -> c.getSubjectVersion(schemaName, version, false))
         .zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
         .map(t -> new SubjectWithCompatibilityLevel(t.getT1(), t.getT2()))
         .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.error(new SchemaNotFoundException()));

View file

@@ -0,0 +1,58 @@
package com.provectus.kafka.ui.service.integration.odd;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
import com.provectus.kafka.ui.sr.model.SchemaReference;
import java.util.List;
import reactor.core.publisher.Mono;

// logic copied from AbstractSchemaProvider:resolveReferences
// https://github.com/confluentinc/schema-registry/blob/fd59613e2c5adf62e36705307f420712e4c8c1ea/client/src/main/java/io/confluent/kafka/schemaregistry/AbstractSchemaProvider.java#L54
class SchemaReferencesResolver {

  private final KafkaSrClientApi client;

  SchemaReferencesResolver(KafkaSrClientApi client) {
    this.client = client;
  }

  Mono<ImmutableMap<String, String>> resolve(List<SchemaReference> references) {
    ImmutableMap<String, String> result = ImmutableMap.of();
    ImmutableSet<String> visited = ImmutableSet.of();
    return resolveReferences(references, result, visited);
  }

  private Mono<ImmutableMap<String, String>> resolveReferences(List<SchemaReference> references,
                                                               ImmutableMap<String, String> inputSchemas,
                                                               ImmutableSet<String> visited) {
    if (references == null || references.isEmpty()) {
      return Mono.just(inputSchemas);
    }
    Mono<ImmutableMap<String, String>> result = Mono.just(inputSchemas);
    for (SchemaReference reference : references) {
      if (!visited.contains(reference.getName())) {
        visited = ImmutableSet.<String>builder().addAll(visited).add(reference.getName()).build();
        final ImmutableSet<String> finalVisited = visited;
        result = result.flatMap(registeredSchemas ->
            client.getSubjectVersion(reference.getSubject(), String.valueOf(reference.getVersion()), true)
                .flatMap(subj -> {
                  checkNotNull(subj.getSchema(), "Subject '%s' schema is null", subj.getSubject());
                  if (registeredSchemas.containsKey(reference.getName())) {
                    return Mono.just(registeredSchemas);
                  }
                  return resolveReferences(subj.getReferences(), registeredSchemas, finalVisited)
                      .map(updated -> ImmutableMap.<String, String>builder()
                          .putAll(updated)
                          .put(reference.getName(), subj.getSchema())
                          .build());
                }));
      }
    }
    return result;
  }
}
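Note: the resolver walks references depth-first, skipping names it has already visited, and emits an immutable name-to-schema map (mirroring Confluent's AbstractSchemaProvider linked above). A minimal usage sketch, not part of this commit; the helper name and subject handling are illustrative:

// Illustrative helper (not in this commit): resolve the references of a subject's latest
// version into the name -> raw-schema map that Confluent's "resolvedReferences" parameters expect.
Mono<ImmutableMap<String, String>> resolveLatestRefs(KafkaSrClientApi client, String subject) {
  return client.getSubjectVersion(subject, "latest", false)
      .flatMap(subj -> new SchemaReferencesResolver(client).resolve(subj.getReferences()));
}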

View file

@@ -5,6 +5,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.Statistics;
 import com.provectus.kafka.ui.service.StatisticsCache;
 import com.provectus.kafka.ui.service.integration.odd.schema.DataSetFieldsExtractors;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
@@ -24,6 +25,8 @@ import org.opendatadiscovery.oddrn.model.KafkaPath;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;

 @Slf4j
 @RequiredArgsConstructor
@@ -98,9 +101,8 @@ class TopicsExporter {
       return Mono.just(List.of());
     }
     String subject = topic + (isKey ? "-key" : "-value");
-    return cluster.getSchemaRegistryClient()
-        .mono(client -> client.getSubjectVersion(subject, "latest"))
-        .map(subj -> DataSetFieldsExtractors.extract(subj, topicOddrn, isKey))
+    return getSubjWithResolvedRefs(cluster, subject)
+        .map(t -> DataSetFieldsExtractors.extract(t.getT1(), t.getT2(), topicOddrn, isKey))
         .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.just(List.of()))
         .onErrorResume(th -> true, th -> {
           log.warn("Error retrieving subject {} for cluster {}", subject, cluster.getName(), th);
@@ -108,4 +110,13 @@ class TopicsExporter {
         });
   }

+  private Mono<Tuple2<SchemaSubject, ImmutableMap<String, String>>> getSubjWithResolvedRefs(KafkaCluster cluster,
+                                                                                             String subjectName) {
+    return cluster.getSchemaRegistryClient()
+        .mono(client ->
+            client.getSubjectVersion(subjectName, "latest", false)
+                .flatMap(subj -> new SchemaReferencesResolver(client).resolve(subj.getReferences())
+                    .map(resolvedRefs -> Tuples.of(subj, resolvedRefs))));
+  }
 }

View file

@@ -1,7 +1,7 @@
 package com.provectus.kafka.ui.service.integration.odd.schema;

 import com.google.common.collect.ImmutableSet;
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.avro.Schema;
@@ -14,8 +14,8 @@ final class AvroExtractor {
   private AvroExtractor() {
   }

-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    var schema = new Schema.Parser().parse(subject.getSchema());
+  static List<DataSetField> extract(AvroSchema avroSchema, KafkaPath topicOddrn, boolean isKey) {
+    var schema = avroSchema.rawSchema();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     extract(

View file

@@ -2,7 +2,11 @@ package com.provectus.kafka.ui.service.integration.odd.schema;

 import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import com.provectus.kafka.ui.sr.model.SchemaType;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import org.opendatadiscovery.client.model.DataSetField;
 import org.opendatadiscovery.client.model.DataSetFieldType;
@@ -10,12 +14,18 @@ import org.opendatadiscovery.oddrn.model.KafkaPath;

 public final class DataSetFieldsExtractors {

-  public static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
+  public static List<DataSetField> extract(SchemaSubject subject,
+                                           Map<String, String> resolvedRefs,
+                                           KafkaPath topicOddrn,
+                                           boolean isKey) {
     SchemaType schemaType = Optional.ofNullable(subject.getSchemaType()).orElse(SchemaType.AVRO);
     return switch (schemaType) {
-      case AVRO -> AvroExtractor.extract(subject, topicOddrn, isKey);
-      case JSON -> JsonSchemaExtractor.extract(subject, topicOddrn, isKey);
-      case PROTOBUF -> ProtoExtractor.extract(subject, topicOddrn, isKey);
+      case AVRO -> AvroExtractor.extract(
+          new AvroSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
+      case JSON -> JsonSchemaExtractor.extract(
+          new JsonSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
+      case PROTOBUF -> ProtoExtractor.extract(
+          new ProtobufSchema(subject.getSchema(), List.of(), resolvedRefs, null, null), topicOddrn, isKey);
     };
   }
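Note: the resolved references are handed to the Confluent schema wrappers as a plain name-to-schema-string map. A minimal sketch of the AVRO branch in isolation, with invented schema strings; the constructor arguments follow the call above (schema text, empty reference metadata, resolved map, null version):

// Illustrative only: a record whose "address" field uses a type supplied through resolvedRefs.
Map<String, String> resolvedRefs = Map.of(
    "com.acme.Address",
    "{\"type\":\"record\",\"name\":\"Address\",\"namespace\":\"com.acme\","
        + "\"fields\":[{\"name\":\"city\",\"type\":\"string\"}]}");
var userSchema = new AvroSchema(
    "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.acme\","
        + "\"fields\":[{\"name\":\"address\",\"type\":\"com.acme.Address\"}]}",
    List.of(),        // reference metadata, left empty as in the extractor call above
    resolvedRefs,     // reference name -> raw schema, as produced by SchemaReferencesResolver
    null);            // version
var parsed = userSchema.rawSchema();  // should parse, since the Address type comes in via resolvedRefs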

View file

@@ -30,8 +30,8 @@ final class JsonSchemaExtractor {
   private JsonSchemaExtractor() {
   }

-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    Schema schema = new JsonSchema(subject.getSchema()).rawSchema();
+  static List<DataSetField> extract(JsonSchema jsonSchema, KafkaPath topicOddrn, boolean isKey) {
+    Schema schema = jsonSchema.rawSchema();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     extract(

View file

@@ -15,7 +15,6 @@ import com.google.protobuf.Timestamp;
 import com.google.protobuf.UInt32Value;
 import com.google.protobuf.UInt64Value;
 import com.google.protobuf.Value;
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.util.ArrayList;
 import java.util.List;
@@ -42,8 +41,8 @@ final class ProtoExtractor {
   private ProtoExtractor() {
   }

-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    Descriptor schema = new ProtobufSchema(subject.getSchema()).toDescriptor();
+  static List<DataSetField> extract(ProtobufSchema protobufSchema, KafkaPath topicOddrn, boolean isKey) {
+    Descriptor schema = protobufSchema.toDescriptor();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     var rootOddrn = topicOddrn.oddrn() + "/columns/" + (isKey ? "key" : "value");

View file

@@ -0,0 +1,74 @@
package com.provectus.kafka.ui.service.integration.odd;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
import com.provectus.kafka.ui.sr.model.SchemaReference;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import java.util.List;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

class SchemaReferencesResolverTest {

  private final KafkaSrClientApi srClientMock = mock(KafkaSrClientApi.class);

  private final SchemaReferencesResolver schemaReferencesResolver = new SchemaReferencesResolver(srClientMock);

  @Test
  void test() {
    mockSrCall("sub1", 1,
        new SchemaSubject()
            .schema("schema1"));

    mockSrCall("sub2", 1,
        new SchemaSubject()
            .schema("schema2")
            .references(
                List.of(
                    new SchemaReference().name("ref2_1").subject("sub2_1").version(2),
                    new SchemaReference().name("ref2_2").subject("sub1").version(1))));

    mockSrCall("sub2_1", 2,
        new SchemaSubject()
            .schema("schema2_1")
            .references(
                List.of(
                    new SchemaReference().name("ref2_1_1").subject("sub2_1_1").version(3),
                    new SchemaReference().name("ref1").subject("should_not_be_called").version(1)
                ))
    );

    mockSrCall("sub2_1_1", 3,
        new SchemaSubject()
            .schema("schema2_1_1"));

    var result = schemaReferencesResolver.resolve(
        List.of(
            new SchemaReference().name("ref1").subject("sub1").version(1),
            new SchemaReference().name("ref2").subject("sub2").version(1)
        )
    );

    assertThat(result.block())
        .containsExactlyEntriesOf(
            ImmutableMap.<String, String>builder()
                .put("ref1", "schema1")
                .put("ref2_1_1", "schema2_1_1")
                .put("ref2_1", "schema2_1")
                .put("ref2_2", "schema1")
                .put("ref2", "schema2")
                .build()
        );
  }

  private void mockSrCall(String subject, int version, SchemaSubject subjectToReturn) {
    when(srClientMock.getSubjectVersion(subject, version + "", true))
        .thenReturn(Mono.just(subjectToReturn));
  }
}

View file

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.service.integration.odd;

 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -52,7 +53,7 @@ class TopicsExporterTest {

   @Test
   void doesNotExportTopicsWhichDontFitFiltrationRule() {
-    when(schemaRegistryClientMock.getSubjectVersion(anyString(), anyString()))
+    when(schemaRegistryClientMock.getSubjectVersion(anyString(), anyString(), anyBoolean()))
         .thenReturn(Mono.error(new RuntimeException("Not found")));

     stats = Statistics.empty()
@@ -83,14 +84,14 @@ class TopicsExporterTest {

   @Test
   void doesExportTopicData() {
-    when(schemaRegistryClientMock.getSubjectVersion("testTopic-value", "latest"))
+    when(schemaRegistryClientMock.getSubjectVersion("testTopic-value", "latest", false))
         .thenReturn(Mono.just(
             new SchemaSubject()
                 .schema("\"string\"")
                 .schemaType(SchemaType.AVRO)
         ));

-    when(schemaRegistryClientMock.getSubjectVersion("testTopic-key", "latest"))
+    when(schemaRegistryClientMock.getSubjectVersion("testTopic-key", "latest", false))
         .thenReturn(Mono.just(
             new SchemaSubject()
                 .schema("\"int\"")

View file

@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;

 import static org.assertj.core.api.Assertions.assertThat;

-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.opendatadiscovery.client.model.DataSetField;
@@ -15,8 +15,7 @@ class AvroExtractorTest {
   @ValueSource(booleans = {true, false})
   void test(boolean isKey) {
     var list = AvroExtractor.extract(
-        new SchemaSubject()
-            .schema("""
+        new AvroSchema("""
                 {
                   "type": "record",
                   "name": "Message",

View file

@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;

 import static org.assertj.core.api.Assertions.assertThat;

-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +40,7 @@ class JsonSchemaExtractorTest {
     }
     """;
     var fields = JsonSchemaExtractor.extract(
-        new SchemaSubject().schema(jsonSchema),
+        new JsonSchema(jsonSchema),
         KafkaPath.builder()
             .cluster("localhost:9092")
             .topic("someTopic")

View file

@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;

 import static org.assertj.core.api.Assertions.assertThat;

-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.opendatadiscovery.client.model.DataSetField;
@@ -54,8 +54,7 @@ class ProtoExtractorTest {
     }""";

     var list = ProtoExtractor.extract(
-        new SchemaSubject()
-            .schema(protoSchema),
+        new ProtobufSchema(protoSchema),
         KafkaPath.builder()
             .cluster("localhost:9092")
             .topic("someTopic")

View file

@@ -77,6 +77,10 @@ paths:
           required: true
           schema:
             type: string
+        - name: deleted
+          in: query
+          schema:
+            type: boolean
       responses:
         200:
           description: OK
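
Note: this new optional "deleted" query parameter is what lets the regenerated KafkaSrClientApi#getSubjectVersion take the extra boolean seen in the Java changes above: TopicsExporter and the tests pass false, while SchemaReferencesResolver passes true so that references to soft-deleted subjects can still be fetched. An illustrative call against the regenerated client (subject name invented):

// The third argument maps to the "deleted" query flag added to the spec above.
Mono<SchemaSubject> latest = client.getSubjectVersion("orders-value", "latest", false);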