Bläddra i källkod

[BE] ProtobufFilesSerde imports implementation & refactoring (#3357)

* Feature: Support more complex Protobuf files

The changes in https://github.com/provectus/kafka-ui/pull/2874 added
initial support for using more than 1 Protobuf file in Kafka UI in
absence of a proper schema registry.

This change is building upon that functionality to support more complex
scenarios in which there are multiple Protobuf files being used and not
all of them are explicitly listed (for example imports).

It's using the already present Wire library from Square to do the heavy
lifting and create a comprehensive schema from all Protobuf files and
directories listed in the Kafka UI configuration.

* Refactor schema loading logic and reuse in tests

* Add support for reading Protobufs from ZIP archives

* Remove unused ProtobufFileSerde#toLocation(Path)

* wip

* wip

* wip

* wip

* wip

* wip

* wip

---------

Co-authored-by: Jochen Schalanda <jochen.schalanda@personio.de>
Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Ilya Kuramshin 2 år sedan
förälder
incheckning
4d20cb6958

+ 1 - 2
documentation/compose/kafka-ui-serdes.yaml

@@ -28,8 +28,7 @@ services:
             kafka.clusters.0.serde.0.name: ProtobufFile
             kafka.clusters.0.serde.0.topicKeysPattern: "topic1"
             kafka.clusters.0.serde.0.topicValuesPattern: "topic1"
-            kafka.clusters.0.serde.0.properties.protobufFiles.0: /protofiles/key-types.proto
-            kafka.clusters.0.serde.0.properties.protobufFiles.1: /protofiles/values.proto
+            kafka.clusters.0.serde.0.properties.protobufFilesDir: /protofiles/
             kafka.clusters.0.serde.0.properties.protobufMessageNameForKey: test.MyKey # default type for keys
             kafka.clusters.0.serde.0.properties.protobufMessageName: test.MyValue # default type for values
             kafka.clusters.0.serde.0.properties.protobufMessageNameForKeyByTopic.topic1: test.MySpecificTopicKey # keys type for topic "topic1"

+ 4 - 0
documentation/compose/proto/key-types.proto

@@ -1,11 +1,15 @@
 syntax = "proto3";
 package test;
 
+import "google/protobuf/wrappers.proto";
+
 message MyKey {
     string myKeyF1 = 1;
+    google.protobuf.UInt64Value uint_64_wrapper = 2;
 }
 
 message MySpecificTopicKey {
     string special_field1 = 1;
     string special_field2 = 2;
+    google.protobuf.FloatValue float_wrapper = 3;
 }

+ 13 - 9
documentation/guides/Protobuf.md

@@ -12,22 +12,26 @@ To configure Kafkaui to deserialize protobuf messages using a supplied protobuf
 ```yaml
 kafka:
   clusters:
-    - # Cluster configuration omitted.
-      # protobufFile is the path to the protobuf schema. (deprecated: please use "protobufFiles")
+    - # Cluster configuration omitted...
+      # protobufFilesDir specifies root location for proto files (will be scanned recursively)
+      # NOTE: if 'protobufFilesDir' specified, then 'protobufFile' and 'protobufFiles' settings will be ignored
+      protobufFilesDir: "/path/to/my-protobufs"
+      # (DEPRECATED) protobufFile is the path to the protobuf schema. (deprecated: please use "protobufFiles")
       protobufFile: path/to/my.proto
-      # protobufFiles is the path to one or more protobuf schemas.
-      protobufFiles: 
-        - /path/to/my.proto
-        - /path/to/another.proto
-      # protobufMessageName is the default protobuf type that is used to deserilize
-      # the message's value if the topic is not found in protobufMessageNameByTopic.
+      # (DEPRECATED) protobufFiles is the location of one or more protobuf schemas
+      protobufFiles:
+        - /path/to/my-protobufs/my.proto
+        - /path/to/my-protobufs/another.proto
+        - /path/to/my-protobufs:test/test.proto
+      # protobufMessageName is the default protobuf type that is used to deserialize
+      # the message's value if the topic is not found in protobufMessageNameByTopic.    
       protobufMessageName: my.DefaultValType
       # protobufMessageNameByTopic is a mapping of topic names to protobuf types.
       # This mapping is required and is used to deserialize the Kafka message's value.
       protobufMessageNameByTopic:
         topic1: my.Type1
         topic2: my.Type2
-      # protobufMessageNameForKey is the default protobuf type that is used to deserilize
+      # protobufMessageNameForKey is the default protobuf type that is used to deserialize
       # the message's key if the topic is not found in protobufMessageNameForKeyByTopic.
       protobufMessageNameForKey: my.DefaultKeyType
       # protobufMessageNameForKeyByTopic is a mapping of topic names to protobuf types.

+ 2 - 4
documentation/guides/Serialization.md

@@ -46,10 +46,8 @@ kafka:
       serde:
         - name: ProtobufFile
           properties:
-            # path to the protobuf schema files
-            protobufFiles:
-              - path/to/my.proto
-              - path/to/another.proto
+            # path to the protobuf schema files directory
+            protobufFilesDir: "path/to/protofiles"
             # default protobuf type that is used for KEY serialization/deserialization
             # optional
             protobufMessageNameForKey: my.Type1

+ 285 - 112
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufFileSerde.java

@@ -1,9 +1,36 @@
 package com.provectus.kafka.ui.serdes.builtin;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.AnyProto;
+import com.google.protobuf.ApiProto;
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Descriptors;
 import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.DurationProto;
 import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.EmptyProto;
+import com.google.protobuf.FieldMaskProto;
+import com.google.protobuf.SourceContextProto;
+import com.google.protobuf.StructProto;
+import com.google.protobuf.TimestampProto;
+import com.google.protobuf.TypeProto;
+import com.google.protobuf.WrappersProto;
 import com.google.protobuf.util.JsonFormat;
+import com.google.type.ColorProto;
+import com.google.type.DateProto;
+import com.google.type.DateTimeProto;
+import com.google.type.DayOfWeekProto;
+import com.google.type.ExprProto;
+import com.google.type.FractionProto;
+import com.google.type.IntervalProto;
+import com.google.type.LatLngProto;
+import com.google.type.MoneyProto;
+import com.google.type.MonthProto;
+import com.google.type.PhoneNumberProto;
+import com.google.type.PostalAddressProto;
+import com.google.type.QuaternionProto;
+import com.google.type.TimeOfDayProto;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
 import com.provectus.kafka.ui.serde.api.PropertyResolver;
@@ -11,13 +38,19 @@ import com.provectus.kafka.ui.serde.api.RecordHeaders;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
 import com.provectus.kafka.ui.util.jsonschema.ProtobufSchemaConverter;
+import com.squareup.wire.schema.ErrorCollector;
+import com.squareup.wire.schema.Linker;
+import com.squareup.wire.schema.Loader;
+import com.squareup.wire.schema.Location;
+import com.squareup.wire.schema.ProtoFile;
+import com.squareup.wire.schema.internal.parser.ProtoFileElement;
+import com.squareup.wire.schema.internal.parser.ProtoParser;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
 import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -28,7 +61,10 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.NotNull;
 
+@Slf4j
 public class ProtobufFileSerde implements BuiltInSerde {
 
   public static String name() {
@@ -51,132 +87,35 @@ public class ProtobufFileSerde implements BuiltInSerde {
   @Override
   public boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties,
                                      PropertyResolver globalProperties) {
-    Optional<String> protobufFile = kafkaClusterProperties.getProperty("protobufFile", String.class);
-    Optional<List<String>> protobufFiles = kafkaClusterProperties.getListProperty("protobufFiles", String.class);
-    return protobufFile.isPresent() || protobufFiles.filter(files -> !files.isEmpty()).isPresent();
+    return Configuration.canBeAutoConfigured(kafkaClusterProperties);
   }
 
   @Override
   public void autoConfigure(PropertyResolver kafkaClusterProperties,
                             PropertyResolver globalProperties) {
-    configure(kafkaClusterProperties);
+    configure(Configuration.create(kafkaClusterProperties));
   }
 
   @Override
   public void configure(PropertyResolver serdeProperties,
                         PropertyResolver kafkaClusterProperties,
                         PropertyResolver globalProperties) {
-    configure(serdeProperties);
-  }
-
-  private void configure(PropertyResolver properties) {
-    Map<Path, ProtobufSchema> protobufSchemas = joinPathProperties(properties).stream()
-        .map(path -> Map.entry(path, new ProtobufSchema(readFileAsString(path))))
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
-    // Load all referenced message schemas and store their source proto file with the descriptors
-    Map<Descriptor, Path> descriptorPaths = new HashMap<>();
-    Optional<String> protobufMessageName = properties.getProperty("protobufMessageName", String.class);
-    protobufMessageName.ifPresent(messageName -> addProtobufSchema(descriptorPaths, protobufSchemas, messageName));
-
-    Optional<String> protobufMessageNameForKey =
-        properties.getProperty("protobufMessageNameForKey", String.class);
-    protobufMessageNameForKey
-        .ifPresent(messageName -> addProtobufSchema(descriptorPaths, protobufSchemas, messageName));
-
-    Optional<Map<String, String>> protobufMessageNameByTopic =
-        properties.getMapProperty("protobufMessageNameByTopic", String.class, String.class);
-    protobufMessageNameByTopic
-        .ifPresent(messageNamesByTopic -> addProtobufSchemas(descriptorPaths, protobufSchemas, messageNamesByTopic));
-
-    Optional<Map<String, String>> protobufMessageNameForKeyByTopic =
-        properties.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class);
-    protobufMessageNameForKeyByTopic
-        .ifPresent(messageNamesByTopic -> addProtobufSchemas(descriptorPaths, protobufSchemas, messageNamesByTopic));
-
-    // Fill dictionary for descriptor lookup by full message name
-    Map<String, Descriptor> descriptorMap = descriptorPaths.keySet().stream()
-        .collect(Collectors.toMap(Descriptor::getFullName, Function.identity()));
-
-    configure(
-        protobufMessageName.map(descriptorMap::get).orElse(null),
-        protobufMessageNameForKey.map(descriptorMap::get).orElse(null),
-        descriptorPaths,
-        protobufMessageNameByTopic.map(map -> populateDescriptors(descriptorMap, map)).orElse(Map.of()),
-        protobufMessageNameForKeyByTopic.map(map -> populateDescriptors(descriptorMap, map)).orElse(Map.of())
-    );
+    configure(Configuration.create(serdeProperties));
   }
 
   @VisibleForTesting
-  void configure(
-      @Nullable Descriptor defaultMessageDescriptor,
-      @Nullable Descriptor defaultKeyMessageDescriptor,
-      Map<Descriptor, Path> descriptorPaths,
-      Map<String, Descriptor> messageDescriptorMap,
-      Map<String, Descriptor> keyMessageDescriptorMap) {
-    if (defaultMessageDescriptor == null
-        && defaultKeyMessageDescriptor == null
-        && messageDescriptorMap.isEmpty()
-        && keyMessageDescriptorMap.isEmpty()) {
+  void configure(Configuration configuration) {
+    if (configuration.defaultMessageDescriptor() == null
+        && configuration.defaultKeyMessageDescriptor() == null
+        && configuration.messageDescriptorMap().isEmpty()
+        && configuration.keyMessageDescriptorMap().isEmpty()) {
       throw new ValidationException("Neither default, not per-topic descriptors defined for " + name() + " serde");
     }
-    this.defaultMessageDescriptor = defaultMessageDescriptor;
-    this.defaultKeyMessageDescriptor = defaultKeyMessageDescriptor;
-    this.descriptorPaths = descriptorPaths;
-    this.messageDescriptorMap = messageDescriptorMap;
-    this.keyMessageDescriptorMap = keyMessageDescriptorMap;
-  }
-
-  private static void addProtobufSchema(Map<Descriptor, Path> descriptorPaths,
-                                 Map<Path, ProtobufSchema> protobufSchemas,
-                                 String messageName) {
-    var descriptorAndPath = getDescriptorAndPath(protobufSchemas, messageName);
-    descriptorPaths.put(descriptorAndPath.getKey(), descriptorAndPath.getValue());
-  }
-
-  private static void addProtobufSchemas(Map<Descriptor, Path> descriptorPaths,
-                                  Map<Path, ProtobufSchema> protobufSchemas,
-                                  Map<String, String> messageNamesByTopic) {
-    messageNamesByTopic.values().stream()
-        .map(msgName -> getDescriptorAndPath(protobufSchemas, msgName))
-        .forEach(entry -> descriptorPaths.put(entry.getKey(), entry.getValue()));
-  }
-
-  private static List<Path> joinPathProperties(PropertyResolver propertyResolver) {
-    return Stream.concat(
-            propertyResolver.getProperty("protobufFile", String.class).map(List::of).stream(),
-            propertyResolver.getListProperty("protobufFiles", String.class).stream())
-        .flatMap(Collection::stream)
-        .distinct()
-        .map(Path::of)
-        .collect(Collectors.toList());
-  }
-
-  private static Map.Entry<Descriptor, Path> getDescriptorAndPath(Map<Path, ProtobufSchema> protobufSchemas,
-                                                                  String msgName) {
-    return protobufSchemas.entrySet().stream()
-            .filter(schema -> schema.getValue().toDescriptor(msgName) != null)
-            .map(schema -> Map.entry(schema.getValue().toDescriptor(msgName), schema.getKey()))
-            .findFirst()
-            .orElseThrow(() -> new NullPointerException(
-                    "The given message type not found in protobuf definition: " + msgName));
-  }
-
-  private static String readFileAsString(Path path) {
-    try {
-      return Files.readString(path);
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
-  }
-
-  private Map<String, Descriptor> populateDescriptors(Map<String, Descriptor> descriptorMap,
-                                                      Map<String, String> messageNameMap) {
-    Map<String, Descriptor> descriptors = new HashMap<>();
-    for (Map.Entry<String, String> entry : messageNameMap.entrySet()) {
-      descriptors.put(entry.getKey(), descriptorMap.get(entry.getValue()));
-    }
-    return descriptors;
+    this.defaultMessageDescriptor = configuration.defaultMessageDescriptor();
+    this.defaultKeyMessageDescriptor = configuration.defaultKeyMessageDescriptor();
+    this.descriptorPaths = configuration.descriptorPaths();
+    this.messageDescriptorMap = configuration.messageDescriptorMap();
+    this.keyMessageDescriptorMap = configuration.keyMessageDescriptorMap();
   }
 
   @Override
@@ -249,4 +188,238 @@ public class ProtobufFileSerde implements BuiltInSerde {
         Map.of("messageName", descriptor.getFullName())
     );
   }
+
+  @SneakyThrows
+  private static String readFileAsString(Path path) {
+    return Files.readString(path);
+  }
+
+  //----------------------------------------------------------------------------------------------------------------
+
+  @VisibleForTesting
+  record Configuration(@Nullable Descriptor defaultMessageDescriptor,
+                       @Nullable Descriptor defaultKeyMessageDescriptor,
+                       Map<Descriptor, Path> descriptorPaths,
+                       Map<String, Descriptor> messageDescriptorMap,
+                       Map<String, Descriptor> keyMessageDescriptorMap) {
+
+    static boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties) {
+      Optional<String> protobufFile = kafkaClusterProperties.getProperty("protobufFile", String.class);
+      Optional<List<String>> protobufFiles = kafkaClusterProperties.getListProperty("protobufFiles", String.class);
+      Optional<String> protobufFilesDir = kafkaClusterProperties.getProperty("protobufFilesDir", String.class);
+      return protobufFilesDir.isPresent()
+          || protobufFile.isPresent()
+          || protobufFiles.filter(files -> !files.isEmpty()).isPresent();
+    }
+
+    static Configuration create(PropertyResolver properties) {
+      var protobufSchemas = loadSchemas(
+          properties.getProperty("protobufFile", String.class),
+          properties.getListProperty("protobufFiles", String.class),
+          properties.getProperty("protobufFilesDir", String.class)
+      );
+
+      // Load all referenced message schemas and store their source proto file with the descriptors
+      Map<Descriptor, Path> descriptorPaths = new HashMap<>();
+      Optional<String> protobufMessageName = properties.getProperty("protobufMessageName", String.class);
+      protobufMessageName.ifPresent(messageName -> addProtobufSchema(descriptorPaths, protobufSchemas, messageName));
+
+      Optional<String> protobufMessageNameForKey =
+          properties.getProperty("protobufMessageNameForKey", String.class);
+      protobufMessageNameForKey
+          .ifPresent(messageName -> addProtobufSchema(descriptorPaths, protobufSchemas, messageName));
+
+      Optional<Map<String, String>> protobufMessageNameByTopic =
+          properties.getMapProperty("protobufMessageNameByTopic", String.class, String.class);
+      protobufMessageNameByTopic
+          .ifPresent(messageNamesByTopic -> addProtobufSchemas(descriptorPaths, protobufSchemas, messageNamesByTopic));
+
+      Optional<Map<String, String>> protobufMessageNameForKeyByTopic =
+          properties.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class);
+      protobufMessageNameForKeyByTopic
+          .ifPresent(messageNamesByTopic -> addProtobufSchemas(descriptorPaths, protobufSchemas, messageNamesByTopic));
+
+      // Fill dictionary for descriptor lookup by full message name
+      Map<String, Descriptor> descriptorMap = descriptorPaths.keySet().stream()
+          .collect(Collectors.toMap(Descriptor::getFullName, Function.identity()));
+
+      return new Configuration(
+          protobufMessageName.map(descriptorMap::get).orElse(null),
+          protobufMessageNameForKey.map(descriptorMap::get).orElse(null),
+          descriptorPaths,
+          protobufMessageNameByTopic.map(map -> populateDescriptors(descriptorMap, map)).orElse(Map.of()),
+          protobufMessageNameForKeyByTopic.map(map -> populateDescriptors(descriptorMap, map)).orElse(Map.of())
+      );
+    }
+
+    private static Map.Entry<Descriptor, Path> getDescriptorAndPath(Map<Path, ProtobufSchema> protobufSchemas,
+                                                                    String msgName) {
+      return protobufSchemas.entrySet().stream()
+          .filter(schema -> schema.getValue().toDescriptor(msgName) != null)
+          .map(schema -> Map.entry(schema.getValue().toDescriptor(msgName), schema.getKey()))
+          .findFirst()
+          .orElseThrow(() -> new NullPointerException(
+              "The given message type not found in protobuf definition: " + msgName));
+    }
+
+    private static Map<String, Descriptor> populateDescriptors(Map<String, Descriptor> descriptorMap,
+                                                               Map<String, String> messageNameMap) {
+      Map<String, Descriptor> descriptors = new HashMap<>();
+      for (Map.Entry<String, String> entry : messageNameMap.entrySet()) {
+        descriptors.put(entry.getKey(), descriptorMap.get(entry.getValue()));
+      }
+      return descriptors;
+    }
+
+    @VisibleForTesting
+    static Map<Path, ProtobufSchema> loadSchemas(Optional<String> protobufFile,
+                                                 Optional<List<String>> protobufFiles,
+                                                 Optional<String> protobufFilesDir) {
+      if (protobufFilesDir.isPresent()) {
+        if (protobufFile.isPresent() || protobufFiles.isPresent()) {
+          log.warn("protobufFile and protobufFiles properties will be ignored, since protobufFilesDir provided");
+        }
+        List<ProtoFile> loadedFiles = new ProtoSchemaLoader(protobufFilesDir.get()).load();
+        Map<String, ProtoFileElement> allPaths = loadedFiles.stream()
+            .collect(Collectors.toMap(f -> f.getLocation().getPath(), ProtoFile::toElement));
+        return loadedFiles.stream()
+            .collect(Collectors.toMap(
+                f -> Path.of(f.getLocation().getBase(), f.getLocation().getPath()),
+                f -> new ProtobufSchema(f.toElement(), List.of(), allPaths)));
+      }
+      //Supporting for backward-compatibility. Normally, protobufFilesDir setting should be used
+      return Stream.concat(
+              protobufFile.stream(),
+              protobufFiles.stream().flatMap(Collection::stream)
+          )
+          .distinct()
+          .map(Path::of)
+          .collect(Collectors.toMap(path -> path, path -> new ProtobufSchema(readFileAsString(path))));
+    }
+
+    private static void addProtobufSchema(Map<Descriptor, Path> descriptorPaths,
+                                          Map<Path, ProtobufSchema> protobufSchemas,
+                                          String messageName) {
+      var descriptorAndPath = getDescriptorAndPath(protobufSchemas, messageName);
+      descriptorPaths.put(descriptorAndPath.getKey(), descriptorAndPath.getValue());
+    }
+
+    private static void addProtobufSchemas(Map<Descriptor, Path> descriptorPaths,
+                                           Map<Path, ProtobufSchema> protobufSchemas,
+                                           Map<String, String> messageNamesByTopic) {
+      messageNamesByTopic.values().stream()
+          .map(msgName -> getDescriptorAndPath(protobufSchemas, msgName))
+          .forEach(entry -> descriptorPaths.put(entry.getKey(), entry.getValue()));
+    }
+  }
+
+  static class ProtoSchemaLoader {
+
+    private final Path baseLocation;
+
+    ProtoSchemaLoader(String baseLocationStr) {
+      this.baseLocation = Path.of(baseLocationStr);
+      if (!Files.isReadable(baseLocation)) {
+        throw new ValidationException("proto files directory not readable");
+      }
+    }
+
+    List<ProtoFile> load() {
+      Map<String, ProtoFile> knownTypes = knownProtoFiles();
+
+      Map<String, ProtoFile> filesByLocations = new HashMap<>();
+      filesByLocations.putAll(knownTypes);
+      filesByLocations.putAll(loadFilesWithLocations());
+
+      Linker linker = new Linker(
+          createFilesLoader(filesByLocations),
+          new ErrorCollector(),
+          true,
+          true
+      );
+      var schema = linker.link(filesByLocations.values());
+      linker.getErrors().throwIfNonEmpty();
+      return schema.getProtoFiles()
+          .stream()
+          .filter(p -> !knownTypes.containsKey(p.getLocation().getPath())) //filtering known types
+          .toList();
+    }
+
+    private Map<String, ProtoFile> knownProtoFiles() {
+      return Stream.of(
+          loadKnownProtoFile("google/type/color.proto", ColorProto.getDescriptor()),
+          loadKnownProtoFile("google/type/date.proto", DateProto.getDescriptor()),
+          loadKnownProtoFile("google/type/datetime.proto", DateTimeProto.getDescriptor()),
+          loadKnownProtoFile("google/type/dayofweek.proto", DayOfWeekProto.getDescriptor()),
+          loadKnownProtoFile("google/type/decimal.proto", com.google.type.DecimalProto.getDescriptor()),
+          loadKnownProtoFile("google/type/expr.proto", ExprProto.getDescriptor()),
+          loadKnownProtoFile("google/type/fraction.proto", FractionProto.getDescriptor()),
+          loadKnownProtoFile("google/type/interval.proto", IntervalProto.getDescriptor()),
+          loadKnownProtoFile("google/type/latlng.proto", LatLngProto.getDescriptor()),
+          loadKnownProtoFile("google/type/money.proto", MoneyProto.getDescriptor()),
+          loadKnownProtoFile("google/type/month.proto", MonthProto.getDescriptor()),
+          loadKnownProtoFile("google/type/phone_number.proto", PhoneNumberProto.getDescriptor()),
+          loadKnownProtoFile("google/type/postal_address.proto", PostalAddressProto.getDescriptor()),
+          loadKnownProtoFile("google/type/quaternion.prot", QuaternionProto.getDescriptor()),
+          loadKnownProtoFile("google/type/timeofday.proto", TimeOfDayProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/any.proto", AnyProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/api.proto", ApiProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/descriptor.proto", DescriptorProtos.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/duration.proto", DurationProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/empty.proto", EmptyProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/field_mask.proto", FieldMaskProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/source_context.proto", SourceContextProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/struct.proto", StructProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/timestamp.proto", TimestampProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/type.proto", TypeProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/wrappers.proto", WrappersProto.getDescriptor())
+      ).collect(Collectors.toMap(p -> p.getLocation().getPath(), p -> p));
+    }
+
+    private ProtoFile loadKnownProtoFile(String path, Descriptors.FileDescriptor fileDescriptor) {
+      String protoFileString = null;
+      // know type file contains either message or enum
+      if (!fileDescriptor.getMessageTypes().isEmpty()) {
+        protoFileString = new ProtobufSchema(fileDescriptor.getMessageTypes().get(0)).canonicalString();
+      } else if (!fileDescriptor.getEnumTypes().isEmpty()) {
+        protoFileString = new ProtobufSchema(fileDescriptor.getEnumTypes().get(0)).canonicalString();
+      } else {
+        throw new IllegalStateException();
+      }
+      return ProtoFile.Companion.get(ProtoParser.Companion.parse(Location.get(path), protoFileString));
+    }
+
+    private Loader createFilesLoader(Map<String, ProtoFile> files) {
+      return new Loader() {
+        @Override
+        public @NotNull ProtoFile load(@NotNull String path) {
+          return Preconditions.checkNotNull(files.get(path), "ProtoFile not found for import '%s'", path);
+        }
+
+        @Override
+        public @NotNull Loader withErrors(@NotNull ErrorCollector errorCollector) {
+          return this;
+        }
+      };
+    }
+
+    @SneakyThrows
+    private Map<String, ProtoFile> loadFilesWithLocations() {
+      Map<String, ProtoFile> filesByLocations = new HashMap<>();
+      try (var files = Files.walk(baseLocation)) {
+        files.filter(p -> !Files.isDirectory(p) && p.toString().endsWith(".proto"))
+            .forEach(path -> {
+              // relative path will be used as "import" statement
+              String relativePath = baseLocation.relativize(path).toString();
+              var protoFileElement = ProtoParser.Companion.parse(
+                  Location.get(baseLocation.toString(), relativePath),
+                  readFileAsString(path)
+              );
+              filesByLocations.put(relativePath, ProtoFile.Companion.get(protoFileElement));
+            });
+      }
+      return filesByLocations;
+    }
+  }
+
 }

+ 263 - 253
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufFileSerdeTest.java

@@ -10,14 +10,16 @@ import com.google.protobuf.Descriptors;
 import com.google.protobuf.util.JsonFormat;
 import com.provectus.kafka.ui.serde.api.PropertyResolver;
 import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde.Configuration;
+import com.squareup.wire.schema.ProtoFile;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import lombok.SneakyThrows;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.springframework.util.ResourceUtils;
 
@@ -29,28 +31,29 @@ class ProtobufFileSerdeTest {
   private static final String sampleBookMsgJson = "{\"version\": 1, \"people\": ["
       + "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"addrBook@example.com\", \"phones\":[]}]}";
 
-  private static final String sampleSensorMsgJson = "{ \"name\": \"My Sensor\", "
-      + "\"temperature\": 20.5, \"humidity\": 50, \"door\": \"OPEN\" }";
+  private static final String sampleLangDescriptionMsgJson = "{ \"lang\": \"EN\", "
+      + "\"descr\": \"Some description here\" }";
 
   // Sample message of type `test.Person`
   private byte[] personMessageBytes;
   // Sample message of type `test.AddressBook`
   private byte[] addressBookMessageBytes;
-  private byte[] sensorMessageBytes;
-  private Path addressBookSchemaPath;
-  private Path sensorSchemaPath;
-
+  private byte[] langDescriptionMessageBytes;
   private Descriptors.Descriptor personDescriptor;
   private Descriptors.Descriptor addressBookDescriptor;
-  private Descriptors.Descriptor sensorDescriptor;
+  private Descriptors.Descriptor langDescriptionDescriptor;
   private Map<Descriptors.Descriptor, Path> descriptorPaths;
 
   @BeforeEach
   void setUp() throws Exception {
-    addressBookSchemaPath = ResourceUtils.getFile("classpath:address-book.proto").toPath();
-    sensorSchemaPath = ResourceUtils.getFile("classpath:sensor.proto").toPath();
+    Map<Path, ProtobufSchema> files = ProtobufFileSerde.Configuration.loadSchemas(
+        Optional.empty(),
+        Optional.empty(),
+        Optional.of(protoFilesDir())
+    );
 
-    ProtobufSchema addressBookSchema = new ProtobufSchema(Files.readString(addressBookSchemaPath));
+    Path addressBookSchemaPath = ResourceUtils.getFile("classpath:protobuf-serde/address-book.proto").toPath();
+    var addressBookSchema = files.get(addressBookSchemaPath);
     var builder = addressBookSchema.newMessageBuilder("test.Person");
     JsonFormat.parser().merge(samplePersonMsgJson, builder);
     personMessageBytes = builder.build().toByteArray();
@@ -61,63 +64,241 @@ class ProtobufFileSerdeTest {
     personDescriptor = addressBookSchema.toDescriptor("test.Person");
     addressBookDescriptor = addressBookSchema.toDescriptor("test.AddressBook");
 
-    ProtobufSchema sensorSchema = new ProtobufSchema(Files.readString(sensorSchemaPath));
-    builder = sensorSchema.newMessageBuilder("iot.Sensor");
-    JsonFormat.parser().merge(sampleSensorMsgJson, builder);
-    sensorMessageBytes = builder.build().toByteArray();
-    sensorDescriptor = sensorSchema.toDescriptor("iot.Sensor");
+    Path languageDescriptionPath = ResourceUtils.getFile("classpath:protobuf-serde/lang-description.proto").toPath();
+    var languageDescriptionSchema = files.get(languageDescriptionPath);
+    builder = languageDescriptionSchema.newMessageBuilder("test.LanguageDescription");
+    JsonFormat.parser().merge(sampleLangDescriptionMsgJson, builder);
+    langDescriptionMessageBytes = builder.build().toByteArray();
+    langDescriptionDescriptor = languageDescriptionSchema.toDescriptor("test.LanguageDescription");
 
     descriptorPaths = Map.of(
         personDescriptor, addressBookSchemaPath,
-        addressBookDescriptor, addressBookSchemaPath,
-        sensorDescriptor, sensorSchemaPath
+        addressBookDescriptor, addressBookSchemaPath
     );
   }
 
-
   @Test
-  void testDeserialize() {
-    var messageNameMap = Map.of(
-        "persons", personDescriptor,
-        "books", addressBookDescriptor
-    );
-    var keyMessageNameMap = Map.of(
-        "books", addressBookDescriptor);
-
-    var serde = new ProtobufFileSerde();
-    serde.configure(
-        null,
-        null,
-        descriptorPaths,
-        messageNameMap,
-        keyMessageNameMap
-    );
+  void loadsAllProtoFiledFromTargetDirectory() throws Exception {
+    var protoDir = ResourceUtils.getFile("classpath:protobuf-serde/").getPath();
+    List<ProtoFile> files = new ProtobufFileSerde.ProtoSchemaLoader(protoDir).load();
+    assertThat(files).hasSize(4);
+    assertThat(files)
+        .map(f -> f.getLocation().getPath())
+        .containsExactlyInAnyOrder(
+            "language/language.proto",
+            "sensor.proto",
+            "address-book.proto",
+            "lang-description.proto"
+        );
+  }
 
-    var deserializedPerson = serde.deserializer("persons", Serde.Target.VALUE)
-        .deserialize(null, personMessageBytes);
-    assertJsonEquals(samplePersonMsgJson, deserializedPerson.getResult());
+  @SneakyThrows
+  private String protoFilesDir() {
+    return ResourceUtils.getFile("classpath:protobuf-serde/").getPath();
+  }
 
-    var deserializedBook = serde.deserializer("books", Serde.Target.KEY)
-        .deserialize(null, addressBookMessageBytes);
-    assertJsonEquals(sampleBookMsgJson, deserializedBook.getResult());
+  @Nested
+  class ConfigurationTests {
+
+    @Test
+    void canBeAutoConfiguredReturnsNoProtoPropertiesProvided() {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      assertThat(Configuration.canBeAutoConfigured(resolver))
+          .isFalse();
+    }
+
+    @Test
+    void canBeAutoConfiguredReturnsTrueIfNoProtoFileHasBeenProvided() {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getProperty("protobufFile", String.class))
+          .thenReturn(Optional.of("file.proto"));
+      assertThat(Configuration.canBeAutoConfigured(resolver))
+          .isTrue();
+    }
+
+    @Test
+    void canBeAutoConfiguredReturnsTrueIfProtoFilesHasBeenProvided() {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getListProperty("protobufFiles", String.class))
+          .thenReturn(Optional.of(List.of("file.proto")));
+      assertThat(Configuration.canBeAutoConfigured(resolver))
+          .isTrue();
+    }
+
+    @Test
+    void canBeAutoConfiguredReturnsTrueIfProtoFilesDirProvided() {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getProperty("protobufFilesDir", String.class))
+          .thenReturn(Optional.of("/filesDir"));
+      assertThat(Configuration.canBeAutoConfigured(resolver))
+          .isTrue();
+    }
+
+    @Test
+    void unknownSchemaAsDefaultThrowsException() {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getProperty("protobufFilesDir", String.class))
+          .thenReturn(Optional.of(protoFilesDir()));
+
+      when(resolver.getProperty("protobufMessageName", String.class))
+          .thenReturn(Optional.of("test.NotExistent"));
+
+      assertThatThrownBy(() -> Configuration.create(resolver))
+          .isInstanceOf(NullPointerException.class)
+          .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
+    }
+
+    @Test
+    void unknownSchemaAsDefaultForKeyThrowsException() {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getProperty("protobufFilesDir", String.class))
+          .thenReturn(Optional.of(protoFilesDir()));
+
+      when(resolver.getProperty("protobufMessageNameForKey", String.class))
+          .thenReturn(Optional.of("test.NotExistent"));
+
+      assertThatThrownBy(() -> Configuration.create(resolver))
+          .isInstanceOf(NullPointerException.class)
+          .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
+    }
+
+    @Test
+    void unknownSchemaAsTopicSchemaThrowsException() {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getProperty("protobufFilesDir", String.class))
+          .thenReturn(Optional.of(protoFilesDir()));
+
+      when(resolver.getMapProperty("protobufMessageNameByTopic", String.class, String.class))
+          .thenReturn(Optional.of(Map.of("persons", "test.NotExistent")));
+
+      assertThatThrownBy(() -> Configuration.create(resolver))
+          .isInstanceOf(NullPointerException.class)
+          .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
+    }
+
+    @Test
+    void unknownSchemaAsTopicSchemaForKeyThrowsException() {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getProperty("protobufFilesDir", String.class))
+          .thenReturn(Optional.of(protoFilesDir()));
+
+      when(resolver.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class))
+          .thenReturn(Optional.of(Map.of("persons", "test.NotExistent")));
+
+      assertThatThrownBy(() -> Configuration.create(resolver))
+          .isInstanceOf(NullPointerException.class)
+          .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
+    }
+
+    @Test
+    void createConfigureFillsDescriptorMappingsWhenProtoFilesListProvided() throws Exception {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getProperty("protobufFile", String.class))
+          .thenReturn(Optional.of(
+              ResourceUtils.getFile("classpath:protobuf-serde/sensor.proto").getPath()));
+
+      when(resolver.getListProperty("protobufFiles", String.class))
+          .thenReturn(Optional.of(
+              List.of(
+                  ResourceUtils.getFile("classpath:protobuf-serde/address-book.proto").getPath())));
+
+      when(resolver.getProperty("protobufMessageName", String.class))
+          .thenReturn(Optional.of("test.Sensor"));
+
+      when(resolver.getProperty("protobufMessageNameForKey", String.class))
+          .thenReturn(Optional.of("test.AddressBook"));
+
+      when(resolver.getMapProperty("protobufMessageNameByTopic", String.class, String.class))
+          .thenReturn(Optional.of(
+              Map.of(
+                  "topic1", "test.Sensor",
+                  "topic2", "test.AddressBook")));
+
+      when(resolver.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class))
+          .thenReturn(Optional.of(
+              Map.of(
+                  "topic1", "test.Person",
+                  "topic2", "test.AnotherPerson")));
+
+      var configuration = Configuration.create(resolver);
+
+      assertThat(configuration.defaultMessageDescriptor())
+          .matches(d -> d.getFullName().equals("test.Sensor"));
+      assertThat(configuration.defaultKeyMessageDescriptor())
+          .matches(d -> d.getFullName().equals("test.AddressBook"));
+
+      assertThat(configuration.messageDescriptorMap())
+          .containsOnlyKeys("topic1", "topic2")
+          .anySatisfy((topic, descr) -> assertThat(descr.getFullName()).isEqualTo("test.Sensor"))
+          .anySatisfy((topic, descr) -> assertThat(descr.getFullName()).isEqualTo("test.AddressBook"));
+
+      assertThat(configuration.keyMessageDescriptorMap())
+          .containsOnlyKeys("topic1", "topic2")
+          .anySatisfy((topic, descr) -> assertThat(descr.getFullName()).isEqualTo("test.Person"))
+          .anySatisfy((topic, descr) -> assertThat(descr.getFullName()).isEqualTo("test.AnotherPerson"));
+    }
+
+    @Test
+    void createConfigureFillsDescriptorMappingsWhenProtoFileDirProvided() throws Exception {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getProperty("protobufFilesDir", String.class))
+          .thenReturn(Optional.of(protoFilesDir()));
+
+      when(resolver.getProperty("protobufMessageName", String.class))
+          .thenReturn(Optional.of("test.Sensor"));
+
+      when(resolver.getProperty("protobufMessageNameForKey", String.class))
+          .thenReturn(Optional.of("test.AddressBook"));
+
+      when(resolver.getMapProperty("protobufMessageNameByTopic", String.class, String.class))
+          .thenReturn(Optional.of(
+              Map.of(
+                  "topic1", "test.Sensor",
+                  "topic2", "test.LanguageDescription")));
+
+      when(resolver.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class))
+          .thenReturn(Optional.of(
+              Map.of(
+                  "topic1", "test.Person",
+                  "topic2", "test.AnotherPerson")));
+
+      var configuration = Configuration.create(resolver);
+
+      assertThat(configuration.defaultMessageDescriptor())
+          .matches(d -> d.getFullName().equals("test.Sensor"));
+      assertThat(configuration.defaultKeyMessageDescriptor())
+          .matches(d -> d.getFullName().equals("test.AddressBook"));
+
+      assertThat(configuration.messageDescriptorMap())
+          .containsOnlyKeys("topic1", "topic2")
+          .anySatisfy((topic, descr) -> assertThat(descr.getFullName()).isEqualTo("test.Sensor"))
+          .anySatisfy((topic, descr) -> assertThat(descr.getFullName()).isEqualTo("test.LanguageDescription"));
+
+      assertThat(configuration.keyMessageDescriptorMap())
+          .containsOnlyKeys("topic1", "topic2")
+          .anySatisfy((topic, descr) -> assertThat(descr.getFullName()).isEqualTo("test.Person"))
+          .anySatisfy((topic, descr) -> assertThat(descr.getFullName()).isEqualTo("test.AnotherPerson"));
+    }
   }
 
   @Test
-  void testDeserializeMultipleProtobuf() {
+  void deserializeUsesTopicsMappingToFindMsgDescriptor() {
     var messageNameMap = Map.of(
         "persons", personDescriptor,
         "books", addressBookDescriptor,
-        "sensors", sensorDescriptor
+        "langs", langDescriptionDescriptor
     );
     var keyMessageNameMap = Map.of(
         "books", addressBookDescriptor);
     var serde = new ProtobufFileSerde();
     serde.configure(
-        null,
-        null,
-        descriptorPaths,
-        messageNameMap,
-        keyMessageNameMap
+        new Configuration(
+            null,
+            null,
+            descriptorPaths,
+            messageNameMap,
+            keyMessageNameMap
+        )
     );
 
     var deserializedPerson = serde.deserializer("persons", Serde.Target.VALUE)
@@ -128,20 +309,22 @@ class ProtobufFileSerdeTest {
         .deserialize(null, addressBookMessageBytes);
     assertJsonEquals(sampleBookMsgJson, deserializedBook.getResult());
 
-    var deserializedSensor = serde.deserializer("sensors", Serde.Target.VALUE)
-        .deserialize(null, sensorMessageBytes);
-    assertJsonEquals(sampleSensorMsgJson, deserializedSensor.getResult());
+    var deserializedSensor = serde.deserializer("langs", Serde.Target.VALUE)
+        .deserialize(null, langDescriptionMessageBytes);
+    assertJsonEquals(sampleLangDescriptionMsgJson, deserializedSensor.getResult());
   }
 
   @Test
-  void testDefaultMessageName() {
+  void deserializeUsesDefaultDescriptorIfTopicMappingNotFound() {
     var serde = new ProtobufFileSerde();
     serde.configure(
-        personDescriptor,
-        addressBookDescriptor,
-        descriptorPaths,
-        Map.of(),
-        Map.of()
+        new Configuration(
+            personDescriptor,
+            addressBookDescriptor,
+            descriptorPaths,
+            Map.of(),
+            Map.of()
+        )
     );
 
     var deserializedPerson = serde.deserializer("persons", Serde.Target.VALUE)
@@ -154,230 +337,57 @@ class ProtobufFileSerdeTest {
   }
 
   @Test
-  void testSerialize() {
-    var messageNameMap = Map.of(
-        "persons", personDescriptor,
-        "books", addressBookDescriptor
-    );
-    var keyMessageNameMap = Map.of(
-        "books", addressBookDescriptor);
-
-    var serde = new ProtobufFileSerde();
-    serde.configure(
-        null,
-        null,
-        descriptorPaths,
-        messageNameMap,
-        keyMessageNameMap
-    );
-
-    var personBytes = serde.serializer("persons", Serde.Target.VALUE)
-        .serialize("{ \"name\": \"My Name\",\"id\": 101, \"email\": \"user1@example.com\" }");
-    assertThat(personBytes).isEqualTo(personMessageBytes);
-
-    var booksBytes = serde.serializer("books", Serde.Target.KEY)
-        .serialize("{\"version\": 1, \"people\": ["
-            + "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"addrBook@example.com\" }]}");
-    assertThat(booksBytes).isEqualTo(addressBookMessageBytes);
-  }
-
-  @Test
-  void testSerializeMultipleProtobuf() {
+  void serializeUsesTopicsMappingToFindMsgDescriptor() {
     var messageNameMap = Map.of(
         "persons", personDescriptor,
         "books", addressBookDescriptor,
-        "sensors", sensorDescriptor
+        "langs", langDescriptionDescriptor
     );
     var keyMessageNameMap = Map.of(
         "books", addressBookDescriptor);
 
     var serde = new ProtobufFileSerde();
     serde.configure(
-        null,
-        null,
-        descriptorPaths,
-        messageNameMap,
-        keyMessageNameMap
+        new Configuration(
+            null,
+            null,
+            descriptorPaths,
+            messageNameMap,
+            keyMessageNameMap
+        )
     );
 
-    var personBytes = serde.serializer("persons", Serde.Target.VALUE)
-        .serialize("{ \"name\": \"My Name\",\"id\": 101, \"email\": \"user1@example.com\" }");
-    assertThat(personBytes).isEqualTo(personMessageBytes);
+    var personBytes = serde.serializer("langs", Serde.Target.VALUE)
+        .serialize(sampleLangDescriptionMsgJson);
+    assertThat(personBytes).isEqualTo(langDescriptionMessageBytes);
 
     var booksBytes = serde.serializer("books", Serde.Target.KEY)
-        .serialize("{\"version\": 1, \"people\": ["
-            + "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"addrBook@example.com\" }]}");
+        .serialize(sampleBookMsgJson);
     assertThat(booksBytes).isEqualTo(addressBookMessageBytes);
-
-    var sensorBytes = serde.serializer("sensors", Serde.Target.VALUE)
-        .serialize("{ \"name\": \"My Sensor\", \"temperature\": 20.5, \"humidity\": 50, \"door\": \"OPEN\" }");
-    assertThat(sensorBytes).isEqualTo(sensorMessageBytes);
   }
 
   @Test
-  void testSerializeDefaults() {
+  void serializeUsesDefaultDescriptorIfTopicMappingNotFound() {
     var serde = new ProtobufFileSerde();
     serde.configure(
-        personDescriptor,
-        addressBookDescriptor,
-        descriptorPaths,
-        Map.of(),
-        Map.of()
+        new Configuration(
+            personDescriptor,
+            addressBookDescriptor,
+            descriptorPaths,
+            Map.of(),
+            Map.of()
+        )
     );
 
     var personBytes = serde.serializer("persons", Serde.Target.VALUE)
-        .serialize("{ \"name\": \"My Name\",\"id\": 101, \"email\": \"user1@example.com\" }");
+        .serialize(samplePersonMsgJson);
     assertThat(personBytes).isEqualTo(personMessageBytes);
 
     var booksBytes = serde.serializer("books", Serde.Target.KEY)
-        .serialize("{\"version\": 1, \"people\": ["
-            + "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"addrBook@example.com\" }]}");
+        .serialize(sampleBookMsgJson);
     assertThat(booksBytes).isEqualTo(addressBookMessageBytes);
   }
 
-  @Test
-  void canBeAutoConfiguredReturnsFalseIfNoProtoFilesHaveBeenProvided() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-
-    var serde = new ProtobufFileSerde();
-    boolean startupSuccessful = serde.canBeAutoConfigured(resolver, resolver);
-    assertThat(startupSuccessful).isFalse();
-  }
-
-  @Test
-  void canBeAutoConfiguredReturnsFalseIfProtoFilesListIsEmpty() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getListProperty("protobufFiles", String.class)).thenReturn(Optional.of(List.of()));
-
-    var serde = new ProtobufFileSerde();
-    boolean startupSuccessful = serde.canBeAutoConfigured(resolver, resolver);
-    assertThat(startupSuccessful).isFalse();
-  }
-
-  @Test
-  void canBeAutoConfiguredReturnsTrueIfNoProtoFileHasBeenProvided() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getProperty("protobufFile", String.class)).thenReturn(Optional.of("file.proto"));
-
-    var serde = new ProtobufFileSerde();
-    boolean startupSuccessful = serde.canBeAutoConfigured(resolver, resolver);
-    assertThat(startupSuccessful).isTrue();
-  }
-
-  @Test
-  void canBeAutoConfiguredReturnsTrueIfProtoFilesHasBeenProvided() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getListProperty("protobufFiles", String.class)).thenReturn(Optional.of(List.of("file.proto")));
-
-    var serde = new ProtobufFileSerde();
-    boolean startupSuccessful = serde.canBeAutoConfigured(resolver, resolver);
-    assertThat(startupSuccessful).isTrue();
-  }
-
-  @Test
-  void canBeAutoConfiguredReturnsTrueIfProtoFileAndProtoFilesHaveBeenProvided() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getProperty("protobufFile", String.class)).thenReturn(Optional.of("file1.proto"));
-    when(resolver.getListProperty("protobufFiles", String.class)).thenReturn(Optional.of(List.of("file2.proto")));
-
-    var serde = new ProtobufFileSerde();
-    boolean startupSuccessful = serde.canBeAutoConfigured(resolver, resolver);
-    assertThat(startupSuccessful).isTrue();
-  }
-
-  @Test
-  void listOfProtobufFilesIsJoined() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getProperty("protobufFile", String.class))
-        .thenReturn(Optional.of(addressBookSchemaPath.toString()));
-    when(resolver.getListProperty("protobufFiles", String.class))
-        .thenReturn(Optional.of(List.of(sensorSchemaPath.toString())));
-    when(resolver.getProperty("protobufMessageName", String.class))
-        .thenReturn(Optional.of("test.AddressBook"));
-
-    Map<String, String> protobufMessageNameByTopic = Map.of(
-        "persons", "test.Person",
-        "books", "test.AddressBook",
-        "sensors", "iot.Sensor");
-    when(resolver.getMapProperty("protobufMessageNameByTopic", String.class, String.class))
-        .thenReturn(Optional.of(protobufMessageNameByTopic));
-
-    var serde = new ProtobufFileSerde();
-    serde.configure(resolver, resolver, resolver);
-
-    var deserializedPerson = serde.deserializer("persons", Serde.Target.VALUE)
-        .deserialize(null, personMessageBytes);
-    assertJsonEquals(samplePersonMsgJson, deserializedPerson.getResult());
-
-    var deserializedSensor = serde.deserializer("sensors", Serde.Target.VALUE)
-        .deserialize(null, sensorMessageBytes);
-    assertJsonEquals(sampleSensorMsgJson, deserializedSensor.getResult());
-  }
-
-  @Test
-  void unknownSchemaAsDefaultThrowsException() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getListProperty("protobufFiles", String.class))
-        .thenReturn(Optional.of(List.of(addressBookSchemaPath.toString(), sensorSchemaPath.toString())));
-    when(resolver.getProperty("protobufMessageName", String.class))
-        .thenReturn(Optional.of("test.NotExistent"));
-
-    var serde = new ProtobufFileSerde();
-    assertThatThrownBy(() -> serde.configure(resolver, resolver, resolver))
-        .isInstanceOf(NullPointerException.class)
-        .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
-  }
-
-  @Test
-  void unknownSchemaAsDefaultForKeyThrowsException() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getListProperty("protobufFiles", String.class))
-        .thenReturn(Optional.of(List.of(addressBookSchemaPath.toString(), sensorSchemaPath.toString())));
-    when(resolver.getProperty("protobufMessageName", String.class))
-        .thenReturn(Optional.of("test.AddressBook"));
-    when(resolver.getProperty("protobufMessageNameForKey", String.class))
-        .thenReturn(Optional.of("test.NotExistent"));
-
-    var serde = new ProtobufFileSerde();
-    assertThatThrownBy(() -> serde.configure(resolver, resolver, resolver))
-        .isInstanceOf(NullPointerException.class)
-        .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
-  }
-
-  @Test
-  void unknownSchemaAsTopicSchemaThrowsException() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getListProperty("protobufFiles", String.class))
-        .thenReturn(Optional.of(List.of(addressBookSchemaPath.toString(), sensorSchemaPath.toString())));
-    when(resolver.getProperty("protobufMessageName", String.class))
-        .thenReturn(Optional.of("test.AddressBook"));
-
-    when(resolver.getMapProperty("protobufMessageNameByTopic", String.class, String.class))
-        .thenReturn(Optional.of(Map.of("persons", "test.NotExistent")));
-
-    var serde = new ProtobufFileSerde();
-    assertThatThrownBy(() -> serde.configure(resolver, resolver, resolver))
-        .isInstanceOf(NullPointerException.class)
-        .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
-  }
-
-  @Test
-  void unknownSchemaAsTopicSchemaForKeyThrowsException() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getListProperty("protobufFiles", String.class))
-        .thenReturn(Optional.of(List.of(addressBookSchemaPath.toString(), sensorSchemaPath.toString())));
-    when(resolver.getProperty("protobufMessageName", String.class))
-        .thenReturn(Optional.of("test.AddressBook"));
-
-    when(resolver.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class))
-        .thenReturn(Optional.of(Map.of("persons", "test.NotExistent")));
-
-    var serde = new ProtobufFileSerde();
-    assertThatThrownBy(() -> serde.configure(resolver, resolver, resolver))
-        .isInstanceOf(NullPointerException.class)
-        .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
-  }
-
   @SneakyThrows
   private void assertJsonEquals(String expectedJson, String actualJson) {
     var mapper = new JsonMapper();

+ 5 - 7
kafka-ui-api/src/test/resources/address-book.proto → kafka-ui-api/src/test/resources/protobuf-serde/address-book.proto

@@ -1,16 +1,10 @@
-// [START declaration]
 syntax = "proto3";
 package test;
 
-// [END declaration]
-
-// [START java_declaration]
 option java_multiple_files = true;
 option java_package = "com.example.tutorial.protos";
 option java_outer_classname = "AddressBookProtos";
-// [END java_declaration]
 
-// [START messages]
 message Person {
   string name = 1;
   int32 id = 2;  // Unique ID number for this person.
@@ -31,9 +25,13 @@ message Person {
 
 }
 
+message AnotherPerson {
+    string name = 1;
+    string surname = 2;
+}
+
 // Our address book file is just one of these.
 message AddressBook {
   int32 version = 1;
   repeated Person people = 2;
 }
-// [END messages]

+ 11 - 0
kafka-ui-api/src/test/resources/protobuf-serde/lang-description.proto

@@ -0,0 +1,11 @@
+syntax = "proto3";
+
+package test;
+
+import "language/language.proto";
+import "google/protobuf/wrappers.proto";
+
+message LanguageDescription {
+    test.lang.Language lang = 1;
+    google.protobuf.StringValue descr = 2;
+}

+ 11 - 0
kafka-ui-api/src/test/resources/protobuf-serde/language/language.proto

@@ -0,0 +1,11 @@
+syntax = "proto3";
+package test.lang;
+
+enum Language {
+    DE = 0;
+    EN = 1;
+    ES = 2;
+    FR = 3;
+    PL = 4;
+    RU = 5;
+}

+ 1 - 1
kafka-ui-api/src/test/resources/sensor.proto → kafka-ui-api/src/test/resources/protobuf-serde/sensor.proto

@@ -1,5 +1,5 @@
 syntax = "proto3";
-package iot;
+package test;
 
 message Sensor {
     string name = 1;