Integration with ODD (#3289)

ODD integration. Exporting statistics to ODD:
- topics as DataSets (metadata + schemas)
- connectors as DataTransformers (metadata + inputs & outputs)

Ilya Kuramshin, 2 years ago
commit 59837394fb
25 changed files with 2437 additions and 52 deletions
  1. +25 −0    kafka-ui-api/pom.xml
  2. +3 −3     kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java
  3. +2 −1     kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConfigSanitizer.java
  4. +31 −44   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java
  5. +167 −0   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/ConnectorInfo.java
  6. +96 −0    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporter.java
  7. +106 −0   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/OddExporter.java
  8. +27 −0    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/OddExporterScheduler.java
  9. +31 −0    kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/OddIntegrationConfig.java
  10. +15 −0   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/OddIntegrationProperties.java
  11. +79 −0   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/Oddrn.java
  12. +111 −0  kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java
  13. +262 −0  kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractor.java
  14. +38 −0   kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/DataSetFieldsExtractors.java
  15. +311 −0  kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractor.java
  16. +230 −0  kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractor.java
  17. +10 −1   kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ReactiveFailover.java
  18. +2 −3    kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java
  19. +7 −0    kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KafkaConfigSanitizerTest.java
  20. +111 −0  kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporterTest.java
  21. +167 −0  kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java
  22. +272 −0  kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractorTest.java
  23. +145 −0  kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractorTest.java
  24. +187 −0  kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractorTest.java
  25. +2 −0    pom.xml
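
Note on configuration (illustrative, not part of the diff): the integration activates only when integration.odd.url is set (OddIntegrationConfig), the export runs on a fixed schedule (OddExporterScheduler), and topics whose names start with "_" are skipped unless a topics regex is configured (OddExporter.createTopicsFilter). A minimal sketch of the properties this change reads, with property names inferred from the classes below (assuming Spring Boot's relaxed binding) and placeholder values:

kafka:
  send-stats-to-odd-millis: 30000      # export period; defaults to 30000 ms
integration:
  odd:
    url: http://odd-platform:8080      # placeholder; required to enable the integration
    token: <ingestion-token>           # placeholder; required by OddExporter.createApiClient
    topics-regex: "orders-.*"          # optional; overrides the default "_"-prefix filter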

+ 25 - 0
kafka-ui-api/pom.xml

@@ -199,6 +199,31 @@
             <version>${antlr4-maven-plugin.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.opendatadiscovery</groupId>
+            <artifactId>oddrn-generator-java</artifactId>
+            <version>${odd-oddrn-generator.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opendatadiscovery</groupId>
+            <artifactId>ingestion-contract-client</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.springframework.boot</groupId>
+                    <artifactId>spring-boot-starter-webflux</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.projectreactor</groupId>
+                    <artifactId>reactor-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.projectreactor.ipc</groupId>
+                    <artifactId>reactor-netty</artifactId>
+                </exclusion>
+            </exclusions>
+            <version>${odd-oddrn-client.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.springframework.security</groupId>
             <artifactId>spring-security-ldap</artifactId>

+ 3 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java

@@ -37,10 +37,10 @@ public class KafkaConnectController extends AbstractController implements KafkaC
   public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
                                                             ServerWebExchange exchange) {
 
-    Flux<ConnectDTO> flux = Flux.fromIterable(kafkaConnectService.getConnects(getCluster(clusterName)))
+    Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(getCluster(clusterName))
         .filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));
 
-    return Mono.just(ResponseEntity.ok(flux));
+    return Mono.just(ResponseEntity.ok(availableConnects));
   }
 
   @Override
@@ -54,7 +54,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
         .build());
 
     return validateAccess.thenReturn(
-        ResponseEntity.ok(kafkaConnectService.getConnectors(getCluster(clusterName), connectName))
+        ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName))
     );
   }
 

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConfigSanitizer.java

@@ -17,7 +17,8 @@ import org.springframework.stereotype.Component;
 class KafkaConfigSanitizer extends Sanitizer {
   private static final List<String> DEFAULT_PATTERNS_TO_SANITIZE = Arrays.asList(
       "basic.auth.user.info",  /* For Schema Registry credentials */
-      "password", "secret", "token", "key", ".*credentials.*"  /* General credential patterns */
+      "password", "secret", "token", "key", ".*credentials.*",   /* General credential patterns */
+      "aws.access.*", "aws.secret.*", "aws.session.*"   /* AWS-related credential patterns */
   );
 
   KafkaConfigSanitizer(

+ 31 - 44
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java

@@ -28,10 +28,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -40,7 +40,6 @@ import org.springframework.stereotype.Service;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
 
 @Service
@@ -52,18 +51,18 @@ public class KafkaConnectService {
   private final ObjectMapper objectMapper;
   private final KafkaConfigSanitizer kafkaConfigSanitizer;
 
-  public List<ConnectDTO> getConnects(KafkaCluster cluster) {
-    return Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect())
-        .map(lst -> lst.stream().map(clusterMapper::toKafkaConnect).toList())
-        .orElse(List.of());
+  public Flux<ConnectDTO> getConnects(KafkaCluster cluster) {
+    return Flux.fromIterable(
+        Optional.ofNullable(cluster.getOriginalProperties().getKafkaConnect())
+            .map(lst -> lst.stream().map(clusterMapper::toKafkaConnect).toList())
+            .orElse(List.of())
+    );
   }
 
   public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
-                                                     final String search) {
-    Mono<Flux<ConnectDTO>> clusters = Mono.just(Flux.fromIterable(getConnects(cluster))); // TODO get rid
-    return clusters
-        .flatMapMany(Function.identity())
-        .flatMap(connect -> getConnectorNames(cluster, connect.getName()))
+                                                     @Nullable final String search) {
+    return getConnects(cluster)
+        .flatMap(connect -> getConnectorNames(cluster, connect.getName()).map(cn -> Tuples.of(connect.getName(), cn)))
         .flatMap(pair -> getConnector(cluster, pair.getT1(), pair.getT2()))
         .flatMap(connector ->
             getConnectorConfig(cluster, connector.getConnect(), connector.getName())
@@ -99,56 +98,46 @@ public class KafkaConnectService {
         .filter(matchesSearchTerm(search));
   }
 
-  private Predicate<FullConnectorInfoDTO> matchesSearchTerm(final String search) {
-    return connector -> getSearchValues(connector)
-        .anyMatch(value -> value.contains(
-            StringUtils.defaultString(
-                    search,
-                    StringUtils.EMPTY)
-                .toUpperCase()));
+  private Predicate<FullConnectorInfoDTO> matchesSearchTerm(@Nullable final String search) {
+    if (search == null) {
+      return c -> true;
+    }
+    return connector -> getStringsForSearch(connector)
+        .anyMatch(string -> StringUtils.containsIgnoreCase(string, search));
   }
 
-  private Stream<String> getSearchValues(FullConnectorInfoDTO fullConnectorInfo) {
+  private Stream<String> getStringsForSearch(FullConnectorInfoDTO fullConnectorInfo) {
     return Stream.of(
-            fullConnectorInfo.getName(),
-            fullConnectorInfo.getStatus().getState().getValue(),
-            fullConnectorInfo.getType().getValue())
-        .map(String::toUpperCase);
+        fullConnectorInfo.getName(),
+        fullConnectorInfo.getStatus().getState().getValue(),
+        fullConnectorInfo.getType().getValue());
   }
 
-  private Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String connectClusterName,
-                                                   String connectorName) {
+  public Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String connectClusterName,
+                                                  String connectorName) {
     return api(cluster, connectClusterName)
         .mono(c -> c.getConnectorTopics(connectorName))
         .map(result -> result.get(connectorName))
-        // old connectors don't have this api, setting empty list for
+        // old Connect API versions don't have this endpoint, setting empty list for
         // backward-compatibility
         .onErrorResume(Exception.class, e -> Mono.just(new ConnectorTopics().topics(List.of())));
   }
 
-  private Flux<Tuple2<String, String>> getConnectorNames(KafkaCluster cluster, String connectName) {
-    return getConnectors(cluster, connectName)
-        .collectList().map(e -> e.get(0))
+  public Flux<String> getConnectorNames(KafkaCluster cluster, String connectName) {
+    return api(cluster, connectName)
+        .flux(client -> client.getConnectors(null))
         // for some reason `getConnectors` method returns the response as a single string
-        .map(this::parseToList)
-        .flatMapMany(Flux::fromIterable)
-        .map(connector -> Tuples.of(connectName, connector));
+        .collectList().map(e -> e.get(0))
+        .map(this::parseConnectorsNamesStringToList)
+        .flatMapMany(Flux::fromIterable);
   }
 
   @SneakyThrows
-  private List<String> parseToList(String json) {
+  private List<String> parseConnectorsNamesStringToList(String json) {
     return objectMapper.readValue(json, new TypeReference<>() {
     });
   }
 
-  public Flux<String> getConnectors(KafkaCluster cluster, String connectName) {
-    return api(cluster, connectName)
-        .flux(client ->
-            client.getConnectors(null)
-                .doOnError(e -> log.error("Unexpected error upon getting connectors", e))
-        );
-  }
-
   public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectName,
                                             Mono<NewConnectorDTO> connector) {
     return api(cluster, connectName)
@@ -171,9 +160,7 @@ public class KafkaConnectService {
   private Mono<Boolean> connectorExists(KafkaCluster cluster, String connectName,
                                         String connectorName) {
     return getConnectorNames(cluster, connectName)
-        .map(Tuple2::getT2)
-        .collectList()
-        .map(connectorNames -> connectorNames.contains(connectorName));
+        .any(name -> name.equals(connectorName));
   }
 
   public Mono<ConnectorDTO> getConnector(KafkaCluster cluster, String connectName,
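
The hunk above keeps the observation that the generated Connect client returns the connector list as one JSON string; parseConnectorsNamesStringToList simply deserializes it with Jackson. A minimal standalone sketch of that step (class name and sample payload are made up):

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;

class ConnectorNamesParsingSketch {
  public static void main(String[] args) throws Exception {
    // The Connect client hands back the whole response body as a single string,
    // e.g. "[\"my-sink\",\"my-source\"]", so it is parsed into a List<String>.
    ObjectMapper objectMapper = new ObjectMapper();
    List<String> names = objectMapper.readValue("[\"my-sink\",\"my-source\"]", new TypeReference<>() {
    });
    System.out.println(names); // [my-sink, my-source]
  }
}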

+ 167 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/ConnectorInfo.java

@@ -0,0 +1,167 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import com.provectus.kafka.ui.model.ConnectorTypeDTO;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
+import org.opendatadiscovery.oddrn.JdbcUrlParser;
+import org.opendatadiscovery.oddrn.model.HivePath;
+import org.opendatadiscovery.oddrn.model.MysqlPath;
+import org.opendatadiscovery.oddrn.model.PostgreSqlPath;
+import org.opendatadiscovery.oddrn.model.SnowflakePath;
+
+record ConnectorInfo(List<String> inputs,
+                     List<String> outputs) {
+
+  static ConnectorInfo extract(String className,
+                               ConnectorTypeDTO type,
+                               Map<String, Object> config,
+                               List<String> topicsFromApi, // can be empty for old Connect API versions
+                               Function<String, String> topicOddrnBuilder) {
+    return switch (className) {
+      case "org.apache.kafka.connect.file.FileStreamSinkConnector",
+          "org.apache.kafka.connect.file.FileStreamSourceConnector",
+          "FileStreamSource",
+          "FileStreamSink" -> extractFileIoConnector(type, topicsFromApi, config, topicOddrnBuilder);
+      case "io.confluent.connect.s3.S3SinkConnector" -> extractS3Sink(type, topicsFromApi, config, topicOddrnBuilder);
+      case "io.confluent.connect.jdbc.JdbcSinkConnector" ->
+          extractJdbcSink(type, topicsFromApi, config, topicOddrnBuilder);
+      case "io.debezium.connector.postgresql.PostgresConnector" -> extractDebeziumPg(config);
+      case "io.debezium.connector.mysql.MySqlConnector" -> extractDebeziumMysql(config);
+      default -> new ConnectorInfo(
+          extractInputs(type, topicsFromApi, config, topicOddrnBuilder),
+          extractOutputs(type, topicsFromApi, config, topicOddrnBuilder)
+      );
+    };
+  }
+
+  private static ConnectorInfo extractFileIoConnector(ConnectorTypeDTO type,
+                                                      List<String> topics,
+                                                      Map<String, Object> config,
+                                                      Function<String, String> topicOddrnBuilder) {
+    return new ConnectorInfo(
+        extractInputs(type, topics, config, topicOddrnBuilder),
+        extractOutputs(type, topics, config, topicOddrnBuilder)
+    );
+  }
+
+  private static ConnectorInfo extractJdbcSink(ConnectorTypeDTO type,
+                                               List<String> topics,
+                                               Map<String, Object> config,
+                                               Function<String, String> topicOddrnBuilder) {
+    String tableNameFormat = (String) config.getOrDefault("table.name.format", "${topic}");
+    List<String> targetTables = extractTopicNamesBestEffort(topics, config)
+        .map(topic -> tableNameFormat.replace("${kafka}", topic))
+        .toList();
+
+    String connectionUrl = (String) config.get("connection.url");
+    List<String> outputs = new ArrayList<>();
+    @Nullable var knownJdbcPath = new JdbcUrlParser().parse(connectionUrl);
+    if (knownJdbcPath instanceof PostgreSqlPath p) {
+      targetTables.forEach(t -> outputs.add(p.toBuilder().table(t).build().oddrn()));
+    }
+    if (knownJdbcPath instanceof MysqlPath p) {
+      targetTables.forEach(t -> outputs.add(p.toBuilder().table(t).build().oddrn()));
+    }
+    if (knownJdbcPath instanceof HivePath p) {
+      targetTables.forEach(t -> outputs.add(p.toBuilder().table(t).build().oddrn()));
+    }
+    if (knownJdbcPath instanceof SnowflakePath p) {
+      targetTables.forEach(t -> outputs.add(p.toBuilder().table(t).build().oddrn()));
+    }
+    return new ConnectorInfo(
+        extractInputs(type, topics, config, topicOddrnBuilder),
+        outputs
+    );
+  }
+
+  private static ConnectorInfo extractDebeziumPg(Map<String, Object> config) {
+    String host = (String) config.get("database.hostname");
+    String dbName = (String) config.get("database.dbname");
+    var inputs = List.of(
+        PostgreSqlPath.builder()
+            .host(host)
+            .database(dbName)
+            .build().oddrn()
+    );
+    return new ConnectorInfo(inputs, List.of());
+  }
+
+  private static ConnectorInfo extractDebeziumMysql(Map<String, Object> config) {
+    String host = (String) config.get("database.hostname");
+    var inputs = List.of(
+        MysqlPath.builder()
+            .host(host)
+            .build()
+            .oddrn()
+    );
+    return new ConnectorInfo(inputs, List.of());
+  }
+
+  private static ConnectorInfo extractS3Sink(ConnectorTypeDTO type,
+                                             List<String> topics,
+                                             Map<String, Object> config,
+                                             Function<String, String> topicOrrdnBuilder) {
+    String bucketName = (String) config.get("s3.bucket.name");
+    String topicsDir = (String) config.getOrDefault("topics.dir", "topics");
+    String directoryDelim = (String) config.getOrDefault("directory.delim", "/");
+    List<String> outputs = extractTopicNamesBestEffort(topics, config)
+        .map(topic -> Oddrn.awsS3Oddrn(bucketName, topicsDir + directoryDelim + topic))
+        .toList();
+    return new ConnectorInfo(
+        extractInputs(type, topics, config, topicOrrdnBuilder),
+        outputs
+    );
+  }
+
+  private static List<String> extractInputs(ConnectorTypeDTO type,
+                                            List<String> topicsFromApi,
+                                            Map<String, Object> config,
+                                            Function<String, String> topicOrrdnBuilder) {
+    return type == ConnectorTypeDTO.SINK
+        ? extractTopicsOddrns(config, topicsFromApi, topicOrrdnBuilder)
+        : List.of();
+  }
+
+  private static List<String> extractOutputs(ConnectorTypeDTO type,
+                                             List<String> topicsFromApi,
+                                             Map<String, Object> config,
+                                             Function<String, String> topicOrrdnBuilder) {
+    return type == ConnectorTypeDTO.SOURCE
+        ? extractTopicsOddrns(config, topicsFromApi, topicOrrdnBuilder)
+        : List.of();
+  }
+
+  private static Stream<String> extractTopicNamesBestEffort(
+      // topic list can be empty for old Connect API versions
+      List<String> topicsFromApi,
+      Map<String, Object> config
+  ) {
+    if (CollectionUtils.isNotEmpty(topicsFromApi)) {
+      return topicsFromApi.stream();
+    }
+
+    // trying to extract topic names from config
+    String topicsString = (String) config.get("topics");
+    String topicString = (String) config.get("topic");
+    return Stream.of(topicsString, topicString)
+        .filter(Objects::nonNull)
+        .flatMap(str -> Stream.of(str.split(",")))
+        .map(String::trim)
+        .filter(s -> !s.isBlank());
+  }
+
+  private static List<String> extractTopicsOddrns(Map<String, Object> config,
+                                                  List<String> topicsFromApi,
+                                                  Function<String, String> topicOrrdnBuilder) {
+    return extractTopicNamesBestEffort(topicsFromApi, config)
+        .map(topicOrrdnBuilder)
+        .toList();
+  }
+
+}
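
A hypothetical usage sketch for the class above (the config values and the topic-oddrn lambda are made up; it assumes same-package access, since ConnectorInfo is package-private):

package com.provectus.kafka.ui.service.integration.odd;

import com.provectus.kafka.ui.model.ConnectorTypeDTO;
import java.util.List;
import java.util.Map;

class ConnectorInfoUsageSketch {
  public static void main(String[] args) {
    Map<String, Object> config = Map.of(
        "connector.class", "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname", "pg.example.com",   // hypothetical values
        "database.dbname", "orders"
    );
    ConnectorInfo info = ConnectorInfo.extract(
        (String) config.get("connector.class"),
        ConnectorTypeDTO.SOURCE,
        config,
        List.of(),                               // topics endpoint empty, as with old Connect API versions
        topic -> "topic-oddrn-for-" + topic      // stand-in for Oddrn.topicOddrn(cluster, topic)
    );
    // For the Debezium Postgres branch, inputs hold a single PostgreSqlPath oddrn
    // built from hostname + dbname, and outputs stay empty (see extractDebeziumPg above).
    System.out.println(info.inputs());
    System.out.println(info.outputs());
  }
}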

+ 96 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporter.java

@@ -0,0 +1,96 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import com.provectus.kafka.ui.connect.model.ConnectorTopics;
+import com.provectus.kafka.ui.model.ConnectDTO;
+import com.provectus.kafka.ui.model.ConnectorDTO;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.service.KafkaConnectService;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.opendatadiscovery.client.model.DataEntity;
+import org.opendatadiscovery.client.model.DataEntityList;
+import org.opendatadiscovery.client.model.DataEntityType;
+import org.opendatadiscovery.client.model.DataSource;
+import org.opendatadiscovery.client.model.DataTransformer;
+import org.opendatadiscovery.client.model.MetadataExtension;
+import reactor.core.publisher.Flux;
+
+@RequiredArgsConstructor
+class ConnectorsExporter {
+
+  private final KafkaConnectService kafkaConnectService;
+
+  Flux<DataEntityList> export(KafkaCluster cluster) {
+    return kafkaConnectService.getConnects(cluster)
+        .flatMap(connect -> kafkaConnectService.getConnectorNames(cluster, connect.getName())
+            .flatMap(connectorName -> kafkaConnectService.getConnector(cluster, connect.getName(), connectorName))
+            .flatMap(connectorDTO ->
+                kafkaConnectService.getConnectorTopics(cluster, connect.getName(), connectorDTO.getName())
+                    .map(topics -> createConnectorDataEntity(cluster, connect, connectorDTO, topics)))
+            .buffer(100)
+            .map(connectDataEntities -> {
+              String dsOddrn = Oddrn.connectDataSourceOddrn(connect.getAddress());
+              return new DataEntityList()
+                  .dataSourceOddrn(dsOddrn)
+                  .items(connectDataEntities);
+            })
+        );
+  }
+
+  Flux<DataSource> getConnectDataSources(KafkaCluster cluster) {
+    return kafkaConnectService.getConnects(cluster)
+        .map(ConnectorsExporter::toDataSource);
+  }
+
+  private static DataSource toDataSource(ConnectDTO connect) {
+    return new DataSource()
+        .oddrn(Oddrn.connectDataSourceOddrn(connect.getAddress()))
+        .name(connect.getName())
+        .description("Kafka Connect");
+  }
+
+  private static DataEntity createConnectorDataEntity(KafkaCluster cluster,
+                                                      ConnectDTO connect,
+                                                      ConnectorDTO connector,
+                                                      ConnectorTopics connectorTopics) {
+    var metadata = new HashMap<>(extractMetadata(connector));
+    metadata.put("type", connector.getType().name());
+
+    var info = extractConnectorInfo(cluster, connector, connectorTopics);
+    DataTransformer transformer = new DataTransformer();
+    transformer.setInputs(info.inputs());
+    transformer.setOutputs(info.outputs());
+
+    return new DataEntity()
+        .oddrn(Oddrn.connectorOddrn(connect.getAddress(), connector.getName()))
+        .name(connector.getName())
+        .description("Kafka Connector \"%s\" (%s)".formatted(connector.getName(), connector.getType()))
+        .type(DataEntityType.JOB)
+        .dataTransformer(transformer)
+        .metadata(List.of(
+            new MetadataExtension()
+                .schemaUrl(URI.create("wontbeused.oops"))
+                .metadata(metadata)));
+  }
+
+  private static Map<String, Object> extractMetadata(ConnectorDTO connector) {
+    // will be sanitized by KafkaConfigSanitizer (if it's enabled)
+    return connector.getConfig();
+  }
+
+  private static ConnectorInfo extractConnectorInfo(KafkaCluster cluster,
+                                                    ConnectorDTO connector,
+                                                    ConnectorTopics topics) {
+    return ConnectorInfo.extract(
+        (String) connector.getConfig().get("connector.class"),
+        connector.getType(),
+        connector.getConfig(),
+        topics.getTopics(),
+        topic -> Oddrn.topicOddrn(cluster, topic)
+    );
+  }
+
+}

+ 106 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/OddExporter.java

@@ -0,0 +1,106 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.service.KafkaConnectService;
+import com.provectus.kafka.ui.service.StatisticsCache;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import lombok.SneakyThrows;
+import org.opendatadiscovery.client.ApiClient;
+import org.opendatadiscovery.client.api.OpenDataDiscoveryIngestionApi;
+import org.opendatadiscovery.client.model.DataEntity;
+import org.opendatadiscovery.client.model.DataEntityList;
+import org.opendatadiscovery.client.model.DataSource;
+import org.opendatadiscovery.client.model.DataSourceList;
+import org.springframework.http.HttpHeaders;
+import reactor.core.publisher.Mono;
+
+class OddExporter {
+
+  private final OpenDataDiscoveryIngestionApi oddApi;
+  private final TopicsExporter topicsExporter;
+  private final ConnectorsExporter connectorsExporter;
+
+  public OddExporter(StatisticsCache statisticsCache,
+                     KafkaConnectService connectService,
+                     OddIntegrationProperties oddIntegrationProperties) {
+    this(
+        createApiClient(oddIntegrationProperties),
+        new TopicsExporter(createTopicsFilter(oddIntegrationProperties), statisticsCache),
+        new ConnectorsExporter(connectService)
+    );
+  }
+
+  @VisibleForTesting
+  OddExporter(OpenDataDiscoveryIngestionApi oddApi,
+              TopicsExporter topicsExporter,
+              ConnectorsExporter connectorsExporter) {
+    this.oddApi = oddApi;
+    this.topicsExporter = topicsExporter;
+    this.connectorsExporter = connectorsExporter;
+  }
+
+  private static Predicate<String> createTopicsFilter(OddIntegrationProperties properties) {
+    if (properties.getTopicsRegex() == null) {
+      return topic -> !topic.startsWith("_");
+    }
+    Pattern pattern = Pattern.compile(properties.getTopicsRegex());
+    return topic -> pattern.matcher(topic).matches();
+  }
+
+  private static OpenDataDiscoveryIngestionApi createApiClient(OddIntegrationProperties properties) {
+    Preconditions.checkNotNull(properties.getUrl(), "ODD url not set");
+    Preconditions.checkNotNull(properties.getToken(), "ODD token not set");
+    var apiClient = new ApiClient()
+        .setBasePath(properties.getUrl())
+        .addDefaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + properties.getToken());
+    return new OpenDataDiscoveryIngestionApi(apiClient);
+  }
+
+  public Mono<Void> export(KafkaCluster cluster) {
+    return exportTopics(cluster)
+        .then(exportKafkaConnects(cluster));
+  }
+
+  private Mono<Void> exportTopics(KafkaCluster c) {
+    return createKafkaDataSource(c)
+        .thenMany(topicsExporter.export(c))
+        .concatMap(this::sentDataEntities)
+        .then();
+  }
+
+  private Mono<Void> exportKafkaConnects(KafkaCluster cluster) {
+    return createConnectDataSources(cluster)
+        .thenMany(connectorsExporter.export(cluster))
+        .concatMap(this::sentDataEntities)
+        .then();
+  }
+
+  private Mono<Void> createConnectDataSources(KafkaCluster cluster) {
+    return connectorsExporter.getConnectDataSources(cluster)
+        .buffer(100)
+        .concatMap(dataSources -> oddApi.createDataSource(new DataSourceList().items(dataSources)))
+        .then();
+  }
+
+  private Mono<Void> createKafkaDataSource(KafkaCluster cluster) {
+    String clusterOddrn = Oddrn.clusterOddrn(cluster);
+    return oddApi.createDataSource(
+        new DataSourceList()
+            .addItemsItem(
+                new DataSource()
+                    .oddrn(clusterOddrn)
+                    .name(cluster.getName())
+                    .description("Kafka cluster")
+            )
+    );
+  }
+
+  private Mono<Void> sentDataEntities(DataEntityList dataEntityList) {
+    return oddApi.postDataEntityList(dataEntityList);
+  }
+
+}

+ 27 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/OddExporterScheduler.java

@@ -0,0 +1,27 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import com.provectus.kafka.ui.service.ClustersStorage;
+import lombok.RequiredArgsConstructor;
+import org.springframework.scheduling.annotation.Scheduled;
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
+
+@RequiredArgsConstructor
+class OddExporterScheduler {
+
+  private final ClustersStorage clustersStorage;
+  private final OddExporter oddExporter;
+
+  @Scheduled(fixedRateString = "${kafka.send-stats-to-odd-millis:30000}")
+  public void sendMetricsToOdd() {
+    Flux.fromIterable(clustersStorage.getKafkaClusters())
+        .parallel()
+        .runOn(Schedulers.parallel())
+        .flatMap(oddExporter::export)
+        .then()
+        .block();
+  }
+
+
+}
+

+ 31 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/OddIntegrationConfig.java

@@ -0,0 +1,31 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import com.provectus.kafka.ui.service.ClustersStorage;
+import com.provectus.kafka.ui.service.KafkaConnectService;
+import com.provectus.kafka.ui.service.StatisticsCache;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@ConditionalOnProperty(value = "integration.odd.url")
+class OddIntegrationConfig {
+
+  @Bean
+  OddIntegrationProperties oddIntegrationProperties() {
+    return new OddIntegrationProperties();
+  }
+
+  @Bean
+  OddExporter oddExporter(StatisticsCache statisticsCache,
+                          KafkaConnectService connectService,
+                          OddIntegrationProperties oddIntegrationProperties) {
+    return new OddExporter(statisticsCache, connectService, oddIntegrationProperties);
+  }
+
+  @Bean
+  OddExporterScheduler oddExporterScheduler(ClustersStorage storage, OddExporter exporter) {
+    return new OddExporterScheduler(storage, exporter);
+  }
+
+}

+ 15 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/OddIntegrationProperties.java

@@ -0,0 +1,15 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@Data
+@ConfigurationProperties("integration.odd")
+public class OddIntegrationProperties {
+
+  String url;
+  String token;
+  String topicsRegex;
+
+}

+ 79 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/Oddrn.java

@@ -0,0 +1,79 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import com.provectus.kafka.ui.model.KafkaCluster;
+import java.net.URI;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import lombok.experimental.UtilityClass;
+import org.opendatadiscovery.oddrn.Generator;
+import org.opendatadiscovery.oddrn.model.AwsS3Path;
+import org.opendatadiscovery.oddrn.model.KafkaConnectorPath;
+import org.opendatadiscovery.oddrn.model.KafkaPath;
+
+@UtilityClass
+public class Oddrn {
+
+  private static final Generator GENERATOR = new Generator();
+
+  String clusterOddrn(KafkaCluster cluster) {
+    return KafkaPath.builder()
+        .cluster(bootstrapServersForOddrn(cluster.getBootstrapServers()))
+        .build()
+        .oddrn();
+  }
+
+  KafkaPath topicOddrnPath(KafkaCluster cluster, String topic) {
+    return KafkaPath.builder()
+        .cluster(bootstrapServersForOddrn(cluster.getBootstrapServers()))
+        .topic(topic)
+        .build();
+  }
+
+  String topicOddrn(KafkaCluster cluster, String topic) {
+    return topicOddrnPath(cluster, topic).oddrn();
+  }
+
+  String awsS3Oddrn(String bucket, String key) {
+    return AwsS3Path.builder()
+        .bucket(bucket)
+        .key(key)
+        .build()
+        .oddrn();
+  }
+
+  String connectDataSourceOddrn(String connectUrl) {
+    return KafkaConnectorPath.builder()
+        .host(normalizedConnectHosts(connectUrl))
+        .build()
+        .oddrn();
+  }
+
+  private String normalizedConnectHosts(String connectUrlStr) {
+    return Stream.of(connectUrlStr.split(","))
+        .map(String::trim)
+        .sorted()
+        .map(url -> {
+          var uri = URI.create(url);
+          String host = uri.getHost();
+          String portSuffix = (uri.getPort() > 0 ? (":" + uri.getPort()) : "");
+          return host + portSuffix;
+        })
+        .collect(Collectors.joining(","));
+  }
+
+  String connectorOddrn(String connectUrl, String connectorName) {
+    return KafkaConnectorPath.builder()
+        .host(normalizedConnectHosts(connectUrl))
+        .connector(connectorName)
+        .build()
+        .oddrn();
+  }
+
+  private String bootstrapServersForOddrn(String bootstrapServers) {
+    return Stream.of(bootstrapServers.split(","))
+        .map(String::trim)
+        .sorted()
+        .collect(Collectors.joining(","));
+  }
+
+}
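
A small illustration of the host normalization above (sketch only; it assumes same-package access and does not assert the exact oddrn layout, which comes from the oddrn-generator-java models):

package com.provectus.kafka.ui.service.integration.odd;

class OddrnNormalizationSketch {
  public static void main(String[] args) {
    // Comma-separated Connect addresses are trimmed, sorted and reduced to host:port,
    // so both orderings below yield the same data source oddrn.
    String a = Oddrn.connectDataSourceOddrn("http://connect-2:8083, http://connect-1:8083");
    String b = Oddrn.connectDataSourceOddrn("http://connect-1:8083,http://connect-2:8083");
    System.out.println(a.equals(b)); // true
  }
}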

+ 111 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java

@@ -0,0 +1,111 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import com.google.common.collect.ImmutableMap;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Statistics;
+import com.provectus.kafka.ui.service.StatisticsCache;
+import com.provectus.kafka.ui.service.integration.odd.schema.DataSetFieldsExtractors;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.opendatadiscovery.client.model.DataEntity;
+import org.opendatadiscovery.client.model.DataEntityList;
+import org.opendatadiscovery.client.model.DataEntityType;
+import org.opendatadiscovery.client.model.DataSet;
+import org.opendatadiscovery.client.model.DataSetField;
+import org.opendatadiscovery.client.model.MetadataExtension;
+import org.opendatadiscovery.oddrn.model.KafkaPath;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@Slf4j
+@RequiredArgsConstructor
+class TopicsExporter {
+
+  private final Predicate<String> topicFilter;
+  private final StatisticsCache statisticsCache;
+
+  Flux<DataEntityList> export(KafkaCluster cluster) {
+    String clusterOddrn = Oddrn.clusterOddrn(cluster);
+    Statistics stats = statisticsCache.get(cluster);
+    return Flux.fromIterable(stats.getTopicDescriptions().keySet())
+        .filter(topicFilter)
+        .flatMap(topic -> createTopicDataEntity(cluster, topic, stats))
+        .buffer(100)
+        .map(topicsEntities ->
+            new DataEntityList()
+                .dataSourceOddrn(clusterOddrn)
+                .items(topicsEntities));
+  }
+
+  private Mono<DataEntity> createTopicDataEntity(KafkaCluster cluster, String topic, Statistics stats) {
+    KafkaPath topicOddrnPath = Oddrn.topicOddrnPath(cluster, topic);
+    return
+        Mono.zip(
+                getTopicSchema(cluster, topic, topicOddrnPath, true),
+                getTopicSchema(cluster, topic, topicOddrnPath, false)
+            )
+            .map(keyValueFields -> {
+                  var dataset = new DataSet();
+                  keyValueFields.getT1().forEach(dataset::addFieldListItem);
+                  keyValueFields.getT2().forEach(dataset::addFieldListItem);
+                  return new DataEntity()
+                      .name(topic)
+                      .description("Kafka topic \"%s\"".formatted(topic))
+                      .oddrn(Oddrn.topicOddrn(cluster, topic))
+                      .type(DataEntityType.KAFKA_TOPIC)
+                      .dataset(dataset)
+                      .addMetadataItem(
+                          new MetadataExtension()
+                              .schemaUrl(URI.create("wontbeused.oops"))
+                              .metadata(getTopicMetadata(topic, stats)));
+                }
+            );
+  }
+
+  private Map<String, Object> getNonDefaultConfigs(String topic, Statistics stats) {
+    List<ConfigEntry> config = stats.getTopicConfigs().get(topic);
+    if (config == null) {
+      return Map.of();
+    }
+    return config.stream()
+        .filter(c -> c.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
+        .collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value));
+  }
+
+  private Map<String, Object> getTopicMetadata(String topic, Statistics stats) {
+    TopicDescription topicDescription = stats.getTopicDescriptions().get(topic);
+    return ImmutableMap.<String, Object>builder()
+        .put("partitions", topicDescription.partitions().size())
+        .put("replication_factor", topicDescription.partitions().get(0).replicas().size())
+        .putAll(getNonDefaultConfigs(topic, stats))
+        .build();
+  }
+
+  private Mono<List<DataSetField>> getTopicSchema(KafkaCluster cluster,
+                                                  String topic,
+                                                  KafkaPath topicOddrn,
+                                                  //currently we only retrieve value schema
+                                                  boolean isKey) {
+    if (cluster.getSchemaRegistryClient() == null) {
+      return Mono.just(List.of());
+    }
+    String subject = topic + (isKey ? "-key" : "-value");
+    return cluster.getSchemaRegistryClient()
+        .mono(client -> client.getSubjectVersion(subject, "latest"))
+        .map(subj -> DataSetFieldsExtractors.extract(subj, topicOddrn, isKey))
+        .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.just(List.of()))
+        .onErrorResume(th -> true, th -> {
+          log.warn("Error retrieving subject {} for cluster {}", subject, cluster.getName(), th);
+          return Mono.just(List.of());
+        });
+  }
+
+}

+ 262 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractor.java

@@ -0,0 +1,262 @@
+package com.provectus.kafka.ui.service.integration.odd.schema;
+
+import com.google.common.collect.ImmutableSet;
+import com.provectus.kafka.ui.service.integration.odd.Oddrn;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.experimental.UtilityClass;
+import org.apache.avro.Schema;
+import org.opendatadiscovery.client.model.DataSetField;
+import org.opendatadiscovery.client.model.DataSetFieldType;
+import org.opendatadiscovery.oddrn.model.KafkaPath;
+
+@UtilityClass
+class AvroExtractor {
+
+  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
+    var schema = new Schema.Parser().parse(subject.getSchema());
+    List<DataSetField> result = new ArrayList<>();
+    result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
+    extract(
+        schema,
+        topicOddrn.oddrn() + "/columns/" + (isKey ? "key" : "value"),
+        null,
+        null,
+        null,
+        false,
+        ImmutableSet.of(),
+        result
+    );
+    return result;
+  }
+
+  private void extract(Schema schema,
+                       String parentOddr,
+                       String oddrn, //null for root
+                       String name,
+                       String doc,
+                       Boolean nullable,
+                       ImmutableSet<String> registeredRecords,
+                       List<DataSetField> sink
+  ) {
+    switch (schema.getType()) {
+      case RECORD -> extractRecord(schema, parentOddr, oddrn, name, doc, nullable, registeredRecords, sink);
+      case UNION -> extractUnion(schema, parentOddr, oddrn, name, doc, registeredRecords, sink);
+      case ARRAY -> extractArray(schema, parentOddr, oddrn, name, doc, nullable, registeredRecords, sink);
+      case MAP -> extractMap(schema, parentOddr, oddrn, name, doc, nullable, registeredRecords, sink);
+      default -> extractPrimitive(schema, parentOddr, oddrn, name, doc, nullable, sink);
+    }
+  }
+
+  private DataSetField createDataSetField(String name,
+                                          String doc,
+                                          String parentOddrn,
+                                          String oddrn,
+                                          Schema schema,
+                                          Boolean nullable) {
+    return new DataSetField()
+        .name(name)
+        .description(doc)
+        .parentFieldOddrn(parentOddrn)
+        .oddrn(oddrn)
+        .type(mapSchema(schema, nullable));
+  }
+
+  private void extractRecord(Schema schema,
+                             String parentOddr,
+                             String oddrn, //null for root
+                             String name,
+                             String doc,
+                             Boolean nullable,
+                             ImmutableSet<String> registeredRecords,
+                             List<DataSetField> sink) {
+    boolean isRoot = oddrn == null;
+    if (!isRoot) {
+      sink.add(createDataSetField(name, doc, parentOddr, oddrn, schema, nullable));
+      if (registeredRecords.contains(schema.getFullName())) {
+        // avoiding recursion by checking if record already registered in parsing chain
+        return;
+      }
+    }
+    var newRegisteredRecords = ImmutableSet.<String>builder()
+        .addAll(registeredRecords)
+        .add(schema.getFullName())
+        .build();
+
+    schema.getFields().forEach(f ->
+        extract(
+            f.schema(),
+            isRoot ? parentOddr : oddrn,
+            isRoot
+                ? parentOddr + "/" + f.name()
+                : oddrn + "/fields/" + f.name(),
+            f.name(),
+            f.doc(),
+            false,
+            newRegisteredRecords,
+            sink
+        ));
+  }
+
+  private void extractUnion(Schema schema,
+                            String parentOddr,
+                            String oddrn, //null for root
+                            String name,
+                            String doc,
+                            ImmutableSet<String> registeredRecords,
+                            List<DataSetField> sink) {
+    boolean isRoot = oddrn == null;
+    boolean containsNull = schema.getTypes().stream().map(Schema::getType).anyMatch(t -> t == Schema.Type.NULL);
+    // if this is not the root field and the union has only two branches (null and something else),
+    // we register this field as optional without mentioning the union
+    if (!isRoot && containsNull && schema.getTypes().size() == 2) {
+      var nonNullSchema = schema.getTypes().stream()
+          .filter(s -> s.getType() != Schema.Type.NULL)
+          .findFirst()
+          .orElseThrow(IllegalStateException::new);
+      extract(
+          nonNullSchema,
+          parentOddr,
+          oddrn,
+          name,
+          doc,
+          true,
+          registeredRecords,
+          sink
+      );
+      return;
+    }
+    oddrn = isRoot ? parentOddr + "/union" : oddrn;
+    if (isRoot) {
+      sink.add(createDataSetField("Avro root union", doc, parentOddr, oddrn, schema, containsNull));
+    } else {
+      sink.add(createDataSetField(name, doc, parentOddr, oddrn, schema, containsNull));
+    }
+    for (Schema t : schema.getTypes()) {
+      if (t.getType() != Schema.Type.NULL) {
+        extract(
+            t,
+            oddrn,
+            oddrn + "/values/" + t.getName(),
+            t.getName(),
+            t.getDoc(),
+            containsNull,
+            registeredRecords,
+            sink
+        );
+      }
+    }
+  }
+
+  private void extractArray(Schema schema,
+                            String parentOddr,
+                            String oddrn, //null for root
+                            String name,
+                            String doc,
+                            Boolean nullable,
+                            ImmutableSet<String> registeredRecords,
+                            List<DataSetField> sink) {
+    boolean isRoot = oddrn == null;
+    oddrn = isRoot ? parentOddr + "/array" : oddrn;
+    if (isRoot) {
+      sink.add(createDataSetField("Avro root Array", doc, parentOddr, oddrn, schema, nullable));
+    } else {
+      sink.add(createDataSetField(name, doc, parentOddr, oddrn, schema, nullable));
+    }
+    extract(
+        schema.getElementType(),
+        oddrn,
+        oddrn + "/items/" + schema.getElementType().getName(),
+        schema.getElementType().getName(),
+        schema.getElementType().getDoc(),
+        false,
+        registeredRecords,
+        sink
+    );
+  }
+
+  private void extractMap(Schema schema,
+                          String parentOddr,
+                          String oddrn, //null for root
+                          String name,
+                          String doc,
+                          Boolean nullable,
+                          ImmutableSet<String> registeredRecords,
+                          List<DataSetField> sink) {
+    boolean isRoot = oddrn == null;
+    oddrn = isRoot ? parentOddr + "/map" : oddrn;
+    if (isRoot) {
+      sink.add(createDataSetField("Avro root map", doc, parentOddr, oddrn, schema, nullable));
+    } else {
+      sink.add(createDataSetField(name, doc, parentOddr, oddrn, schema, nullable));
+    }
+    extract(
+        new Schema.Parser().parse("\"string\""),
+        oddrn,
+        oddrn + "/key",
+        "key",
+        null,
+        nullable,
+        registeredRecords,
+        sink
+    );
+    extract(
+        schema.getValueType(),
+        oddrn,
+        oddrn + "/value",
+        "value",
+        null,
+        nullable,
+        registeredRecords,
+        sink
+    );
+  }
+
+
+  private void extractPrimitive(Schema schema,
+                                String parentOddr,
+                                String oddrn, //null for root
+                                String name,
+                                String doc,
+                                Boolean nullable,
+                                List<DataSetField> sink) {
+    boolean isRoot = oddrn == null;
+    String primOddrn = isRoot ? (parentOddr + "/" + schema.getType()) : oddrn;
+    if (isRoot) {
+      sink.add(createDataSetField("Root avro " + schema.getType(),
+          doc, parentOddr, primOddrn, schema, nullable));
+    } else {
+      sink.add(createDataSetField(name, doc, parentOddr, primOddrn, schema, nullable));
+    }
+  }
+
+  private DataSetFieldType.TypeEnum mapType(Schema.Type type) {
+    return switch (type) {
+      case INT, LONG -> DataSetFieldType.TypeEnum.INTEGER;
+      case FLOAT, DOUBLE, FIXED -> DataSetFieldType.TypeEnum.NUMBER;
+      case STRING, ENUM -> DataSetFieldType.TypeEnum.STRING;
+      case BOOLEAN -> DataSetFieldType.TypeEnum.BOOLEAN;
+      case BYTES -> DataSetFieldType.TypeEnum.BINARY;
+      case ARRAY -> DataSetFieldType.TypeEnum.LIST;
+      case RECORD -> DataSetFieldType.TypeEnum.STRUCT;
+      case MAP -> DataSetFieldType.TypeEnum.MAP;
+      case UNION -> DataSetFieldType.TypeEnum.UNION;
+      case NULL -> DataSetFieldType.TypeEnum.UNKNOWN;
+    };
+  }
+
+  private DataSetFieldType mapSchema(Schema schema, Boolean nullable) {
+    return new DataSetFieldType()
+        .logicalType(logicalType(schema))
+        .isNullable(nullable)
+        .type(mapType(schema.getType()));
+  }
+
+  private String logicalType(Schema schema) {
+    return schema.getType() == Schema.Type.RECORD
+        ? schema.getFullName()
+        : schema.getType().toString().toLowerCase();
+  }
+
+}

+ 38 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/DataSetFieldsExtractors.java

@@ -0,0 +1,38 @@
+package com.provectus.kafka.ui.service.integration.odd.schema;
+
+import com.provectus.kafka.ui.service.integration.odd.Oddrn;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import com.provectus.kafka.ui.sr.model.SchemaType;
+import java.util.List;
+import java.util.Optional;
+import lombok.experimental.UtilityClass;
+import org.opendatadiscovery.client.model.DataSetField;
+import org.opendatadiscovery.client.model.DataSetFieldType;
+import org.opendatadiscovery.oddrn.model.KafkaPath;
+
+@UtilityClass
+public class DataSetFieldsExtractors {
+
+  public List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
+    SchemaType schemaType = Optional.ofNullable(subject.getSchemaType()).orElse(SchemaType.AVRO);
+    return switch (schemaType) {
+      case AVRO -> AvroExtractor.extract(subject, topicOddrn, isKey);
+      case JSON -> JsonSchemaExtractor.extract(subject, topicOddrn, isKey);
+      case PROTOBUF -> ProtoExtractor.extract(subject, topicOddrn, isKey);
+    };
+  }
+
+
+  DataSetField rootField(KafkaPath topicOddrn, boolean isKey) {
+    var rootOddrn = topicOddrn.oddrn() + "/columns/" + (isKey ? "key" : "value");
+    return new DataSetField()
+        .name(isKey ? "key" : "value")
+        .description("Topic's " + (isKey ? "key" : "value") + " schema")
+        .parentFieldOddrn(topicOddrn.oddrn())
+        .oddrn(rootOddrn)
+        .type(new DataSetFieldType()
+            .type(DataSetFieldType.TypeEnum.STRUCT)
+            .isNullable(true));
+  }
+
+}
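
A hedged sketch of how the dispatcher above is driven (it assumes the generated SchemaSubject and DataSetField models expose the usual fluent setters/getters; the schema and cluster values are made up):

package com.provectus.kafka.ui.service.integration.odd.schema;

import com.provectus.kafka.ui.sr.model.SchemaSubject;
import com.provectus.kafka.ui.sr.model.SchemaType;
import org.opendatadiscovery.oddrn.model.KafkaPath;

class DataSetFieldsExtractorsSketch {
  public static void main(String[] args) {
    SchemaSubject subject = new SchemaSubject()      // assumption: fluent setters on the generated model
        .schemaType(SchemaType.AVRO)
        .schema("{\"type\":\"record\",\"name\":\"Order\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"long\"},"
            + "{\"name\":\"note\",\"type\":[\"null\",\"string\"]}]}");

    KafkaPath topicOddrn = KafkaPath.builder()
        .cluster("broker-1:9092")
        .topic("orders")
        .build();

    // Yields the root "value" field plus one DataSetField per Avro field;
    // the ["null","string"] union collapses into a single nullable string field
    // (see AvroExtractor.extractUnion above).
    DataSetFieldsExtractors.extract(subject, topicOddrn, false)
        .forEach(f -> System.out.println(f.getName() + " -> " + f.getType().getType()));
  }
}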

+ 311 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractor.java

@@ -0,0 +1,311 @@
+package com.provectus.kafka.ui.service.integration.odd.schema;
+
+import com.google.common.collect.ImmutableSet;
+import com.provectus.kafka.ui.service.integration.odd.Oddrn;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import lombok.experimental.UtilityClass;
+import org.everit.json.schema.ArraySchema;
+import org.everit.json.schema.BooleanSchema;
+import org.everit.json.schema.CombinedSchema;
+import org.everit.json.schema.FalseSchema;
+import org.everit.json.schema.NullSchema;
+import org.everit.json.schema.NumberSchema;
+import org.everit.json.schema.ObjectSchema;
+import org.everit.json.schema.ReferenceSchema;
+import org.everit.json.schema.Schema;
+import org.everit.json.schema.StringSchema;
+import org.everit.json.schema.TrueSchema;
+import org.opendatadiscovery.client.model.DataSetField;
+import org.opendatadiscovery.client.model.DataSetFieldType;
+import org.opendatadiscovery.client.model.MetadataExtension;
+import org.opendatadiscovery.oddrn.model.KafkaPath;
+
+@UtilityClass
+class JsonSchemaExtractor {
+
+  static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
+    Schema schema = new JsonSchema(subject.getSchema()).rawSchema();
+    List<DataSetField> result = new ArrayList<>();
+    result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
+    extract(
+        schema,
+        topicOddrn.oddrn() + "/columns/" + (isKey ? "key" : "value"),
+        null,
+        null,
+        null,
+        ImmutableSet.of(),
+        result
+    );
+    return result;
+  }
+
+  private void extract(Schema schema,
+                       String parentOddr,
+                       String oddrn, //null for root
+                       String name,
+                       Boolean nullable,
+                       ImmutableSet<String> registeredRecords,
+                       List<DataSetField> sink) {
+    if (schema instanceof ReferenceSchema s) {
+      Optional.ofNullable(s.getReferredSchema())
+          .ifPresent(refSchema -> extract(refSchema, parentOddr, oddrn, name, nullable, registeredRecords, sink));
+    } else if (schema instanceof ObjectSchema s) {
+      extractObject(s, parentOddr, oddrn, name, nullable, registeredRecords, sink);
+    } else if (schema instanceof ArraySchema s) {
+      extractArray(s, parentOddr, oddrn, name, nullable, registeredRecords, sink);
+    } else if (schema instanceof CombinedSchema cs) {
+      extractCombined(cs, parentOddr, oddrn, name, nullable, registeredRecords, sink);
+    } else if (schema instanceof BooleanSchema
+        || schema instanceof NumberSchema
+        || schema instanceof StringSchema
+        || schema instanceof NullSchema
+    ) {
+      extractPrimitive(schema, parentOddr, oddrn, name, nullable, sink);
+    } else {
+      extractUnknown(schema, parentOddr, oddrn, name, nullable, sink);
+    }
+  }
+
+  private void extractPrimitive(Schema schema,
+                                String parentOddr,
+                                String oddrn, //null for root
+                                String name,
+                                Boolean nullable,
+                                List<DataSetField> sink) {
+    boolean isRoot = oddrn == null;
+    sink.add(
+        createDataSetField(
+            schema,
+            isRoot ? "Root JSON primitive" : name,
+            parentOddr,
+            isRoot ? (parentOddr + "/" + logicalTypeName(schema)) : oddrn,
+            mapType(schema),
+            logicalTypeName(schema),
+            nullable
+        )
+    );
+  }
+
+  private void extractUnknown(Schema schema,
+                              String parentOddr,
+                              String oddrn, //null for root
+                              String name,
+                              Boolean nullable,
+                              List<DataSetField> sink) {
+    boolean isRoot = oddrn == null;
+    sink.add(
+        createDataSetField(
+            schema,
+            isRoot ? "Root type " + logicalTypeName(schema) : name,
+            parentOddr,
+            isRoot ? (parentOddr + "/" + logicalTypeName(schema)) : oddrn,
+            DataSetFieldType.TypeEnum.UNKNOWN,
+            logicalTypeName(schema),
+            nullable
+        )
+    );
+  }
+
+  private void extractObject(ObjectSchema schema,
+                             String parentOddr,
+                             String oddrn, //null for root
+                             String name,
+                             Boolean nullable,
+                             ImmutableSet<String> registeredRecords,
+                             List<DataSetField> sink) {
+    boolean isRoot = oddrn == null;
+    // schemaLocation can be null for empty object schemas (like if it used in anyOf)
+    @Nullable var schemaLocation = schema.getSchemaLocation();
+    if (!isRoot) {
+      sink.add(createDataSetField(
+          schema,
+          name,
+          parentOddr,
+          oddrn,
+          DataSetFieldType.TypeEnum.STRUCT,
+          logicalTypeName(schema),
+          nullable
+      ));
+      if (schemaLocation != null && registeredRecords.contains(schemaLocation)) {
+        // avoiding recursion by checking if record already registered in parsing chain
+        return;
+      }
+    }
+
+    var newRegisteredRecords = schemaLocation == null
+        ? registeredRecords
+        : ImmutableSet.<String>builder()
+        .addAll(registeredRecords)
+        .add(schemaLocation)
+        .build();
+
+    schema.getPropertySchemas().forEach((propertyName, propertySchema) -> {
+      boolean required = schema.getRequiredProperties().contains(propertyName);
+      extract(
+          propertySchema,
+          isRoot ? parentOddr : oddrn,
+          isRoot
+              ? parentOddr + "/" + propertyName
+              : oddrn + "/fields/" + propertyName,
+          propertyName,
+          !required,
+          newRegisteredRecords,
+          sink
+      );
+    });
+  }
+
+  private void extractArray(ArraySchema schema,
+                            String parentOddr,
+                            String oddrn, //null for root
+                            String name,
+                            Boolean nullable,
+                            ImmutableSet<String> registeredRecords,
+                            List<DataSetField> sink) {
+    boolean isRoot = oddrn == null;
+    oddrn = isRoot ? parentOddr + "/array" : oddrn;
+    if (isRoot) {
+      sink.add(
+          createDataSetField(
+              schema,
+              "Json array root",
+              parentOddr,
+              oddrn,
+              DataSetFieldType.TypeEnum.LIST,
+              "array",
+              nullable
+          ));
+    } else {
+      sink.add(
+          createDataSetField(
+              schema,
+              name,
+              parentOddr,
+              oddrn,
+              DataSetFieldType.TypeEnum.LIST,
+              "array",
+              nullable
+          ));
+    }
+    @Nullable var itemsSchema = schema.getAllItemSchema();
+    if (itemsSchema != null) {
+      extract(
+          itemsSchema,
+          oddrn,
+          oddrn + "/items/" + logicalTypeName(itemsSchema),
+          logicalTypeName(itemsSchema),
+          false,
+          registeredRecords,
+          sink
+      );
+    }
+  }
+
+  private void extractCombined(CombinedSchema schema,
+                               String parentOddr,
+                               String oddrn, //null for root
+                               String name,
+                               Boolean nullable,
+                               ImmutableSet<String> registeredRecords,
+                               List<DataSetField> sink) {
+    String combineType = "unknown";
+    if (schema.getCriterion() == CombinedSchema.ALL_CRITERION) {
+      combineType = "allOf";
+    }
+    if (schema.getCriterion() == CombinedSchema.ANY_CRITERION) {
+      combineType = "anyOf";
+    }
+    if (schema.getCriterion() == CombinedSchema.ONE_CRITERION) {
+      combineType = "oneOf";
+    }
+
+    boolean isRoot = oddrn == null;
+    oddrn = isRoot ? (parentOddr + "/" + combineType) : (oddrn + "/" + combineType);
+    sink.add(
+        createDataSetField(
+            schema,
+            isRoot ? "Root %s".formatted(combineType) : name,
+            parentOddr,
+            oddrn,
+            DataSetFieldType.TypeEnum.UNION,
+            combineType,
+            nullable
+        ).addMetadataItem(new MetadataExtension()
+            .schemaUrl(URI.create("wontbeused.oops"))
+            .metadata(Map.of("criterion", combineType)))
+    );
+
+    for (Schema subschema : schema.getSubschemas()) {
+      extract(
+          subschema,
+          oddrn,
+          oddrn + "/values/" + logicalTypeName(subschema),
+          logicalTypeName(subschema),
+          nullable,
+          registeredRecords,
+          sink
+      );
+    }
+  }
+
+  private String getDescription(Schema schema) {
+    return Optional.ofNullable(schema.getTitle())
+        .orElse(schema.getDescription());
+  }
+
+  private String logicalTypeName(Schema schema) {
+    return schema.getClass()
+        .getSimpleName()
+        .replace("Schema", "");
+  }
+
+  private DataSetField createDataSetField(Schema schema,
+                                          String name,
+                                          String parentOddrn,
+                                          String oddrn,
+                                          DataSetFieldType.TypeEnum type,
+                                          String logicalType,
+                                          Boolean nullable) {
+    return new DataSetField()
+        .name(name)
+        .parentFieldOddrn(parentOddrn)
+        .oddrn(oddrn)
+        .description(getDescription(schema))
+        .type(
+            new DataSetFieldType()
+                .isNullable(nullable)
+                .logicalType(logicalType)
+                .type(type)
+        );
+  }
+
+  private DataSetFieldType.TypeEnum mapType(Schema type) {
+    if (type instanceof NumberSchema) {
+      return DataSetFieldType.TypeEnum.NUMBER;
+    }
+    if (type instanceof StringSchema) {
+      return DataSetFieldType.TypeEnum.STRING;
+    }
+    if (type instanceof BooleanSchema || type instanceof TrueSchema || type instanceof FalseSchema) {
+      return DataSetFieldType.TypeEnum.BOOLEAN;
+    }
+    if (type instanceof ObjectSchema) {
+      return DataSetFieldType.TypeEnum.STRUCT;
+    }
+    if (type instanceof ReferenceSchema s) {
+      return mapType(s.getReferredSchema());
+    }
+    if (type instanceof CombinedSchema) {
+      return DataSetFieldType.TypeEnum.UNION;
+    }
+    return DataSetFieldType.TypeEnum.UNKNOWN;
+  }
+
+}

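Note on the ReferenceSchema branch of mapType above: a minimal sketch, assuming the org.everit.json.schema loader API that these Schema classes come from (the schema text and property name are illustrative only), showing why the reference is resolved before mapping:

import org.everit.json.schema.ObjectSchema;
import org.everit.json.schema.ReferenceSchema;
import org.everit.json.schema.Schema;
import org.everit.json.schema.loader.SchemaLoader;
import org.json.JSONObject;

Schema parsed = SchemaLoader.load(new JSONObject("""
    {
      "type": "object",
      "properties": { "n": { "$ref": "#/definitions/num" } },
      "definitions": { "num": { "type": "number" } }
    }
    """));
// the "n" property parses to a ReferenceSchema wrapping a NumberSchema, so mapType
// follows getReferredSchema() and reports NUMBER instead of UNKNOWN for such fields
Schema n = ((ObjectSchema) parsed).getPropertySchemas().get("n");
boolean isRef = n instanceof ReferenceSchema; // true
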
+ 230 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractor.java

@@ -0,0 +1,230 @@
+package com.provectus.kafka.ui.service.integration.odd.schema;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.Duration;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.StringValue;
+import com.google.protobuf.Timestamp;
+import com.google.protobuf.UInt32Value;
+import com.google.protobuf.UInt64Value;
+import com.google.protobuf.Value;
+import com.provectus.kafka.ui.service.integration.odd.Oddrn;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import lombok.experimental.UtilityClass;
+import org.opendatadiscovery.client.model.DataSetField;
+import org.opendatadiscovery.client.model.DataSetFieldType;
+import org.opendatadiscovery.client.model.DataSetFieldType.TypeEnum;
+import org.opendatadiscovery.oddrn.model.KafkaPath;
+
+@UtilityClass
+class ProtoExtractor {
+
+  private static final Set<String> PRIMITIVES_WRAPPER_TYPE_NAMES = Set.of(
+      BoolValue.getDescriptor().getFullName(),
+      Int32Value.getDescriptor().getFullName(),
+      UInt32Value.getDescriptor().getFullName(),
+      Int64Value.getDescriptor().getFullName(),
+      UInt64Value.getDescriptor().getFullName(),
+      StringValue.getDescriptor().getFullName(),
+      BytesValue.getDescriptor().getFullName(),
+      FloatValue.getDescriptor().getFullName(),
+      DoubleValue.getDescriptor().getFullName()
+  );
+
+  List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
+    Descriptor schema = new ProtobufSchema(subject.getSchema()).toDescriptor();
+    List<DataSetField> result = new ArrayList<>();
+    result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
+    var rootOddrn = topicOddrn.oddrn() + "/columns/" + (isKey ? "key" : "value");
+    schema.getFields().forEach(f ->
+        extract(f,
+            rootOddrn,
+            rootOddrn + "/" + f.getName(),
+            f.getName(),
+            !f.isRequired(),
+            f.isRepeated(),
+            ImmutableSet.of(schema.getFullName()),
+            result
+        ));
+    return result;
+  }
+
+  private void extract(Descriptors.FieldDescriptor field,
+                       String parentOddr,
+                       String oddrn, //null for root
+                       String name,
+                       boolean nullable,
+                       boolean repeated,
+                       ImmutableSet<String> registeredRecords,
+                       List<DataSetField> sink) {
+    if (repeated) {
+      extractRepeated(field, parentOddr, oddrn, name, nullable, registeredRecords, sink);
+    } else if (field.getType() == Descriptors.FieldDescriptor.Type.MESSAGE) {
+      extractMessage(field, parentOddr, oddrn, name, nullable, registeredRecords, sink);
+    } else {
+      extractPrimitive(field, parentOddr, oddrn, name, nullable, sink);
+    }
+  }
+
+  // converts some (not all) Protobuf well-known types (from the google.protobuf.* packages);
+  // see JsonFormat::buildWellKnownTypePrinters for implementation details
+  private boolean extractProtoWellKnownType(Descriptors.FieldDescriptor field,
+                                            String parentOddr,
+                                            String oddrn, //null for root
+                                            String name,
+                                            boolean nullable,
+                                            List<DataSetField> sink) {
+    // all well-known types are messages
+    if (field.getType() != Descriptors.FieldDescriptor.Type.MESSAGE) {
+      return false;
+    }
+    String typeName = field.getMessageType().getFullName();
+    if (typeName.equals(Timestamp.getDescriptor().getFullName())) {
+      sink.add(createDataSetField(name, parentOddr, oddrn, TypeEnum.DATETIME, typeName, nullable));
+      return true;
+    }
+    if (typeName.equals(Duration.getDescriptor().getFullName())) {
+      sink.add(createDataSetField(name, parentOddr, oddrn, TypeEnum.DURATION, typeName, nullable));
+      return true;
+    }
+    if (typeName.equals(Value.getDescriptor().getFullName())) {
+      // TODO: use ANY type once it appears in ODD
+      sink.add(createDataSetField(name, parentOddr, oddrn, TypeEnum.UNKNOWN, typeName, nullable));
+      return true;
+    }
+    if (PRIMITIVES_WRAPPER_TYPE_NAMES.contains(typeName)) {
+      var wrapped = field.getMessageType().findFieldByName("value");
+      sink.add(createDataSetField(name, parentOddr, oddrn, mapType(wrapped.getType()), typeName, true));
+      return true;
+    }
+    return false;
+  }
+
+  private void extractRepeated(Descriptors.FieldDescriptor field,
+                               String parentOddr,
+                               String oddrn, //null for root
+                               String name,
+                               boolean nullable,
+                               ImmutableSet<String> registeredRecords,
+                               List<DataSetField> sink) {
+    sink.add(createDataSetField(name, parentOddr, oddrn, TypeEnum.LIST, "repeated", nullable));
+
+    String itemName = field.getType() == Descriptors.FieldDescriptor.Type.MESSAGE
+        ? field.getMessageType().getName()
+        : field.getType().name().toLowerCase();
+
+    extract(
+        field,
+        oddrn,
+        oddrn + "/items/" + itemName,
+        itemName,
+        nullable,
+        false,
+        registeredRecords,
+        sink
+    );
+  }
+
+  private void extractMessage(Descriptors.FieldDescriptor field,
+                              String parentOddr,
+                              String oddrn, //null for root
+                              String name,
+                              boolean nullable,
+                              ImmutableSet<String> registeredRecords,
+                              List<DataSetField> sink) {
+    if (extractProtoWellKnownType(field, parentOddr, oddrn, name, nullable, sink)) {
+      return;
+    }
+    sink.add(createDataSetField(name, parentOddr, oddrn, TypeEnum.STRUCT, getLogicalTypeName(field), nullable));
+
+    String msgTypeName = field.getMessageType().getFullName();
+    if (registeredRecords.contains(msgTypeName)) {
+      // avoid recursion: stop if this record is already registered in the parsing chain
+      return;
+    }
+    var newRegisteredRecords = ImmutableSet.<String>builder()
+        .addAll(registeredRecords)
+        .add(msgTypeName)
+        .build();
+
+    field.getMessageType()
+        .getFields()
+        .forEach(f -> {
+          extract(f,
+              oddrn,
+              oddrn + "/fields/" + f.getName(),
+              f.getName(),
+              !f.isRequired(),
+              f.isRepeated(),
+              newRegisteredRecords,
+              sink
+          );
+        });
+  }
+
+  private void extractPrimitive(Descriptors.FieldDescriptor field,
+                                String parentOddr,
+                                String oddrn,
+                                String name,
+                                boolean nullable,
+                                List<DataSetField> sink) {
+    sink.add(
+        createDataSetField(
+            name,
+            parentOddr,
+            oddrn,
+            mapType(field.getType()),
+            getLogicalTypeName(field),
+            nullable
+        )
+    );
+  }
+
+  private String getLogicalTypeName(Descriptors.FieldDescriptor f) {
+    return f.getType() == Descriptors.FieldDescriptor.Type.MESSAGE
+        ? f.getMessageType().getFullName()
+        : f.getType().name().toLowerCase();
+  }
+
+  private DataSetField createDataSetField(String name,
+                                          String parentOddrn,
+                                          String oddrn,
+                                          TypeEnum type,
+                                          String logicalType,
+                                          Boolean nullable) {
+    return new DataSetField()
+        .name(name)
+        .parentFieldOddrn(parentOddrn)
+        .oddrn(oddrn)
+        .type(
+            new DataSetFieldType()
+                .isNullable(nullable)
+                .logicalType(logicalType)
+                .type(type)
+        );
+  }
+
+  private TypeEnum mapType(Descriptors.FieldDescriptor.Type type) {
+    return switch (type) {
+      case INT32, INT64, SINT32, SFIXED32, SINT64, UINT32, UINT64, FIXED32, FIXED64, SFIXED64 -> TypeEnum.INTEGER;
+      case FLOAT, DOUBLE -> TypeEnum.NUMBER;
+      case STRING, ENUM -> TypeEnum.STRING;
+      case BOOL -> TypeEnum.BOOLEAN;
+      case BYTES -> TypeEnum.BINARY;
+      case MESSAGE, GROUP -> TypeEnum.STRUCT;
+    };
+  }
+
+}

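For the wrapper handling in extractProtoWellKnownType above, a small sketch assuming the io.confluent ProtobufSchema API already used in extract() (the message and field names here are made up for illustration):

import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

Descriptor d = new ProtobufSchema("""
    syntax = "proto3";
    import "google/protobuf/wrappers.proto";
    message M { google.protobuf.Int64Value count = 1; }
    """).toDescriptor();
FieldDescriptor count = d.findFieldByName("count");
// count.getMessageType().getFullName() is "google.protobuf.Int64Value", which is listed
// in PRIMITIVES_WRAPPER_TYPE_NAMES, so the extractor unwraps the inner "value" field and
// emits an INTEGER DataSetField for "count", marked nullable (proto3 wrapper semantics)
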
+ 10 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ReactiveFailover.java

@@ -9,7 +9,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
-import org.springframework.web.reactive.function.client.WebClientRequestException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -25,6 +24,16 @@ public class ReactiveFailover<T> {
   private final Predicate<Throwable> failoverExceptionsPredicate;
   private final String noAvailablePublishersMsg;
 
+  // creates a single-publisher failover (mainly intended for test usage)
+  public static <T> ReactiveFailover<T> createNoop(T publisher) {
+    return create(
+        List.of(publisher),
+        th -> true,
+        "publisher is not available",
+        DEFAULT_RETRY_GRACE_PERIOD_MS
+    );
+  }
+
   public static <T> ReactiveFailover<T> create(List<T> publishers,
                                                Predicate<Throwable> failoverExeptionsPredicate,
                                                String noAvailablePublishersMsg,

+ 2 - 3
kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java

@@ -142,9 +142,8 @@ public class KafkaConnectServiceTests extends AbstractIntegrationTest {
         .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
         .exchange()
         .expectStatus().isOk()
-        .expectBody()
-        .jsonPath(String.format("$[?(@ == '%s')]", connectorName))
-        .exists();
+        .expectBodyList(String.class)
+        .contains(connectorName);
   }
 
   @Test

+ 7 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KafkaConfigSanitizerTest.java

@@ -26,6 +26,13 @@ class KafkaConfigSanitizerTest {
     assertThat(sanitizer.sanitize("main.consumer.sasl.jaas.config", "secret")).isEqualTo("******");
     assertThat(sanitizer.sanitize("database.password", "secret")).isEqualTo("******");
     assertThat(sanitizer.sanitize("basic.auth.user.info", "secret")).isEqualTo("******");
+
+    // AWS var sanitizing
+    assertThat(sanitizer.sanitize("aws.access.key.id", "secret")).isEqualTo("******");
+    assertThat(sanitizer.sanitize("aws.accessKeyId", "secret")).isEqualTo("******");
+    assertThat(sanitizer.sanitize("aws.secret.access.key", "secret")).isEqualTo("******");
+    assertThat(sanitizer.sanitize("aws.secretAccessKey", "secret")).isEqualTo("******");
+    assertThat(sanitizer.sanitize("aws.sessionToken", "secret")).isEqualTo("******");
   }
 
   @Test

+ 111 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporterTest.java

@@ -0,0 +1,111 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.provectus.kafka.ui.connect.model.ConnectorTopics;
+import com.provectus.kafka.ui.model.ConnectDTO;
+import com.provectus.kafka.ui.model.ConnectorDTO;
+import com.provectus.kafka.ui.model.ConnectorTypeDTO;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.service.KafkaConnectService;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+import org.opendatadiscovery.client.model.DataEntity;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+class ConnectorsExporterTest {
+
+  private static final KafkaCluster CLUSTER = KafkaCluster.builder()
+      .name("test cluster")
+      .bootstrapServers("localhost:9092")
+      .build();
+
+  private final KafkaConnectService kafkaConnectService = mock(KafkaConnectService.class);
+  private final ConnectorsExporter exporter = new ConnectorsExporter(kafkaConnectService);
+
+  @Test
+  void exportsConnectorsAsDataTransformers() {
+    ConnectDTO connect = new ConnectDTO();
+    connect.setName("testConnect");
+    connect.setAddress("http://kconnect:8083");
+
+    ConnectorDTO sinkConnector = new ConnectorDTO();
+    sinkConnector.setName("testSink");
+    sinkConnector.setType(ConnectorTypeDTO.SINK);
+    sinkConnector.setConnect(connect.getName());
+    sinkConnector.setConfig(
+        Map.of(
+            "connector.class", "FileStreamSink",
+            "file", "filePathHere",
+            "topic", "inputTopic"
+        )
+    );
+
+    ConnectorDTO sourceConnector = new ConnectorDTO();
+    sourceConnector.setName("testSource");
+    sourceConnector.setConnect(connect.getName());
+    sourceConnector.setType(ConnectorTypeDTO.SOURCE);
+    sourceConnector.setConfig(
+        Map.of(
+            "connector.class", "FileStreamSource",
+            "file", "filePathHere",
+            "topic", "outputTopic"
+        )
+    );
+
+    when(kafkaConnectService.getConnects(CLUSTER))
+        .thenReturn(Flux.just(connect));
+
+    when(kafkaConnectService.getConnectorNames(CLUSTER, connect.getName()))
+        .thenReturn(Flux.just(sinkConnector.getName(), sourceConnector.getName()));
+
+    when(kafkaConnectService.getConnector(CLUSTER, connect.getName(), sinkConnector.getName()))
+        .thenReturn(Mono.just(sinkConnector));
+
+    when(kafkaConnectService.getConnector(CLUSTER, connect.getName(), sourceConnector.getName()))
+        .thenReturn(Mono.just(sourceConnector));
+
+    when(kafkaConnectService.getConnectorTopics(CLUSTER, connect.getName(), sourceConnector.getName()))
+        .thenReturn(Mono.just(new ConnectorTopics().topics(List.of("outputTopic"))));
+
+    when(kafkaConnectService.getConnectorTopics(CLUSTER, connect.getName(), sinkConnector.getName()))
+        .thenReturn(Mono.just(new ConnectorTopics().topics(List.of("inputTopic"))));
+
+    StepVerifier.create(exporter.export(CLUSTER))
+        .assertNext(dataEntityList -> {
+          assertThat(dataEntityList.getDataSourceOddrn())
+              .isEqualTo("//kafkaconnect/host/kconnect:8083");
+
+          assertThat(dataEntityList.getItems())
+              .hasSize(2);
+
+          assertThat(dataEntityList.getItems())
+              .filteredOn(DataEntity::getOddrn, "//kafkaconnect/host/kconnect:8083/connectors/testSink")
+              .singleElement()
+              .satisfies(sink -> {
+                assertThat(sink.getMetadata().get(0).getMetadata())
+                    .containsOnlyKeys("type", "connector.class", "file", "topic");
+                assertThat(sink.getDataTransformer().getInputs()).contains(
+                    "//kafka/cluster/localhost:9092/topics/inputTopic");
+              });
+
+          assertThat(dataEntityList.getItems())
+              .filteredOn(DataEntity::getOddrn, "//kafkaconnect/host/kconnect:8083/connectors/testSource")
+              .singleElement()
+              .satisfies(source -> {
+                assertThat(source.getMetadata().get(0).getMetadata())
+                    .containsOnlyKeys("type", "connector.class", "file", "topic");
+                assertThat(source.getDataTransformer().getOutputs()).contains(
+                    "//kafka/cluster/localhost:9092/topics/outputTopic");
+              });
+
+        })
+        .verifyComplete();
+  }
+
+}

+ 167 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java

@@ -0,0 +1,167 @@
+package com.provectus.kafka.ui.service.integration.odd;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Statistics;
+import com.provectus.kafka.ui.service.StatisticsCache;
+import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import com.provectus.kafka.ui.sr.model.SchemaType;
+import com.provectus.kafka.ui.util.ReactiveFailover;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.opendatadiscovery.client.model.DataEntity;
+import org.opendatadiscovery.client.model.DataEntityType;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+class TopicsExporterTest {
+
+  private final KafkaSrClientApi schemaRegistryClientMock = mock(KafkaSrClientApi.class);
+
+  private final KafkaCluster cluster = KafkaCluster.builder()
+      .name("testCluster")
+      .bootstrapServers("localhost:9092,localhost:19092")
+      .schemaRegistryClient(ReactiveFailover.createNoop(schemaRegistryClientMock))
+      .build();
+
+  private Statistics stats;
+
+  private TopicsExporter topicsExporter;
+
+  @BeforeEach
+  void init() {
+    var statisticsCacheMock = mock(StatisticsCache.class);
+    when(statisticsCacheMock.get(cluster)).thenAnswer(invocationOnMock -> stats);
+
+    topicsExporter = new TopicsExporter(
+        topic -> !topic.startsWith("_"),
+        statisticsCacheMock
+    );
+  }
+
+  @Test
+  void doesNotExportTopicsWhichDontFitFiltrationRule() {
+    when(schemaRegistryClientMock.getSubjectVersion(anyString(), anyString()))
+        .thenReturn(Mono.error(new RuntimeException("Not found")));
+
+    stats = Statistics.empty()
+        .toBuilder()
+        .topicDescriptions(
+            Map.of(
+                "_hidden", new TopicDescription("_hidden", false, List.of(
+                    new TopicPartitionInfo(0, null, List.of(), List.of())
+                )),
+                "visible", new TopicDescription("visible", false, List.of(
+                    new TopicPartitionInfo(0, null, List.of(), List.of())
+                ))
+            )
+        )
+        .build();
+
+    StepVerifier.create(topicsExporter.export(cluster))
+        .assertNext(entityList -> {
+          assertThat(entityList.getDataSourceOddrn())
+              .isNotEmpty();
+
+          assertThat(entityList.getItems())
+              .hasSize(1)
+              .allSatisfy(e -> assertThat(e.getOddrn()).contains("visible"));
+        })
+        .verifyComplete();
+  }
+
+  @Test
+  void doesExportTopicData() {
+    when(schemaRegistryClientMock.getSubjectVersion("testTopic-value", "latest"))
+        .thenReturn(Mono.just(
+            new SchemaSubject()
+                .schema("\"string\"")
+                .schemaType(SchemaType.AVRO)
+        ));
+
+    when(schemaRegistryClientMock.getSubjectVersion("testTopic-key", "latest"))
+        .thenReturn(Mono.just(
+            new SchemaSubject()
+                .schema("\"int\"")
+                .schemaType(SchemaType.AVRO)
+        ));
+
+    stats = Statistics.empty()
+        .toBuilder()
+        .topicDescriptions(
+            Map.of(
+                "testTopic",
+                new TopicDescription(
+                    "testTopic",
+                    false,
+                    List.of(
+                        new TopicPartitionInfo(
+                            0,
+                            null,
+                            List.of(
+                                new Node(1, "host1", 9092),
+                                new Node(2, "host2", 9092)
+                            ),
+                            List.of())
+                    ))
+            )
+        )
+        .topicConfigs(
+            Map.of(
+                "testTopic", List.of(
+                    new ConfigEntry(
+                        "custom.config",
+                        "100500",
+                        ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG,
+                        false,
+                        false,
+                        List.of(),
+                        ConfigEntry.ConfigType.INT,
+                        null
+                    )
+                )
+            )
+        )
+        .build();
+
+    StepVerifier.create(topicsExporter.export(cluster))
+        .assertNext(entityList -> {
+          assertThat(entityList.getItems())
+              .hasSize(1);
+
+          DataEntity topicEntity = entityList.getItems().get(0);
+          assertThat(topicEntity.getName()).isNotEmpty();
+          assertThat(topicEntity.getOddrn())
+              .isEqualTo("//kafka/cluster/localhost:19092,localhost:9092/topics/testTopic");
+          assertThat(topicEntity.getType()).isEqualTo(DataEntityType.KAFKA_TOPIC);
+          assertThat(topicEntity.getMetadata())
+              .hasSize(1)
+              .singleElement()
+              .satisfies(e ->
+                  assertThat(e.getMetadata())
+                      .containsExactlyInAnyOrderEntriesOf(
+                          Map.of(
+                              "partitions", 1,
+                              "replication_factor", 2,
+                              "custom.config", "100500")));
+
+          assertThat(topicEntity.getDataset()).isNotNull();
+          assertThat(topicEntity.getDataset().getFieldList())
+              .hasSize(4); // 2 fields for key, 2 for value
+        })
+        .verifyComplete();
+  }
+
+}

+ 272 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/AvroExtractorTest.java

@@ -0,0 +1,272 @@
+package com.provectus.kafka.ui.service.integration.odd.schema;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.opendatadiscovery.client.model.DataSetField;
+import org.opendatadiscovery.client.model.DataSetFieldType;
+import org.opendatadiscovery.oddrn.model.KafkaPath;
+
+class AvroExtractorTest {
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void test(boolean isKey) {
+    var list = AvroExtractor.extract(
+        new SchemaSubject()
+            .schema("""
+                {
+                    "type": "record",
+                    "name": "Message",
+                    "namespace": "com.provectus.kafka",
+                    "fields":
+                    [
+                        {
+                            "name": "f1",
+                            "type":
+                            {
+                                "type": "array",
+                                "items":
+                                {
+                                    "type": "record",
+                                    "name": "ArrElement",
+                                    "fields":
+                                    [
+                                        {
+                                            "name": "longmap",
+                                            "type":
+                                            {
+                                                "type": "map",
+                                                "values": "long"
+                                            }
+                                        }
+                                    ]
+                                }
+                            }
+                        },
+                        {
+                            "name": "f2",
+                            "type":
+                            {
+                                "type": "record",
+                                "name": "InnerMessage",
+                                "fields":
+                                [
+                                    {
+                                        "name": "text",
+                                        "doc": "string field here",
+                                        "type": "string"
+                                    },
+                                    {
+                                        "name": "innerMsgRef",
+                                        "type": "InnerMessage"
+                                    },
+                                    {
+                                        "name": "nullable_union",
+                                        "type":
+                                        [
+                                            "null",
+                                            "string",
+                                            "int"
+                                        ],
+                                        "default": null
+                                    },
+                                    {
+                                        "name": "order_enum",
+                                        "type":
+                                        {
+                                            "type": "enum",
+                                            "name": "Suit",
+                                            "symbols":
+                                            [
+                                                "SPADES",
+                                                "HEARTS"
+                                            ]
+                                        }
+                                    },
+                                    {
+                                        "name": "str_list",
+                                        "type":
+                                        {
+                                            "type": "array",
+                                            "items": "string"
+                                        }
+                                    }
+                                ]
+                            }
+                        }
+                    ]
+                }
+                """),
+
+        KafkaPath.builder()
+            .cluster("localhost:9092")
+            .topic("someTopic")
+            .build(),
+        isKey
+    );
+
+    String baseOddrn = "//kafka/cluster/localhost:9092/topics/someTopic/columns/" + (isKey ? "key" : "value");
+
+    assertThat(list).contains(
+        DataSetFieldsExtractors.rootField(
+            KafkaPath.builder().cluster("localhost:9092").topic("someTopic").build(),
+            isKey
+        ),
+        new DataSetField()
+            .name("f1")
+            .parentFieldOddrn(baseOddrn)
+            .oddrn(baseOddrn + "/f1")
+            .type(
+                new DataSetFieldType()
+                    .type(DataSetFieldType.TypeEnum.LIST)
+                    .logicalType("array")
+                    .isNullable(false)
+            ),
+        new DataSetField()
+            .name("ArrElement")
+            .parentFieldOddrn(baseOddrn + "/f1")
+            .oddrn(baseOddrn + "/f1/items/ArrElement")
+            .type(
+                new DataSetFieldType()
+                    .type(DataSetFieldType.TypeEnum.STRUCT)
+                    .logicalType("com.provectus.kafka.ArrElement")
+                    .isNullable(false)
+            ),
+        new DataSetField()
+            .name("longmap")
+            .parentFieldOddrn(baseOddrn + "/f1/items/ArrElement")
+            .oddrn(baseOddrn + "/f1/items/ArrElement/fields/longmap")
+            .type(
+                new DataSetFieldType()
+                    .type(DataSetFieldType.TypeEnum.MAP)
+                    .logicalType("map")
+                    .isNullable(false)
+            ),
+        new DataSetField()
+            .name("key")
+            .parentFieldOddrn(baseOddrn + "/f1/items/ArrElement/fields/longmap")
+            .oddrn(baseOddrn + "/f1/items/ArrElement/fields/longmap/key")
+            .type(
+                new DataSetFieldType()
+                    .type(DataSetFieldType.TypeEnum.STRING)
+                    .logicalType("string")
+                    .isNullable(false)
+            ),
+        new DataSetField()
+            .name("value")
+            .parentFieldOddrn(baseOddrn + "/f1/items/ArrElement/fields/longmap")
+            .oddrn(baseOddrn + "/f1/items/ArrElement/fields/longmap/value")
+            .type(
+                new DataSetFieldType()
+                    .type(DataSetFieldType.TypeEnum.INTEGER)
+                    .logicalType("long")
+                    .isNullable(false)
+            ),
+        new DataSetField()
+            .name("f2")
+            .parentFieldOddrn(baseOddrn)
+            .oddrn(baseOddrn + "/f2")
+            .type(
+                new DataSetFieldType()
+                    .type(DataSetFieldType.TypeEnum.STRUCT)
+                    .logicalType("com.provectus.kafka.InnerMessage")
+                    .isNullable(false)
+            ),
+        new DataSetField()
+            .name("text")
+            .parentFieldOddrn(baseOddrn + "/f2")
+            .oddrn(baseOddrn + "/f2/fields/text")
+            .description("string field here")
+            .type(
+                new DataSetFieldType()
+                    .type(DataSetFieldType.TypeEnum.STRING)
+                    .logicalType("string")
+                    .isNullable(false)
+            ),
+        new DataSetField()
+            .name("innerMsgRef")
+            .parentFieldOddrn(baseOddrn + "/f2")
+            .oddrn(baseOddrn + "/f2/fields/innerMsgRef")
+            .type(
+                new DataSetFieldType()
+                    .type(DataSetFieldType.TypeEnum.STRUCT)
+                    .logicalType("com.provectus.kafka.InnerMessage")
+                    .isNullable(false)
+            ),
+        new DataSetField()
+            .name("nullable_union")
+            .parentFieldOddrn(baseOddrn + "/f2")
+            .oddrn(baseOddrn + "/f2/fields/nullable_union")
+            .type(
+                new DataSetFieldType()
+                    .type(DataSetFieldType.TypeEnum.UNION)
+                    .logicalType("union")
+                    .isNullable(true)
+            ),
+        new DataSetField()
+            .name("string")
+            .parentFieldOddrn(baseOddrn + "/f2/fields/nullable_union")
+            .oddrn(baseOddrn + "/f2/fields/nullable_union/values/string")
+            .type(
+                new DataSetFieldType()
+                    .type(DataSetFieldType.TypeEnum.STRING)
+                    .logicalType("string")
+                    .isNullable(true)
+            ),
+        new DataSetField()
+            .name("int")
+            .parentFieldOddrn(baseOddrn + "/f2/fields/nullable_union")
+            .oddrn(baseOddrn + "/f2/fields/nullable_union/values/int")
+            .type(
+                new DataSetFieldType()
+                    .type(DataSetFieldType.TypeEnum.INTEGER)
+                    .logicalType("int")
+                    .isNullable(true)
+            ),
+        new DataSetField()
+            .name("order_enum")
+            .parentFieldOddrn(baseOddrn + "/f2")
+            .oddrn(baseOddrn + "/f2/fields/order_enum")
+            .type(
+                new DataSetFieldType()
+                    .type(DataSetFieldType.TypeEnum.STRING)
+                    .logicalType("enum")
+                    .isNullable(false)
+            ),
+        new DataSetField()
+            .name("str_list")
+            .parentFieldOddrn(baseOddrn + "/f2")
+            .oddrn(baseOddrn + "/f2/fields/str_list")
+            .type(
+                new DataSetFieldType()
+                    .type(DataSetFieldType.TypeEnum.LIST)
+                    .logicalType("array")
+                    .isNullable(false)
+            ),
+        new DataSetField()
+            .name("string")
+            .parentFieldOddrn(baseOddrn + "/f2/fields/str_list")
+            .oddrn(baseOddrn + "/f2/fields/str_list/items/string")
+            .type(
+                new DataSetFieldType()
+                    .type(DataSetFieldType.TypeEnum.STRING)
+                    .logicalType("string")
+                    .isNullable(false)
+            )
+    );
+  }
+
+}

+ 145 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/JsonSchemaExtractorTest.java

@@ -0,0 +1,145 @@
+package com.provectus.kafka.ui.service.integration.odd.schema;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.opendatadiscovery.client.model.DataSetField;
+import org.opendatadiscovery.client.model.DataSetFieldType;
+import org.opendatadiscovery.client.model.MetadataExtension;
+import org.opendatadiscovery.oddrn.model.KafkaPath;
+
+class JsonSchemaExtractorTest {
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void test(boolean isKey) {
+    String jsonSchema = """
+        {
+            "$id": "http://example.com/test.TestMsg",
+            "$schema": "https://json-schema.org/draft/2020-12/schema",
+            "type": "object",
+            "required": [ "int32_field" ],
+            "properties":
+            {
+                "int32_field": { "type": "integer", "title": "field title" },
+                "lst_s_field": { "type": "array", "items": { "type": "string" }, "description": "field descr" },
+                "untyped_struct_field": { "type": "object", "properties": {} },
+                "union_field": { "type": [ "number", "object", "null" ] },
+                "struct_field": {
+                    "type": "object",
+                    "properties": {
+                        "bool_field": { "type": "boolean" }
+                    }
+                }
+            }
+        }
+        """;
+    var fields = JsonSchemaExtractor.extract(
+        new SchemaSubject().schema(jsonSchema),
+        KafkaPath.builder()
+            .cluster("localhost:9092")
+            .topic("someTopic")
+            .build(),
+        isKey
+    );
+
+    String baseOddrn = "//kafka/cluster/localhost:9092/topics/someTopic/columns/" + (isKey ? "key" : "value");
+
+    assertThat(fields).contains(
+        DataSetFieldsExtractors.rootField(
+            KafkaPath.builder().cluster("localhost:9092").topic("someTopic").build(),
+            isKey
+        ),
+        new DataSetField()
+            .name("int32_field")
+            .parentFieldOddrn(baseOddrn)
+            .oddrn(baseOddrn + "/int32_field")
+            .description("field title")
+            .type(new DataSetFieldType()
+                .type(DataSetFieldType.TypeEnum.NUMBER)
+                .logicalType("Number")
+                .isNullable(false)),
+        new DataSetField()
+            .name("lst_s_field")
+            .parentFieldOddrn(baseOddrn)
+            .oddrn(baseOddrn + "/lst_s_field")
+            .description("field descr")
+            .type(new DataSetFieldType()
+                .type(DataSetFieldType.TypeEnum.LIST)
+                .logicalType("array")
+                .isNullable(true)),
+        new DataSetField()
+            .name("String")
+            .parentFieldOddrn(baseOddrn + "/lst_s_field")
+            .oddrn(baseOddrn + "/lst_s_field/items/String")
+            .type(new DataSetFieldType()
+                .type(DataSetFieldType.TypeEnum.STRING)
+                .logicalType("String")
+                .isNullable(false)),
+        new DataSetField()
+            .name("untyped_struct_field")
+            .parentFieldOddrn(baseOddrn)
+            .oddrn(baseOddrn + "/untyped_struct_field")
+            .type(new DataSetFieldType()
+                .type(DataSetFieldType.TypeEnum.STRUCT)
+                .logicalType("Object")
+                .isNullable(true)),
+        new DataSetField()
+            .name("union_field")
+            .parentFieldOddrn(baseOddrn)
+            .oddrn(baseOddrn + "/union_field/anyOf")
+            .metadata(List.of(new MetadataExtension()
+                .schemaUrl(URI.create("wontbeused.oops"))
+                .metadata(Map.of("criterion", "anyOf"))))
+            .type(new DataSetFieldType()
+                .type(DataSetFieldType.TypeEnum.UNION)
+                .logicalType("anyOf")
+                .isNullable(true)),
+        new DataSetField()
+            .name("Number")
+            .parentFieldOddrn(baseOddrn + "/union_field/anyOf")
+            .oddrn(baseOddrn + "/union_field/anyOf/values/Number")
+            .type(new DataSetFieldType()
+                .type(DataSetFieldType.TypeEnum.NUMBER)
+                .logicalType("Number")
+                .isNullable(true)),
+        new DataSetField()
+            .name("Object")
+            .parentFieldOddrn(baseOddrn + "/union_field/anyOf")
+            .oddrn(baseOddrn + "/union_field/anyOf/values/Object")
+            .type(new DataSetFieldType()
+                .type(DataSetFieldType.TypeEnum.STRUCT)
+                .logicalType("Object")
+                .isNullable(true)),
+        new DataSetField()
+            .name("Null")
+            .parentFieldOddrn(baseOddrn + "/union_field/anyOf")
+            .oddrn(baseOddrn + "/union_field/anyOf/values/Null")
+            .type(new DataSetFieldType()
+                .type(DataSetFieldType.TypeEnum.UNKNOWN)
+                .logicalType("Null")
+                .isNullable(true)),
+        new DataSetField()
+            .name("struct_field")
+            .parentFieldOddrn(baseOddrn)
+            .oddrn(baseOddrn + "/struct_field")
+            .type(new DataSetFieldType()
+                .type(DataSetFieldType.TypeEnum.STRUCT)
+                .logicalType("Object")
+                .isNullable(true)),
+        new DataSetField()
+            .name("bool_field")
+            .parentFieldOddrn(baseOddrn + "/struct_field")
+            .oddrn(baseOddrn + "/struct_field/fields/bool_field")
+            .type(new DataSetFieldType()
+                .type(DataSetFieldType.TypeEnum.BOOLEAN)
+                .logicalType("Boolean")
+                .isNullable(true))
+    );
+  }
+}

+ 187 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/schema/ProtoExtractorTest.java

@@ -0,0 +1,187 @@
+package com.provectus.kafka.ui.service.integration.odd.schema;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.sr.model.SchemaSubject;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.opendatadiscovery.client.model.DataSetField;
+import org.opendatadiscovery.client.model.DataSetFieldType;
+import org.opendatadiscovery.oddrn.model.KafkaPath;
+
+class ProtoExtractorTest {
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void test(boolean isKey) {
+    String protoSchema = """
+        syntax = "proto3";
+        package test;
+
+        import "google/protobuf/timestamp.proto";
+        import "google/protobuf/duration.proto";
+        import "google/protobuf/struct.proto";
+        import "google/protobuf/wrappers.proto";
+
+        message TestMsg {
+            map<string, int32> mapField = 100;
+            int32 int32_field = 2;
+            bool bool_field = 3;
+            SampleEnum enum_field = 4;
+
+            enum SampleEnum {
+                ENUM_V1 = 0;
+                ENUM_V2 = 1;
+            }
+
+            google.protobuf.Timestamp ts_field = 5;
+            google.protobuf.Duration duration_field = 8;
+
+            oneof some_oneof1 {
+                google.protobuf.Value one_of_v1 = 9;
+                google.protobuf.Value one_of_v2 = 10;
+            }
+            // wrapper field:
+            google.protobuf.Int64Value int64_w_field = 11;
+
+            //embedded msg
+            EmbeddedMsg emb = 19;
+
+            message EmbeddedMsg {
+                int32 emb_f1 = 1;
+                TestMsg outer_ref = 2;
+            }
+        }""";
+
+    var list = ProtoExtractor.extract(
+        new SchemaSubject()
+            .schema(protoSchema),
+        KafkaPath.builder()
+            .cluster("localhost:9092")
+            .topic("someTopic")
+            .build(),
+        isKey
+    );
+
+    String baseOddrn = "//kafka/cluster/localhost:9092/topics/someTopic/columns/" + (isKey ? "key" : "value");
+
+    assertThat(list)
+        .contains(
+            DataSetFieldsExtractors.rootField(
+                KafkaPath.builder().cluster("localhost:9092").topic("someTopic").build(),
+                isKey
+            ),
+            new DataSetField()
+                .name("mapField")
+                .parentFieldOddrn(baseOddrn)
+                .oddrn(baseOddrn + "/mapField")
+                .type(
+                    new DataSetFieldType()
+                        .type(DataSetFieldType.TypeEnum.LIST)
+                        .logicalType("repeated")
+                        .isNullable(true)
+                ),
+            new DataSetField()
+                .name("int32_field")
+                .parentFieldOddrn(baseOddrn)
+                .oddrn(baseOddrn + "/int32_field")
+                .type(
+                    new DataSetFieldType()
+                        .type(DataSetFieldType.TypeEnum.INTEGER)
+                        .logicalType("int32")
+                        .isNullable(true)
+                ),
+            new DataSetField()
+                .name("enum_field")
+                .parentFieldOddrn(baseOddrn)
+                .oddrn(baseOddrn + "/enum_field")
+                .type(
+                    new DataSetFieldType()
+                        .type(DataSetFieldType.TypeEnum.STRING)
+                        .logicalType("enum")
+                        .isNullable(true)
+                ),
+            new DataSetField()
+                .name("ts_field")
+                .parentFieldOddrn(baseOddrn)
+                .oddrn(baseOddrn + "/ts_field")
+                .type(
+                    new DataSetFieldType()
+                        .type(DataSetFieldType.TypeEnum.DATETIME)
+                        .logicalType("google.protobuf.Timestamp")
+                        .isNullable(true)
+                ),
+            new DataSetField()
+                .name("duration_field")
+                .parentFieldOddrn(baseOddrn)
+                .oddrn(baseOddrn + "/duration_field")
+                .type(
+                    new DataSetFieldType()
+                        .type(DataSetFieldType.TypeEnum.DURATION)
+                        .logicalType("google.protobuf.Duration")
+                        .isNullable(true)
+                ),
+            new DataSetField()
+                .name("one_of_v1")
+                .parentFieldOddrn(baseOddrn)
+                .oddrn(baseOddrn + "/one_of_v1")
+                .type(
+                    new DataSetFieldType()
+                        .type(DataSetFieldType.TypeEnum.UNKNOWN)
+                        .logicalType("google.protobuf.Value")
+                        .isNullable(true)
+                ),
+            new DataSetField()
+                .name("one_of_v2")
+                .parentFieldOddrn(baseOddrn)
+                .oddrn(baseOddrn + "/one_of_v2")
+                .type(
+                    new DataSetFieldType()
+                        .type(DataSetFieldType.TypeEnum.UNKNOWN)
+                        .logicalType("google.protobuf.Value")
+                        .isNullable(true)
+                ),
+            new DataSetField()
+                .name("int64_w_field")
+                .parentFieldOddrn(baseOddrn)
+                .oddrn(baseOddrn + "/int64_w_field")
+                .type(
+                    new DataSetFieldType()
+                        .type(DataSetFieldType.TypeEnum.INTEGER)
+                        .logicalType("google.protobuf.Int64Value")
+                        .isNullable(true)
+                ),
+            new DataSetField()
+                .name("emb")
+                .parentFieldOddrn(baseOddrn)
+                .oddrn(baseOddrn + "/emb")
+                .type(
+                    new DataSetFieldType()
+                        .type(DataSetFieldType.TypeEnum.STRUCT)
+                        .logicalType("test.TestMsg.EmbeddedMsg")
+                        .isNullable(true)
+                ),
+            new DataSetField()
+                .name("emb_f1")
+                .parentFieldOddrn(baseOddrn + "/emb")
+                .oddrn(baseOddrn + "/emb/fields/emb_f1")
+                .type(
+                    new DataSetFieldType()
+                        .type(DataSetFieldType.TypeEnum.INTEGER)
+                        .logicalType("int32")
+                        .isNullable(true)
+                ),
+            new DataSetField()
+                .name("outer_ref")
+                .parentFieldOddrn(baseOddrn + "/emb")
+                .oddrn(baseOddrn + "/emb/fields/outer_ref")
+                .type(
+                    new DataSetFieldType()
+                        .type(DataSetFieldType.TypeEnum.STRUCT)
+                        .logicalType("test.TestMsg")
+                        .isNullable(true)
+                )
+        );
+  }
+
+}

+ 2 - 0
pom.xml

@@ -42,6 +42,8 @@
         <spring-boot.version>2.7.5</spring-boot.version>
         <spring-security.version>5.7.5</spring-security.version>
         <kafka-ui-serde-api.version>1.0.0</kafka-ui-serde-api.version>
+        <odd-oddrn-generator.version>0.1.15</odd-oddrn-generator.version>
+        <odd-oddrn-client.version>0.1.19</odd-oddrn-client.version>
 
         <!-- Test dependency versions -->
         <junit.version>5.9.1</junit.version>