
SchemaRegistry references support (#3747)

1. references fields added to the I/O DTOs
2. JSON schema reference parsing fixed (now uses the Confluent JsonSchema parser)
3. schema reference resolution added to the ODD schema extractors
Ilya Kuramshin, 1 year ago
Commit d4001b5a39
18 files changed, with 289 additions and 38 deletions
  1. + 5 - 3   kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
  2. + 7 - 1   kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaSrMapper.java
  3. + 4 - 1   kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java
  4. + 2 - 3   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java
  5. + 55 - 0  kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolver.java
  6. + 14 - 3  kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java
  7. + 3 - 3   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractor.java
  8. + 14 - 4  kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/DataSetFieldsExtractors.java
  9. + 2 - 2   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractor.java
  10. + 2 - 3  kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractor.java
  11. + 54 - 1 kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java
  12. + 86 - 0 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolverTest.java
  13. + 4 - 4  kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java
  14. + 2 - 3  kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractorTest.java
  15. + 2 - 2  kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractorTest.java
  16. + 2 - 3  kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractorTest.java
  17. + 8 - 0  kafka-ui-contract/src/main/resources/swagger/kafka-sr-api.yaml
  18. + 23 - 2 kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

+ 5 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

@@ -123,9 +123,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .operationName("deleteTopic")
         .build();
 
-    return accessControlService.validateAccess(context).then(
-        topicsService.deleteTopic(getCluster(clusterName), topicName).map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    return accessControlService.validateAccess(context)
+        .then(
+            topicsService.deleteTopic(getCluster(clusterName), topicName)
+                .thenReturn(ResponseEntity.ok().<Void>build())
+        ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
 
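A note on this change (a hedged aside, not part of the commit): the switch from map to thenReturn suggests that topicsService.deleteTopic returns Mono<Void>. A Mono<Void> completes without ever emitting a value, so a downstream map(...) never fires; thenReturn emits its value once the source completes. A minimal, self-contained Reactor sketch of the difference:

    import org.springframework.http.ResponseEntity;
    import reactor.core.publisher.Mono;

    class MapVsThenReturn {
      public static void main(String[] args) {
        // Mono<Void> completes empty, so the map callback is never invoked:
        Mono.<Void>empty().map(v -> ResponseEntity.ok().<Void>build())
            .subscribe(r -> System.out.println("map emitted " + r.getStatusCode()));        // prints nothing
        // thenReturn emits its value when the (empty) source completes:
        Mono.<Void>empty().thenReturn(ResponseEntity.ok().<Void>build())
            .subscribe(r -> System.out.println("thenReturn emitted " + r.getStatusCode())); // prints 200 OK
      }
    }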

+ 7 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaSrMapper.java

@@ -3,18 +3,21 @@ package com.provectus.kafka.ui.mapper;
 import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
 import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
 import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
+import com.provectus.kafka.ui.model.SchemaReferenceDTO;
 import com.provectus.kafka.ui.model.SchemaSubjectDTO;
 import com.provectus.kafka.ui.model.SchemaTypeDTO;
 import com.provectus.kafka.ui.service.SchemaRegistryService;
 import com.provectus.kafka.ui.sr.model.Compatibility;
 import com.provectus.kafka.ui.sr.model.CompatibilityCheckResponse;
 import com.provectus.kafka.ui.sr.model.NewSubject;
+import com.provectus.kafka.ui.sr.model.SchemaReference;
 import com.provectus.kafka.ui.sr.model.SchemaType;
+import java.util.List;
 import java.util.Optional;
 import org.mapstruct.Mapper;
 
 
-@Mapper(componentModel = "spring")
+@Mapper
 public interface KafkaSrMapper {
 
   default SchemaSubjectDTO toDto(SchemaRegistryService.SubjectWithCompatibilityLevel s) {
@@ -24,9 +27,12 @@ public interface KafkaSrMapper {
         .subject(s.getSubject())
         .schema(s.getSchema())
         .schemaType(SchemaTypeDTO.fromValue(Optional.ofNullable(s.getSchemaType()).orElse(SchemaType.AVRO).getValue()))
+        .references(toDto(s.getReferences()))
         .compatibilityLevel(s.getCompatibility().toString());
   }
 
+  List<SchemaReferenceDTO> toDto(List<SchemaReference> references);
+
   CompatibilityCheckResponseDTO toDto(CompatibilityCheckResponse ccr);
 
   CompatibilityLevelDTO.CompatibilityEnum toDto(Compatibility compatibility);

+ 4 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java

@@ -20,6 +20,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
@@ -217,7 +218,9 @@ public class SchemaRegistrySerde implements BuiltInSerde {
       case AVRO -> new AvroJsonSchemaConverter()
           .convert(basePath, ((AvroSchema) parsedSchema).rawSchema())
           .toJson();
-      case JSON -> schema.getSchema();
+      case JSON ->
+        //need to use confluent JsonSchema since it includes resolved references
+        ((JsonSchema) parsedSchema).rawSchema().toString();
     };
   }
 
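The "resolved references" comment in the JSON branch leans on the Confluent JsonSchema constructor that accepts a map of resolved references (reference name to schema source), the same constructor this commit uses in DataSetFieldsExtractors below. A minimal sketch of that assumption, with hypothetical schemas not taken from the commit:

    import io.confluent.kafka.schemaregistry.json.JsonSchema;
    import java.util.List;
    import java.util.Map;

    class JsonRefSketch {
      public static void main(String[] args) {
        String child = "{\"type\": \"integer\"}";
        String parent = "{\"type\": \"object\", \"properties\": {\"id\": {\"$ref\": \"child.json\"}}}";
        // The resolvedReferences map satisfies the $ref locally, so rawSchema()
        // can parse the document without fetching "child.json" from anywhere.
        var schema = new JsonSchema(parent, List.of(), Map.of("child.json", child), null);
        System.out.println(schema.rawSchema());
      }
    }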

+ 2 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java

@@ -14,8 +14,7 @@ import com.provectus.kafka.ui.sr.model.CompatibilityLevelChange;
 import com.provectus.kafka.ui.sr.model.NewSubject;
 import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import com.provectus.kafka.ui.util.ReactiveFailover;
-import com.provectus.kafka.ui.util.WebClientConfigurator;
-import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.List;
 import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
@@ -92,7 +91,7 @@ public class SchemaRegistryService {
   private Mono<SubjectWithCompatibilityLevel> getSchemaSubject(KafkaCluster cluster, String schemaName,
                                                                String version) {
     return api(cluster)
-        .mono(c -> c.getSubjectVersion(schemaName, version))
+        .mono(c -> c.getSubjectVersion(schemaName, version, false))
         .zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
         .map(t -> new SubjectWithCompatibilityLevel(t.getT1(), t.getT2()))
         .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.error(new SchemaNotFoundException()));

+ 55 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolver.java

@@ -0,0 +1,55 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
+import com.provectus.kafka.ui.sr.model.SchemaReference;
+import java.util.List;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import reactor.core.publisher.Mono;
+
+// logic copied from AbstractSchemaProvider:resolveReferences
+// https://github.com/confluentinc/schema-registry/blob/fd59613e2c5adf62e36705307f420712e4c8c1ea/client/src/main/java/io/confluent/kafka/schemaregistry/AbstractSchemaProvider.java#L54
+class SchemaReferencesResolver {
+
+  private final KafkaSrClientApi client;
+
+  SchemaReferencesResolver(KafkaSrClientApi client) {
+    this.client = client;
+  }
+
+  Mono<ImmutableMap<String, String>> resolve(List<SchemaReference> refs) {
+    return resolveReferences(refs, new Resolving(ImmutableMap.of(), ImmutableSet.of()))
+        .map(Resolving::resolved);
+  }
+
+  private record Resolving(ImmutableMap<String, String> resolved, ImmutableSet<String> visited) {
+
+    Resolving visit(String name) {
+      return new Resolving(resolved, ImmutableSet.<String>builder().addAll(visited).add(name).build());
+    }
+
+    Resolving resolve(String ref, String schema) {
+      return new Resolving(ImmutableMap.<String, String>builder().putAll(resolved).put(ref, schema).build(), visited);
+    }
+  }
+
+  private Mono<Resolving> resolveReferences(@Nullable List<SchemaReference> refs, Resolving initState) {
+    Mono<Resolving> result = Mono.just(initState);
+    for (SchemaReference reference : Optional.ofNullable(refs).orElse(List.of())) {
+      result = result.flatMap(state -> {
+        if (state.visited().contains(reference.getName())) {
+          return Mono.just(state);
+        } else {
+          final var newState = state.visit(reference.getName());
+          return client.getSubjectVersion(reference.getSubject(), String.valueOf(reference.getVersion()), true)
+              .flatMap(subj ->
+                  resolveReferences(subj.getReferences(), newState)
+                      .map(withNewRefs -> withNewRefs.resolve(reference.getName(), subj.getSchema())));
+        }
+      });
+    }
+    return result;
+  }
+}
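Two properties of this resolver are worth noting: the visited set stops reference cycles and avoids re-fetching a name that appears on multiple paths, and the ImmutableMap preserves insertion order, so transitively referenced (leaf) schemas land in the map before the schemas that reference them; the unit test further down asserts exactly that ordering. A minimal usage sketch (same-package wiring assumed, since the class is package-private; the client would come from the cluster's schema registry client, as in TopicsExporter below):

    import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
    import reactor.core.publisher.Mono;

    class ResolverUsageSketch {
      // Hypothetical helper, mirroring TopicsExporter.getSubjWithResolvedRefs:
      static Mono<String> describeLatest(KafkaSrClientApi client, String subject) {
        var resolver = new SchemaReferencesResolver(client);
        return client.getSubjectVersion(subject, "latest", false)
            .flatMap(subj -> resolver.resolve(subj.getReferences())
                .map(refs -> subj.getSchema() + " (" + refs.size() + " resolved refs)"));
      }
    }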

+ 14 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java

@@ -5,6 +5,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.Statistics;
 import com.provectus.kafka.ui.service.StatisticsCache;
 import com.provectus.kafka.ui.service.integration.odd.schema.DataSetFieldsExtractors;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
@@ -24,6 +25,8 @@ import org.opendatadiscovery.oddrn.model.KafkaPath;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
 
 @Slf4j
 @RequiredArgsConstructor
@@ -100,12 +103,20 @@ class TopicsExporter {
       return Mono.just(List.of());
     }
     String subject = topic + (isKey ? "-key" : "-value");
-    return cluster.getSchemaRegistryClient()
-        .mono(client -> client.getSubjectVersion(subject, "latest"))
-        .map(subj -> DataSetFieldsExtractors.extract(subj, topicOddrn, isKey))
+    return getSubjWithResolvedRefs(cluster, subject)
+        .map(t -> DataSetFieldsExtractors.extract(t.getT1(), t.getT2(), topicOddrn, isKey))
         .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.just(List.of()))
         .onErrorMap(WebClientResponseException.class, err ->
             new IllegalStateException("Error retrieving subject %s".formatted(subject), err));
   }
 
+  private Mono<Tuple2<SchemaSubject, Map<String, String>>> getSubjWithResolvedRefs(KafkaCluster cluster,
+                                                                                   String subjectName) {
+    return cluster.getSchemaRegistryClient()
+        .mono(client ->
+            client.getSubjectVersion(subjectName, "latest", false)
+                .flatMap(subj -> new SchemaReferencesResolver(client).resolve(subj.getReferences())
+                    .map(resolvedRefs -> Tuples.of(subj, resolvedRefs))));
+  }
+
 }

+ 3 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractor.java

@@ -1,7 +1,7 @@
 package com.provectus.kafka.ui.service.integration.odd.schema;
 
 import com.google.common.collect.ImmutableSet;
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.avro.Schema;
@@ -14,8 +14,8 @@ final class AvroExtractor {
   private AvroExtractor() {
   }
 
-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    var schema = new Schema.Parser().parse(subject.getSchema());
+  static List<DataSetField> extract(AvroSchema avroSchema, KafkaPath topicOddrn, boolean isKey) {
+    var schema = avroSchema.rawSchema();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     extract(

+ 14 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/DataSetFieldsExtractors.java

@@ -2,7 +2,11 @@ package com.provectus.kafka.ui.service.integration.odd.schema;
 
 import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import com.provectus.kafka.ui.sr.model.SchemaType;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import org.opendatadiscovery.client.model.DataSetField;
 import org.opendatadiscovery.client.model.DataSetFieldType;
@@ -10,12 +14,18 @@ import org.opendatadiscovery.oddrn.model.KafkaPath;
 
 public final class DataSetFieldsExtractors {
 
-  public static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
+  public static List<DataSetField> extract(SchemaSubject subject,
+                                           Map<String, String> resolvedRefs,
+                                           KafkaPath topicOddrn,
+                                           boolean isKey) {
     SchemaType schemaType = Optional.ofNullable(subject.getSchemaType()).orElse(SchemaType.AVRO);
     return switch (schemaType) {
-      case AVRO -> AvroExtractor.extract(subject, topicOddrn, isKey);
-      case JSON -> JsonSchemaExtractor.extract(subject, topicOddrn, isKey);
-      case PROTOBUF -> ProtoExtractor.extract(subject, topicOddrn, isKey);
+      case AVRO -> AvroExtractor.extract(
+          new AvroSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
+      case JSON -> JsonSchemaExtractor.extract(
+          new JsonSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
+      case PROTOBUF -> ProtoExtractor.extract(
+          new ProtobufSchema(subject.getSchema(), List.of(), resolvedRefs, null, null), topicOddrn, isKey);
     };
   }
 
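The extractors now receive parsed Confluent schema objects instead of raw schema strings, with resolvedRefs (reference name to schema source) handed to the parser. A minimal sketch of what that buys for Protobuf, using hypothetical schemas shaped like the integration test below:

    import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
    import java.util.List;
    import java.util.Map;

    class ProtoRefSketch {
      public static void main(String[] args) {
        String ref = """
            syntax = "proto3";
            message MyRecord { int32 id = 1; }
            """;
        String root = """
            syntax = "proto3";
            import "MyRecord.proto";
            message MyRecordWithRef { MyRecord my_ref = 1; }
            """;
        // Without the "MyRecord.proto" entry the import could not be resolved;
        // with it, toDescriptor() succeeds with no registry round-trip.
        var schema = new ProtobufSchema(root, List.of(), Map.of("MyRecord.proto", ref), null, null);
        System.out.println(schema.toDescriptor().getFullName()); // MyRecordWithRef
      }
    }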

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractor.java

@@ -30,8 +30,8 @@ final class JsonSchemaExtractor {
   private JsonSchemaExtractor() {
   }
 
-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    Schema schema = new JsonSchema(subject.getSchema()).rawSchema();
+  static List<DataSetField> extract(JsonSchema jsonSchema, KafkaPath topicOddrn, boolean isKey) {
+    Schema schema = jsonSchema.rawSchema();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     extract(

+ 2 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractor.java

@@ -15,7 +15,6 @@ import com.google.protobuf.Timestamp;
 import com.google.protobuf.UInt32Value;
 import com.google.protobuf.UInt64Value;
 import com.google.protobuf.Value;
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.util.ArrayList;
 import java.util.List;
@@ -42,8 +41,8 @@ final class ProtoExtractor {
   private ProtoExtractor() {
   }
 
-  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
-    Descriptor schema = new ProtobufSchema(subject.getSchema()).toDescriptor();
+  static List<DataSetField> extract(ProtobufSchema protobufSchema, KafkaPath topicOddrn, boolean isKey) {
+    Descriptor schema = protobufSchema.toDescriptor();
     List<DataSetField> result = new ArrayList<>();
     result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
     var rootOddrn = topicOddrn.oddrn() + "/columns/" + (isKey ? "key" : "value");

+ 54 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui;
 
 import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
 import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
+import com.provectus.kafka.ui.model.SchemaReferenceDTO;
 import com.provectus.kafka.ui.model.SchemaSubjectDTO;
 import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
 import com.provectus.kafka.ui.model.SchemaTypeDTO;
@@ -190,6 +191,58 @@ class SchemaRegistryServiceTests extends AbstractIntegrationTest {
     Assertions.assertEquals(schema, actual.getSchema());
   }
 
+
+  @Test
+  void shouldCreateNewProtobufSchemaWithRefs() {
+    NewSchemaSubjectDTO requestBody = new NewSchemaSubjectDTO()
+        .schemaType(SchemaTypeDTO.PROTOBUF)
+        .subject(subject + "-ref")
+        .schema("""
+            syntax = "proto3";
+            message MyRecord {
+              int32 id = 1;
+              string name = 2;
+            }
+            """);
+
+    webTestClient
+        .post()
+        .uri("/api/clusters/{clusterName}/schemas", LOCAL)
+        .contentType(MediaType.APPLICATION_JSON)
+        .body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class))
+        .exchange()
+        .expectStatus()
+        .isOk();
+
+    requestBody = new NewSchemaSubjectDTO()
+        .schemaType(SchemaTypeDTO.PROTOBUF)
+        .subject(subject)
+        .schema("""
+            syntax = "proto3";
+            import "MyRecord.proto";
+            message MyRecordWithRef {
+              int32 id = 1;
+              MyRecord my_ref = 2;
+            }
+            """)
+        .references(List.of(new SchemaReferenceDTO().name("MyRecord.proto").subject(subject + "-ref").version(1)));
+
+    SchemaSubjectDTO actual = webTestClient
+        .post()
+        .uri("/api/clusters/{clusterName}/schemas", LOCAL)
+        .contentType(MediaType.APPLICATION_JSON)
+        .body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class))
+        .exchange()
+        .expectStatus()
+        .isOk()
+        .expectBody(SchemaSubjectDTO.class)
+        .returnResult()
+        .getResponseBody();
+
+    Assertions.assertNotNull(actual);
+    Assertions.assertEquals(requestBody.getReferences(), actual.getReferences());
+  }
+
   @Test
   public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
     webTestClient
@@ -278,7 +331,7 @@ class SchemaRegistryServiceTests extends AbstractIntegrationTest {
   void shouldCreateNewSchemaWhenSubjectIncludesNonAsciiCharacters() {
     String schema =
         "{\"subject\":\"test/test\",\"schemaType\":\"JSON\",\"schema\":"
-        + "\"{\\\"type\\\": \\\"string\\\"}\"}";
+            + "\"{\\\"type\\\": \\\"string\\\"}\"}";
 
     webTestClient
         .post()

+ 86 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/SchemaReferencesResolverTest.java

@@ -0,0 +1,86 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
+import com.provectus.kafka.ui.sr.model.SchemaReference;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import java.util.List;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+class SchemaReferencesResolverTest {
+
+  private final KafkaSrClientApi srClientMock = mock(KafkaSrClientApi.class);
+
+  private final SchemaReferencesResolver schemaReferencesResolver = new SchemaReferencesResolver(srClientMock);
+
+  @Test
+  void resolvesRefsUsingSrClient() {
+    mockSrCall("sub1", 1,
+        new SchemaSubject()
+            .schema("schema1"));
+
+    mockSrCall("sub2", 1,
+        new SchemaSubject()
+            .schema("schema2")
+            .references(
+                List.of(
+                    new SchemaReference().name("ref2_1").subject("sub2_1").version(2),
+                    new SchemaReference().name("ref2_2").subject("sub1").version(1))));
+
+    mockSrCall("sub2_1", 2,
+        new SchemaSubject()
+            .schema("schema2_1")
+            .references(
+                List.of(
+                    new SchemaReference().name("ref2_1_1").subject("sub2_1_1").version(3),
+                    new SchemaReference().name("ref1").subject("should_not_be_called").version(1)
+                ))
+    );
+
+    mockSrCall("sub2_1_1", 3,
+        new SchemaSubject()
+            .schema("schema2_1_1"));
+
+    var resolvedRefsMono = schemaReferencesResolver.resolve(
+        List.of(
+            new SchemaReference().name("ref1").subject("sub1").version(1),
+            new SchemaReference().name("ref2").subject("sub2").version(1)));
+
+    StepVerifier.create(resolvedRefsMono)
+        .assertNext(refs ->
+            assertThat(refs)
+                .containsExactlyEntriesOf(
+                    // checking map should be ordered
+                    ImmutableMap.<String, String>builder()
+                        .put("ref1", "schema1")
+                        .put("ref2_1_1", "schema2_1_1")
+                        .put("ref2_1", "schema2_1")
+                        .put("ref2_2", "schema1")
+                        .put("ref2", "schema2")
+                        .build()))
+        .verifyComplete();
+  }
+
+  @Test
+  void returnsEmptyMapOnEmptyInputs() {
+    StepVerifier.create(schemaReferencesResolver.resolve(null))
+        .assertNext(map -> assertThat(map).isEmpty())
+        .verifyComplete();
+
+    StepVerifier.create(schemaReferencesResolver.resolve(List.of()))
+        .assertNext(map -> assertThat(map).isEmpty())
+        .verifyComplete();
+  }
+
+  private void mockSrCall(String subject, int version, SchemaSubject subjectToReturn) {
+    when(srClientMock.getSubjectVersion(subject, version + "", true))
+        .thenReturn(Mono.just(subjectToReturn));
+  }
+
+}

+ 4 - 4
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.service.integration.odd;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -54,9 +55,8 @@ class TopicsExporterTest {
 
   @Test
   void doesNotExportTopicsWhichDontFitFiltrationRule() {
-    when(schemaRegistryClientMock.getSubjectVersion(anyString(), anyString()))
+    when(schemaRegistryClientMock.getSubjectVersion(anyString(), anyString(), anyBoolean()))
         .thenReturn(Mono.error(WebClientResponseException.create(404, "NF", new HttpHeaders(), null, null, null)));
-
     stats = Statistics.empty()
         .toBuilder()
         .topicDescriptions(
@@ -85,14 +85,14 @@ class TopicsExporterTest {
 
   @Test
   void doesExportTopicData() {
-    when(schemaRegistryClientMock.getSubjectVersion("testTopic-value", "latest"))
+    when(schemaRegistryClientMock.getSubjectVersion("testTopic-value", "latest", false))
         .thenReturn(Mono.just(
             new SchemaSubject()
                 .schema("\"string\"")
                 .schemaType(SchemaType.AVRO)
         ));
 
-    when(schemaRegistryClientMock.getSubjectVersion("testTopic-key", "latest"))
+    when(schemaRegistryClientMock.getSubjectVersion("testTopic-key", "latest", false))
         .thenReturn(Mono.just(
             new SchemaSubject()
                 .schema("\"int\"")

+ 2 - 3
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractorTest.java

@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.opendatadiscovery.client.model.DataSetField;
@@ -15,8 +15,7 @@ class AvroExtractorTest {
   @ValueSource(booleans = {true, false})
   void test(boolean isKey) {
     var list = AvroExtractor.extract(
-        new SchemaSubject()
-            .schema("""
+        new AvroSchema("""
                 {
                     "type": "record",
                     "name": "Message",

+ 2 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractorTest.java

@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +40,7 @@ class JsonSchemaExtractorTest {
         }
         """;
     var fields = JsonSchemaExtractor.extract(
-        new SchemaSubject().schema(jsonSchema),
+        new JsonSchema(jsonSchema),
         KafkaPath.builder()
             .cluster("localhost:9092")
             .topic("someTopic")

+ 2 - 3
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractorTest.java

@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.opendatadiscovery.client.model.DataSetField;
@@ -54,8 +54,7 @@ class ProtoExtractorTest {
         }""";
 
     var list = ProtoExtractor.extract(
-        new SchemaSubject()
-            .schema(protoSchema),
+        new ProtobufSchema(protoSchema),
         KafkaPath.builder()
             .cluster("localhost:9092")
             .topic("someTopic")

+ 8 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-sr-api.yaml

@@ -77,6 +77,10 @@ paths:
                 required: true
                 schema:
                   type: string
+              - name: deleted
+                in: query
+                schema:
+                  type: boolean
             responses:
                 200:
                     description: OK
@@ -317,6 +321,10 @@ components:
                 type: string
               schemaType:
                   $ref: '#/components/schemas/SchemaType'
+              references:
+                type: array
+                items:
+                  $ref: '#/components/schemas/SchemaReference'
             required:
               - id
               - subject

+ 23 - 2
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -2916,6 +2916,10 @@ components:
           type: string
         schemaType:
           $ref: '#/components/schemas/SchemaType'
+        references:
+          type: array
+          items:
+            $ref: '#/components/schemas/SchemaReference'
       required:
         - id
         - subject
@@ -2933,13 +2937,30 @@ components:
         schema:
           type: string
         schemaType:
-          $ref: '#/components/schemas/SchemaType'
-          # upon updating a schema, the type of existing schema can't be changed
+          $ref: '#/components/schemas/SchemaType' # upon updating a schema, the type of existing schema can't be changed
+        references:
+          type: array
+          items:
+            $ref: '#/components/schemas/SchemaReference'
       required:
         - subject
         - schema
         - schemaType
 
+    SchemaReference:
+      type: object
+      properties:
+        name:
+          type: string
+        subject:
+          type: string
+        version:
+          type: integer
+      required:
+        - name
+        - subject
+        - version
+
     CompatibilityLevel:
       type: object
       properties: