Pārlūkot izejas kodu

Merge branch 'master' into vlad/develop

VladSenyuta 2 gadi atpakaļ
vecāks
revīzija
4fc55c1a2d

+ 1 - 2
documentation/compose/kafka-ui-serdes.yaml

@@ -28,8 +28,7 @@ services:
             kafka.clusters.0.serde.0.name: ProtobufFile
             kafka.clusters.0.serde.0.topicKeysPattern: "topic1"
             kafka.clusters.0.serde.0.topicValuesPattern: "topic1"
-            kafka.clusters.0.serde.0.properties.protobufFiles.0: /protofiles/key-types.proto
-            kafka.clusters.0.serde.0.properties.protobufFiles.1: /protofiles/values.proto
+            kafka.clusters.0.serde.0.properties.protobufFilesDir: /protofiles/
             kafka.clusters.0.serde.0.properties.protobufMessageNameForKey: test.MyKey # default type for keys
             kafka.clusters.0.serde.0.properties.protobufMessageName: test.MyValue # default type for values
             kafka.clusters.0.serde.0.properties.protobufMessageNameForKeyByTopic.topic1: test.MySpecificTopicKey # keys type for topic "topic1"

+ 4 - 0
documentation/compose/proto/key-types.proto

@@ -1,11 +1,15 @@
 syntax = "proto3";
 package test;
 
+import "google/protobuf/wrappers.proto";
+
 message MyKey {
     string myKeyF1 = 1;
+    google.protobuf.UInt64Value uint_64_wrapper = 2;
 }
 
 message MySpecificTopicKey {
     string special_field1 = 1;
     string special_field2 = 2;
+    google.protobuf.FloatValue float_wrapper = 3;
 }

+ 13 - 9
documentation/guides/Protobuf.md

@@ -12,22 +12,26 @@ To configure Kafkaui to deserialize protobuf messages using a supplied protobuf
 ```yaml
 kafka:
   clusters:
-    - # Cluster configuration omitted.
-      # protobufFile is the path to the protobuf schema. (deprecated: please use "protobufFiles")
+    - # Cluster configuration omitted...
+      # protobufFilesDir specifies root location for proto files (will be scanned recursively)
+      # NOTE: if 'protobufFilesDir' specified, then 'protobufFile' and 'protobufFiles' settings will be ignored
+      protobufFilesDir: "/path/to/my-protobufs"
+      # (DEPRECATED) protobufFile is the path to the protobuf schema. (deprecated: please use "protobufFiles")
       protobufFile: path/to/my.proto
-      # protobufFiles is the path to one or more protobuf schemas.
-      protobufFiles: 
-        - /path/to/my.proto
-        - /path/to/another.proto
-      # protobufMessageName is the default protobuf type that is used to deserilize
-      # the message's value if the topic is not found in protobufMessageNameByTopic.
+      # (DEPRECATED) protobufFiles is the location of one or more protobuf schemas
+      protobufFiles:
+        - /path/to/my-protobufs/my.proto
+        - /path/to/my-protobufs/another.proto
+        - /path/to/my-protobufs:test/test.proto
+      # protobufMessageName is the default protobuf type that is used to deserialize
+      # the message's value if the topic is not found in protobufMessageNameByTopic.    
       protobufMessageName: my.DefaultValType
       # protobufMessageNameByTopic is a mapping of topic names to protobuf types.
       # This mapping is required and is used to deserialize the Kafka message's value.
       protobufMessageNameByTopic:
         topic1: my.Type1
         topic2: my.Type2
-      # protobufMessageNameForKey is the default protobuf type that is used to deserilize
+      # protobufMessageNameForKey is the default protobuf type that is used to deserialize
       # the message's key if the topic is not found in protobufMessageNameForKeyByTopic.
       protobufMessageNameForKey: my.DefaultKeyType
       # protobufMessageNameForKeyByTopic is a mapping of topic names to protobuf types.

+ 2 - 4
documentation/guides/Serialization.md

@@ -46,10 +46,8 @@ kafka:
       serde:
         - name: ProtobufFile
           properties:
-            # path to the protobuf schema files
-            protobufFiles:
-              - path/to/my.proto
-              - path/to/another.proto
+            # path to the protobuf schema files directory
+            protobufFilesDir: "path/to/protofiles"
             # default protobuf type that is used for KEY serialization/deserialization
             # optional
             protobufMessageNameForKey: my.Type1

+ 285 - 112
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufFileSerde.java

@@ -1,9 +1,36 @@
 package com.provectus.kafka.ui.serdes.builtin;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.AnyProto;
+import com.google.protobuf.ApiProto;
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Descriptors;
 import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.DurationProto;
 import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.EmptyProto;
+import com.google.protobuf.FieldMaskProto;
+import com.google.protobuf.SourceContextProto;
+import com.google.protobuf.StructProto;
+import com.google.protobuf.TimestampProto;
+import com.google.protobuf.TypeProto;
+import com.google.protobuf.WrappersProto;
 import com.google.protobuf.util.JsonFormat;
+import com.google.type.ColorProto;
+import com.google.type.DateProto;
+import com.google.type.DateTimeProto;
+import com.google.type.DayOfWeekProto;
+import com.google.type.ExprProto;
+import com.google.type.FractionProto;
+import com.google.type.IntervalProto;
+import com.google.type.LatLngProto;
+import com.google.type.MoneyProto;
+import com.google.type.MonthProto;
+import com.google.type.PhoneNumberProto;
+import com.google.type.PostalAddressProto;
+import com.google.type.QuaternionProto;
+import com.google.type.TimeOfDayProto;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
 import com.provectus.kafka.ui.serde.api.PropertyResolver;
@@ -11,13 +38,19 @@ import com.provectus.kafka.ui.serde.api.RecordHeaders;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
 import com.provectus.kafka.ui.util.jsonschema.ProtobufSchemaConverter;
+import com.squareup.wire.schema.ErrorCollector;
+import com.squareup.wire.schema.Linker;
+import com.squareup.wire.schema.Loader;
+import com.squareup.wire.schema.Location;
+import com.squareup.wire.schema.ProtoFile;
+import com.squareup.wire.schema.internal.parser.ProtoFileElement;
+import com.squareup.wire.schema.internal.parser.ProtoParser;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
 import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -28,7 +61,10 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.jetbrains.annotations.NotNull;
 
+@Slf4j
 public class ProtobufFileSerde implements BuiltInSerde {
 
   public static String name() {
@@ -51,132 +87,35 @@ public class ProtobufFileSerde implements BuiltInSerde {
   @Override
   public boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties,
                                      PropertyResolver globalProperties) {
-    Optional<String> protobufFile = kafkaClusterProperties.getProperty("protobufFile", String.class);
-    Optional<List<String>> protobufFiles = kafkaClusterProperties.getListProperty("protobufFiles", String.class);
-    return protobufFile.isPresent() || protobufFiles.filter(files -> !files.isEmpty()).isPresent();
+    return Configuration.canBeAutoConfigured(kafkaClusterProperties);
   }
 
   @Override
   public void autoConfigure(PropertyResolver kafkaClusterProperties,
                             PropertyResolver globalProperties) {
-    configure(kafkaClusterProperties);
+    configure(Configuration.create(kafkaClusterProperties));
   }
 
   @Override
   public void configure(PropertyResolver serdeProperties,
                         PropertyResolver kafkaClusterProperties,
                         PropertyResolver globalProperties) {
-    configure(serdeProperties);
-  }
-
-  private void configure(PropertyResolver properties) {
-    Map<Path, ProtobufSchema> protobufSchemas = joinPathProperties(properties).stream()
-        .map(path -> Map.entry(path, new ProtobufSchema(readFileAsString(path))))
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
-    // Load all referenced message schemas and store their source proto file with the descriptors
-    Map<Descriptor, Path> descriptorPaths = new HashMap<>();
-    Optional<String> protobufMessageName = properties.getProperty("protobufMessageName", String.class);
-    protobufMessageName.ifPresent(messageName -> addProtobufSchema(descriptorPaths, protobufSchemas, messageName));
-
-    Optional<String> protobufMessageNameForKey =
-        properties.getProperty("protobufMessageNameForKey", String.class);
-    protobufMessageNameForKey
-        .ifPresent(messageName -> addProtobufSchema(descriptorPaths, protobufSchemas, messageName));
-
-    Optional<Map<String, String>> protobufMessageNameByTopic =
-        properties.getMapProperty("protobufMessageNameByTopic", String.class, String.class);
-    protobufMessageNameByTopic
-        .ifPresent(messageNamesByTopic -> addProtobufSchemas(descriptorPaths, protobufSchemas, messageNamesByTopic));
-
-    Optional<Map<String, String>> protobufMessageNameForKeyByTopic =
-        properties.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class);
-    protobufMessageNameForKeyByTopic
-        .ifPresent(messageNamesByTopic -> addProtobufSchemas(descriptorPaths, protobufSchemas, messageNamesByTopic));
-
-    // Fill dictionary for descriptor lookup by full message name
-    Map<String, Descriptor> descriptorMap = descriptorPaths.keySet().stream()
-        .collect(Collectors.toMap(Descriptor::getFullName, Function.identity()));
-
-    configure(
-        protobufMessageName.map(descriptorMap::get).orElse(null),
-        protobufMessageNameForKey.map(descriptorMap::get).orElse(null),
-        descriptorPaths,
-        protobufMessageNameByTopic.map(map -> populateDescriptors(descriptorMap, map)).orElse(Map.of()),
-        protobufMessageNameForKeyByTopic.map(map -> populateDescriptors(descriptorMap, map)).orElse(Map.of())
-    );
+    configure(Configuration.create(serdeProperties));
   }
 
   @VisibleForTesting
-  void configure(
-      @Nullable Descriptor defaultMessageDescriptor,
-      @Nullable Descriptor defaultKeyMessageDescriptor,
-      Map<Descriptor, Path> descriptorPaths,
-      Map<String, Descriptor> messageDescriptorMap,
-      Map<String, Descriptor> keyMessageDescriptorMap) {
-    if (defaultMessageDescriptor == null
-        && defaultKeyMessageDescriptor == null
-        && messageDescriptorMap.isEmpty()
-        && keyMessageDescriptorMap.isEmpty()) {
+  void configure(Configuration configuration) {
+    if (configuration.defaultMessageDescriptor() == null
+        && configuration.defaultKeyMessageDescriptor() == null
+        && configuration.messageDescriptorMap().isEmpty()
+        && configuration.keyMessageDescriptorMap().isEmpty()) {
       throw new ValidationException("Neither default, not per-topic descriptors defined for " + name() + " serde");
     }
-    this.defaultMessageDescriptor = defaultMessageDescriptor;
-    this.defaultKeyMessageDescriptor = defaultKeyMessageDescriptor;
-    this.descriptorPaths = descriptorPaths;
-    this.messageDescriptorMap = messageDescriptorMap;
-    this.keyMessageDescriptorMap = keyMessageDescriptorMap;
-  }
-
-  private static void addProtobufSchema(Map<Descriptor, Path> descriptorPaths,
-                                 Map<Path, ProtobufSchema> protobufSchemas,
-                                 String messageName) {
-    var descriptorAndPath = getDescriptorAndPath(protobufSchemas, messageName);
-    descriptorPaths.put(descriptorAndPath.getKey(), descriptorAndPath.getValue());
-  }
-
-  private static void addProtobufSchemas(Map<Descriptor, Path> descriptorPaths,
-                                  Map<Path, ProtobufSchema> protobufSchemas,
-                                  Map<String, String> messageNamesByTopic) {
-    messageNamesByTopic.values().stream()
-        .map(msgName -> getDescriptorAndPath(protobufSchemas, msgName))
-        .forEach(entry -> descriptorPaths.put(entry.getKey(), entry.getValue()));
-  }
-
-  private static List<Path> joinPathProperties(PropertyResolver propertyResolver) {
-    return Stream.concat(
-            propertyResolver.getProperty("protobufFile", String.class).map(List::of).stream(),
-            propertyResolver.getListProperty("protobufFiles", String.class).stream())
-        .flatMap(Collection::stream)
-        .distinct()
-        .map(Path::of)
-        .collect(Collectors.toList());
-  }
-
-  private static Map.Entry<Descriptor, Path> getDescriptorAndPath(Map<Path, ProtobufSchema> protobufSchemas,
-                                                                  String msgName) {
-    return protobufSchemas.entrySet().stream()
-            .filter(schema -> schema.getValue().toDescriptor(msgName) != null)
-            .map(schema -> Map.entry(schema.getValue().toDescriptor(msgName), schema.getKey()))
-            .findFirst()
-            .orElseThrow(() -> new NullPointerException(
-                    "The given message type not found in protobuf definition: " + msgName));
-  }
-
-  private static String readFileAsString(Path path) {
-    try {
-      return Files.readString(path);
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
-  }
-
-  private Map<String, Descriptor> populateDescriptors(Map<String, Descriptor> descriptorMap,
-                                                      Map<String, String> messageNameMap) {
-    Map<String, Descriptor> descriptors = new HashMap<>();
-    for (Map.Entry<String, String> entry : messageNameMap.entrySet()) {
-      descriptors.put(entry.getKey(), descriptorMap.get(entry.getValue()));
-    }
-    return descriptors;
+    this.defaultMessageDescriptor = configuration.defaultMessageDescriptor();
+    this.defaultKeyMessageDescriptor = configuration.defaultKeyMessageDescriptor();
+    this.descriptorPaths = configuration.descriptorPaths();
+    this.messageDescriptorMap = configuration.messageDescriptorMap();
+    this.keyMessageDescriptorMap = configuration.keyMessageDescriptorMap();
   }
 
   @Override
@@ -249,4 +188,238 @@ public class ProtobufFileSerde implements BuiltInSerde {
         Map.of("messageName", descriptor.getFullName())
     );
   }
+
+  @SneakyThrows
+  private static String readFileAsString(Path path) {
+    return Files.readString(path);
+  }
+
+  //----------------------------------------------------------------------------------------------------------------
+
+  @VisibleForTesting
+  record Configuration(@Nullable Descriptor defaultMessageDescriptor,
+                       @Nullable Descriptor defaultKeyMessageDescriptor,
+                       Map<Descriptor, Path> descriptorPaths,
+                       Map<String, Descriptor> messageDescriptorMap,
+                       Map<String, Descriptor> keyMessageDescriptorMap) {
+
+    static boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties) {
+      Optional<String> protobufFile = kafkaClusterProperties.getProperty("protobufFile", String.class);
+      Optional<List<String>> protobufFiles = kafkaClusterProperties.getListProperty("protobufFiles", String.class);
+      Optional<String> protobufFilesDir = kafkaClusterProperties.getProperty("protobufFilesDir", String.class);
+      return protobufFilesDir.isPresent()
+          || protobufFile.isPresent()
+          || protobufFiles.filter(files -> !files.isEmpty()).isPresent();
+    }
+
+    static Configuration create(PropertyResolver properties) {
+      var protobufSchemas = loadSchemas(
+          properties.getProperty("protobufFile", String.class),
+          properties.getListProperty("protobufFiles", String.class),
+          properties.getProperty("protobufFilesDir", String.class)
+      );
+
+      // Load all referenced message schemas and store their source proto file with the descriptors
+      Map<Descriptor, Path> descriptorPaths = new HashMap<>();
+      Optional<String> protobufMessageName = properties.getProperty("protobufMessageName", String.class);
+      protobufMessageName.ifPresent(messageName -> addProtobufSchema(descriptorPaths, protobufSchemas, messageName));
+
+      Optional<String> protobufMessageNameForKey =
+          properties.getProperty("protobufMessageNameForKey", String.class);
+      protobufMessageNameForKey
+          .ifPresent(messageName -> addProtobufSchema(descriptorPaths, protobufSchemas, messageName));
+
+      Optional<Map<String, String>> protobufMessageNameByTopic =
+          properties.getMapProperty("protobufMessageNameByTopic", String.class, String.class);
+      protobufMessageNameByTopic
+          .ifPresent(messageNamesByTopic -> addProtobufSchemas(descriptorPaths, protobufSchemas, messageNamesByTopic));
+
+      Optional<Map<String, String>> protobufMessageNameForKeyByTopic =
+          properties.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class);
+      protobufMessageNameForKeyByTopic
+          .ifPresent(messageNamesByTopic -> addProtobufSchemas(descriptorPaths, protobufSchemas, messageNamesByTopic));
+
+      // Fill dictionary for descriptor lookup by full message name
+      Map<String, Descriptor> descriptorMap = descriptorPaths.keySet().stream()
+          .collect(Collectors.toMap(Descriptor::getFullName, Function.identity()));
+
+      return new Configuration(
+          protobufMessageName.map(descriptorMap::get).orElse(null),
+          protobufMessageNameForKey.map(descriptorMap::get).orElse(null),
+          descriptorPaths,
+          protobufMessageNameByTopic.map(map -> populateDescriptors(descriptorMap, map)).orElse(Map.of()),
+          protobufMessageNameForKeyByTopic.map(map -> populateDescriptors(descriptorMap, map)).orElse(Map.of())
+      );
+    }
+
+    private static Map.Entry<Descriptor, Path> getDescriptorAndPath(Map<Path, ProtobufSchema> protobufSchemas,
+                                                                    String msgName) {
+      return protobufSchemas.entrySet().stream()
+          .filter(schema -> schema.getValue().toDescriptor(msgName) != null)
+          .map(schema -> Map.entry(schema.getValue().toDescriptor(msgName), schema.getKey()))
+          .findFirst()
+          .orElseThrow(() -> new NullPointerException(
+              "The given message type not found in protobuf definition: " + msgName));
+    }
+
+    private static Map<String, Descriptor> populateDescriptors(Map<String, Descriptor> descriptorMap,
+                                                               Map<String, String> messageNameMap) {
+      Map<String, Descriptor> descriptors = new HashMap<>();
+      for (Map.Entry<String, String> entry : messageNameMap.entrySet()) {
+        descriptors.put(entry.getKey(), descriptorMap.get(entry.getValue()));
+      }
+      return descriptors;
+    }
+
+    @VisibleForTesting
+    static Map<Path, ProtobufSchema> loadSchemas(Optional<String> protobufFile,
+                                                 Optional<List<String>> protobufFiles,
+                                                 Optional<String> protobufFilesDir) {
+      if (protobufFilesDir.isPresent()) {
+        if (protobufFile.isPresent() || protobufFiles.isPresent()) {
+          log.warn("protobufFile and protobufFiles properties will be ignored, since protobufFilesDir provided");
+        }
+        List<ProtoFile> loadedFiles = new ProtoSchemaLoader(protobufFilesDir.get()).load();
+        Map<String, ProtoFileElement> allPaths = loadedFiles.stream()
+            .collect(Collectors.toMap(f -> f.getLocation().getPath(), ProtoFile::toElement));
+        return loadedFiles.stream()
+            .collect(Collectors.toMap(
+                f -> Path.of(f.getLocation().getBase(), f.getLocation().getPath()),
+                f -> new ProtobufSchema(f.toElement(), List.of(), allPaths)));
+      }
+      //Supporting for backward-compatibility. Normally, protobufFilesDir setting should be used
+      return Stream.concat(
+              protobufFile.stream(),
+              protobufFiles.stream().flatMap(Collection::stream)
+          )
+          .distinct()
+          .map(Path::of)
+          .collect(Collectors.toMap(path -> path, path -> new ProtobufSchema(readFileAsString(path))));
+    }
+
+    private static void addProtobufSchema(Map<Descriptor, Path> descriptorPaths,
+                                          Map<Path, ProtobufSchema> protobufSchemas,
+                                          String messageName) {
+      var descriptorAndPath = getDescriptorAndPath(protobufSchemas, messageName);
+      descriptorPaths.put(descriptorAndPath.getKey(), descriptorAndPath.getValue());
+    }
+
+    private static void addProtobufSchemas(Map<Descriptor, Path> descriptorPaths,
+                                           Map<Path, ProtobufSchema> protobufSchemas,
+                                           Map<String, String> messageNamesByTopic) {
+      messageNamesByTopic.values().stream()
+          .map(msgName -> getDescriptorAndPath(protobufSchemas, msgName))
+          .forEach(entry -> descriptorPaths.put(entry.getKey(), entry.getValue()));
+    }
+  }
+
+  static class ProtoSchemaLoader {
+
+    private final Path baseLocation;
+
+    ProtoSchemaLoader(String baseLocationStr) {
+      this.baseLocation = Path.of(baseLocationStr);
+      if (!Files.isReadable(baseLocation)) {
+        throw new ValidationException("proto files directory not readable");
+      }
+    }
+
+    List<ProtoFile> load() {
+      Map<String, ProtoFile> knownTypes = knownProtoFiles();
+
+      Map<String, ProtoFile> filesByLocations = new HashMap<>();
+      filesByLocations.putAll(knownTypes);
+      filesByLocations.putAll(loadFilesWithLocations());
+
+      Linker linker = new Linker(
+          createFilesLoader(filesByLocations),
+          new ErrorCollector(),
+          true,
+          true
+      );
+      var schema = linker.link(filesByLocations.values());
+      linker.getErrors().throwIfNonEmpty();
+      return schema.getProtoFiles()
+          .stream()
+          .filter(p -> !knownTypes.containsKey(p.getLocation().getPath())) //filtering known types
+          .toList();
+    }
+
+    private Map<String, ProtoFile> knownProtoFiles() {
+      return Stream.of(
+          loadKnownProtoFile("google/type/color.proto", ColorProto.getDescriptor()),
+          loadKnownProtoFile("google/type/date.proto", DateProto.getDescriptor()),
+          loadKnownProtoFile("google/type/datetime.proto", DateTimeProto.getDescriptor()),
+          loadKnownProtoFile("google/type/dayofweek.proto", DayOfWeekProto.getDescriptor()),
+          loadKnownProtoFile("google/type/decimal.proto", com.google.type.DecimalProto.getDescriptor()),
+          loadKnownProtoFile("google/type/expr.proto", ExprProto.getDescriptor()),
+          loadKnownProtoFile("google/type/fraction.proto", FractionProto.getDescriptor()),
+          loadKnownProtoFile("google/type/interval.proto", IntervalProto.getDescriptor()),
+          loadKnownProtoFile("google/type/latlng.proto", LatLngProto.getDescriptor()),
+          loadKnownProtoFile("google/type/money.proto", MoneyProto.getDescriptor()),
+          loadKnownProtoFile("google/type/month.proto", MonthProto.getDescriptor()),
+          loadKnownProtoFile("google/type/phone_number.proto", PhoneNumberProto.getDescriptor()),
+          loadKnownProtoFile("google/type/postal_address.proto", PostalAddressProto.getDescriptor()),
+          loadKnownProtoFile("google/type/quaternion.prot", QuaternionProto.getDescriptor()),
+          loadKnownProtoFile("google/type/timeofday.proto", TimeOfDayProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/any.proto", AnyProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/api.proto", ApiProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/descriptor.proto", DescriptorProtos.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/duration.proto", DurationProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/empty.proto", EmptyProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/field_mask.proto", FieldMaskProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/source_context.proto", SourceContextProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/struct.proto", StructProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/timestamp.proto", TimestampProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/type.proto", TypeProto.getDescriptor()),
+          loadKnownProtoFile("google/protobuf/wrappers.proto", WrappersProto.getDescriptor())
+      ).collect(Collectors.toMap(p -> p.getLocation().getPath(), p -> p));
+    }
+
+    private ProtoFile loadKnownProtoFile(String path, Descriptors.FileDescriptor fileDescriptor) {
+      String protoFileString = null;
+      // know type file contains either message or enum
+      if (!fileDescriptor.getMessageTypes().isEmpty()) {
+        protoFileString = new ProtobufSchema(fileDescriptor.getMessageTypes().get(0)).canonicalString();
+      } else if (!fileDescriptor.getEnumTypes().isEmpty()) {
+        protoFileString = new ProtobufSchema(fileDescriptor.getEnumTypes().get(0)).canonicalString();
+      } else {
+        throw new IllegalStateException();
+      }
+      return ProtoFile.Companion.get(ProtoParser.Companion.parse(Location.get(path), protoFileString));
+    }
+
+    private Loader createFilesLoader(Map<String, ProtoFile> files) {
+      return new Loader() {
+        @Override
+        public @NotNull ProtoFile load(@NotNull String path) {
+          return Preconditions.checkNotNull(files.get(path), "ProtoFile not found for import '%s'", path);
+        }
+
+        @Override
+        public @NotNull Loader withErrors(@NotNull ErrorCollector errorCollector) {
+          return this;
+        }
+      };
+    }
+
+    @SneakyThrows
+    private Map<String, ProtoFile> loadFilesWithLocations() {
+      Map<String, ProtoFile> filesByLocations = new HashMap<>();
+      try (var files = Files.walk(baseLocation)) {
+        files.filter(p -> !Files.isDirectory(p) && p.toString().endsWith(".proto"))
+            .forEach(path -> {
+              // relative path will be used as "import" statement
+              String relativePath = baseLocation.relativize(path).toString();
+              var protoFileElement = ProtoParser.Companion.parse(
+                  Location.get(baseLocation.toString(), relativePath),
+                  readFileAsString(path)
+              );
+              filesByLocations.put(relativePath, ProtoFile.Companion.get(protoFileElement));
+            });
+      }
+      return filesByLocations;
+    }
+  }
+
 }

+ 263 - 253
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufFileSerdeTest.java

@@ -10,14 +10,16 @@ import com.google.protobuf.Descriptors;
 import com.google.protobuf.util.JsonFormat;
 import com.provectus.kafka.ui.serde.api.PropertyResolver;
 import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde.Configuration;
+import com.squareup.wire.schema.ProtoFile;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import lombok.SneakyThrows;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.springframework.util.ResourceUtils;
 
@@ -29,28 +31,29 @@ class ProtobufFileSerdeTest {
   private static final String sampleBookMsgJson = "{\"version\": 1, \"people\": ["
       + "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"addrBook@example.com\", \"phones\":[]}]}";
 
-  private static final String sampleSensorMsgJson = "{ \"name\": \"My Sensor\", "
-      + "\"temperature\": 20.5, \"humidity\": 50, \"door\": \"OPEN\" }";
+  private static final String sampleLangDescriptionMsgJson = "{ \"lang\": \"EN\", "
+      + "\"descr\": \"Some description here\" }";
 
   // Sample message of type `test.Person`
   private byte[] personMessageBytes;
   // Sample message of type `test.AddressBook`
   private byte[] addressBookMessageBytes;
-  private byte[] sensorMessageBytes;
-  private Path addressBookSchemaPath;
-  private Path sensorSchemaPath;
-
+  private byte[] langDescriptionMessageBytes;
   private Descriptors.Descriptor personDescriptor;
   private Descriptors.Descriptor addressBookDescriptor;
-  private Descriptors.Descriptor sensorDescriptor;
+  private Descriptors.Descriptor langDescriptionDescriptor;
   private Map<Descriptors.Descriptor, Path> descriptorPaths;
 
   @BeforeEach
   void setUp() throws Exception {
-    addressBookSchemaPath = ResourceUtils.getFile("classpath:address-book.proto").toPath();
-    sensorSchemaPath = ResourceUtils.getFile("classpath:sensor.proto").toPath();
+    Map<Path, ProtobufSchema> files = ProtobufFileSerde.Configuration.loadSchemas(
+        Optional.empty(),
+        Optional.empty(),
+        Optional.of(protoFilesDir())
+    );
 
-    ProtobufSchema addressBookSchema = new ProtobufSchema(Files.readString(addressBookSchemaPath));
+    Path addressBookSchemaPath = ResourceUtils.getFile("classpath:protobuf-serde/address-book.proto").toPath();
+    var addressBookSchema = files.get(addressBookSchemaPath);
     var builder = addressBookSchema.newMessageBuilder("test.Person");
     JsonFormat.parser().merge(samplePersonMsgJson, builder);
     personMessageBytes = builder.build().toByteArray();
@@ -61,63 +64,241 @@ class ProtobufFileSerdeTest {
     personDescriptor = addressBookSchema.toDescriptor("test.Person");
     addressBookDescriptor = addressBookSchema.toDescriptor("test.AddressBook");
 
-    ProtobufSchema sensorSchema = new ProtobufSchema(Files.readString(sensorSchemaPath));
-    builder = sensorSchema.newMessageBuilder("iot.Sensor");
-    JsonFormat.parser().merge(sampleSensorMsgJson, builder);
-    sensorMessageBytes = builder.build().toByteArray();
-    sensorDescriptor = sensorSchema.toDescriptor("iot.Sensor");
+    Path languageDescriptionPath = ResourceUtils.getFile("classpath:protobuf-serde/lang-description.proto").toPath();
+    var languageDescriptionSchema = files.get(languageDescriptionPath);
+    builder = languageDescriptionSchema.newMessageBuilder("test.LanguageDescription");
+    JsonFormat.parser().merge(sampleLangDescriptionMsgJson, builder);
+    langDescriptionMessageBytes = builder.build().toByteArray();
+    langDescriptionDescriptor = languageDescriptionSchema.toDescriptor("test.LanguageDescription");
 
     descriptorPaths = Map.of(
         personDescriptor, addressBookSchemaPath,
-        addressBookDescriptor, addressBookSchemaPath,
-        sensorDescriptor, sensorSchemaPath
+        addressBookDescriptor, addressBookSchemaPath
     );
   }
 
-
   @Test
-  void testDeserialize() {
-    var messageNameMap = Map.of(
-        "persons", personDescriptor,
-        "books", addressBookDescriptor
-    );
-    var keyMessageNameMap = Map.of(
-        "books", addressBookDescriptor);
-
-    var serde = new ProtobufFileSerde();
-    serde.configure(
-        null,
-        null,
-        descriptorPaths,
-        messageNameMap,
-        keyMessageNameMap
-    );
+  void loadsAllProtoFiledFromTargetDirectory() throws Exception {
+    var protoDir = ResourceUtils.getFile("classpath:protobuf-serde/").getPath();
+    List<ProtoFile> files = new ProtobufFileSerde.ProtoSchemaLoader(protoDir).load();
+    assertThat(files).hasSize(4);
+    assertThat(files)
+        .map(f -> f.getLocation().getPath())
+        .containsExactlyInAnyOrder(
+            "language/language.proto",
+            "sensor.proto",
+            "address-book.proto",
+            "lang-description.proto"
+        );
+  }
 
-    var deserializedPerson = serde.deserializer("persons", Serde.Target.VALUE)
-        .deserialize(null, personMessageBytes);
-    assertJsonEquals(samplePersonMsgJson, deserializedPerson.getResult());
+  @SneakyThrows
+  private String protoFilesDir() {
+    return ResourceUtils.getFile("classpath:protobuf-serde/").getPath();
+  }
 
-    var deserializedBook = serde.deserializer("books", Serde.Target.KEY)
-        .deserialize(null, addressBookMessageBytes);
-    assertJsonEquals(sampleBookMsgJson, deserializedBook.getResult());
+  @Nested
+  class ConfigurationTests {
+
+    @Test
+    void canBeAutoConfiguredReturnsNoProtoPropertiesProvided() {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      assertThat(Configuration.canBeAutoConfigured(resolver))
+          .isFalse();
+    }
+
+    @Test
+    void canBeAutoConfiguredReturnsTrueIfNoProtoFileHasBeenProvided() {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getProperty("protobufFile", String.class))
+          .thenReturn(Optional.of("file.proto"));
+      assertThat(Configuration.canBeAutoConfigured(resolver))
+          .isTrue();
+    }
+
+    @Test
+    void canBeAutoConfiguredReturnsTrueIfProtoFilesHasBeenProvided() {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getListProperty("protobufFiles", String.class))
+          .thenReturn(Optional.of(List.of("file.proto")));
+      assertThat(Configuration.canBeAutoConfigured(resolver))
+          .isTrue();
+    }
+
+    @Test
+    void canBeAutoConfiguredReturnsTrueIfProtoFilesDirProvided() {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getProperty("protobufFilesDir", String.class))
+          .thenReturn(Optional.of("/filesDir"));
+      assertThat(Configuration.canBeAutoConfigured(resolver))
+          .isTrue();
+    }
+
+    @Test
+    void unknownSchemaAsDefaultThrowsException() {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getProperty("protobufFilesDir", String.class))
+          .thenReturn(Optional.of(protoFilesDir()));
+
+      when(resolver.getProperty("protobufMessageName", String.class))
+          .thenReturn(Optional.of("test.NotExistent"));
+
+      assertThatThrownBy(() -> Configuration.create(resolver))
+          .isInstanceOf(NullPointerException.class)
+          .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
+    }
+
+    @Test
+    void unknownSchemaAsDefaultForKeyThrowsException() {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getProperty("protobufFilesDir", String.class))
+          .thenReturn(Optional.of(protoFilesDir()));
+
+      when(resolver.getProperty("protobufMessageNameForKey", String.class))
+          .thenReturn(Optional.of("test.NotExistent"));
+
+      assertThatThrownBy(() -> Configuration.create(resolver))
+          .isInstanceOf(NullPointerException.class)
+          .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
+    }
+
+    @Test
+    void unknownSchemaAsTopicSchemaThrowsException() {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getProperty("protobufFilesDir", String.class))
+          .thenReturn(Optional.of(protoFilesDir()));
+
+      when(resolver.getMapProperty("protobufMessageNameByTopic", String.class, String.class))
+          .thenReturn(Optional.of(Map.of("persons", "test.NotExistent")));
+
+      assertThatThrownBy(() -> Configuration.create(resolver))
+          .isInstanceOf(NullPointerException.class)
+          .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
+    }
+
+    @Test
+    void unknownSchemaAsTopicSchemaForKeyThrowsException() {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getProperty("protobufFilesDir", String.class))
+          .thenReturn(Optional.of(protoFilesDir()));
+
+      when(resolver.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class))
+          .thenReturn(Optional.of(Map.of("persons", "test.NotExistent")));
+
+      assertThatThrownBy(() -> Configuration.create(resolver))
+          .isInstanceOf(NullPointerException.class)
+          .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
+    }
+
+    @Test
+    void createConfigureFillsDescriptorMappingsWhenProtoFilesListProvided() throws Exception {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getProperty("protobufFile", String.class))
+          .thenReturn(Optional.of(
+              ResourceUtils.getFile("classpath:protobuf-serde/sensor.proto").getPath()));
+
+      when(resolver.getListProperty("protobufFiles", String.class))
+          .thenReturn(Optional.of(
+              List.of(
+                  ResourceUtils.getFile("classpath:protobuf-serde/address-book.proto").getPath())));
+
+      when(resolver.getProperty("protobufMessageName", String.class))
+          .thenReturn(Optional.of("test.Sensor"));
+
+      when(resolver.getProperty("protobufMessageNameForKey", String.class))
+          .thenReturn(Optional.of("test.AddressBook"));
+
+      when(resolver.getMapProperty("protobufMessageNameByTopic", String.class, String.class))
+          .thenReturn(Optional.of(
+              Map.of(
+                  "topic1", "test.Sensor",
+                  "topic2", "test.AddressBook")));
+
+      when(resolver.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class))
+          .thenReturn(Optional.of(
+              Map.of(
+                  "topic1", "test.Person",
+                  "topic2", "test.AnotherPerson")));
+
+      var configuration = Configuration.create(resolver);
+
+      assertThat(configuration.defaultMessageDescriptor())
+          .matches(d -> d.getFullName().equals("test.Sensor"));
+      assertThat(configuration.defaultKeyMessageDescriptor())
+          .matches(d -> d.getFullName().equals("test.AddressBook"));
+
+      assertThat(configuration.messageDescriptorMap())
+          .containsOnlyKeys("topic1", "topic2")
+          .anySatisfy((topic, descr) -> assertThat(descr.getFullName()).isEqualTo("test.Sensor"))
+          .anySatisfy((topic, descr) -> assertThat(descr.getFullName()).isEqualTo("test.AddressBook"));
+
+      assertThat(configuration.keyMessageDescriptorMap())
+          .containsOnlyKeys("topic1", "topic2")
+          .anySatisfy((topic, descr) -> assertThat(descr.getFullName()).isEqualTo("test.Person"))
+          .anySatisfy((topic, descr) -> assertThat(descr.getFullName()).isEqualTo("test.AnotherPerson"));
+    }
+
+    @Test
+    void createConfigureFillsDescriptorMappingsWhenProtoFileDirProvided() throws Exception {
+      PropertyResolver resolver = mock(PropertyResolver.class);
+      when(resolver.getProperty("protobufFilesDir", String.class))
+          .thenReturn(Optional.of(protoFilesDir()));
+
+      when(resolver.getProperty("protobufMessageName", String.class))
+          .thenReturn(Optional.of("test.Sensor"));
+
+      when(resolver.getProperty("protobufMessageNameForKey", String.class))
+          .thenReturn(Optional.of("test.AddressBook"));
+
+      when(resolver.getMapProperty("protobufMessageNameByTopic", String.class, String.class))
+          .thenReturn(Optional.of(
+              Map.of(
+                  "topic1", "test.Sensor",
+                  "topic2", "test.LanguageDescription")));
+
+      when(resolver.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class))
+          .thenReturn(Optional.of(
+              Map.of(
+                  "topic1", "test.Person",
+                  "topic2", "test.AnotherPerson")));
+
+      var configuration = Configuration.create(resolver);
+
+      assertThat(configuration.defaultMessageDescriptor())
+          .matches(d -> d.getFullName().equals("test.Sensor"));
+      assertThat(configuration.defaultKeyMessageDescriptor())
+          .matches(d -> d.getFullName().equals("test.AddressBook"));
+
+      assertThat(configuration.messageDescriptorMap())
+          .containsOnlyKeys("topic1", "topic2")
+          .anySatisfy((topic, descr) -> assertThat(descr.getFullName()).isEqualTo("test.Sensor"))
+          .anySatisfy((topic, descr) -> assertThat(descr.getFullName()).isEqualTo("test.LanguageDescription"));
+
+      assertThat(configuration.keyMessageDescriptorMap())
+          .containsOnlyKeys("topic1", "topic2")
+          .anySatisfy((topic, descr) -> assertThat(descr.getFullName()).isEqualTo("test.Person"))
+          .anySatisfy((topic, descr) -> assertThat(descr.getFullName()).isEqualTo("test.AnotherPerson"));
+    }
   }
 
   @Test
-  void testDeserializeMultipleProtobuf() {
+  void deserializeUsesTopicsMappingToFindMsgDescriptor() {
     var messageNameMap = Map.of(
         "persons", personDescriptor,
         "books", addressBookDescriptor,
-        "sensors", sensorDescriptor
+        "langs", langDescriptionDescriptor
     );
     var keyMessageNameMap = Map.of(
         "books", addressBookDescriptor);
     var serde = new ProtobufFileSerde();
     serde.configure(
-        null,
-        null,
-        descriptorPaths,
-        messageNameMap,
-        keyMessageNameMap
+        new Configuration(
+            null,
+            null,
+            descriptorPaths,
+            messageNameMap,
+            keyMessageNameMap
+        )
     );
 
     var deserializedPerson = serde.deserializer("persons", Serde.Target.VALUE)
@@ -128,20 +309,22 @@ class ProtobufFileSerdeTest {
         .deserialize(null, addressBookMessageBytes);
     assertJsonEquals(sampleBookMsgJson, deserializedBook.getResult());
 
-    var deserializedSensor = serde.deserializer("sensors", Serde.Target.VALUE)
-        .deserialize(null, sensorMessageBytes);
-    assertJsonEquals(sampleSensorMsgJson, deserializedSensor.getResult());
+    var deserializedSensor = serde.deserializer("langs", Serde.Target.VALUE)
+        .deserialize(null, langDescriptionMessageBytes);
+    assertJsonEquals(sampleLangDescriptionMsgJson, deserializedSensor.getResult());
   }
 
   @Test
-  void testDefaultMessageName() {
+  void deserializeUsesDefaultDescriptorIfTopicMappingNotFound() {
     var serde = new ProtobufFileSerde();
     serde.configure(
-        personDescriptor,
-        addressBookDescriptor,
-        descriptorPaths,
-        Map.of(),
-        Map.of()
+        new Configuration(
+            personDescriptor,
+            addressBookDescriptor,
+            descriptorPaths,
+            Map.of(),
+            Map.of()
+        )
     );
 
     var deserializedPerson = serde.deserializer("persons", Serde.Target.VALUE)
@@ -154,230 +337,57 @@ class ProtobufFileSerdeTest {
   }
 
   @Test
-  void testSerialize() {
-    var messageNameMap = Map.of(
-        "persons", personDescriptor,
-        "books", addressBookDescriptor
-    );
-    var keyMessageNameMap = Map.of(
-        "books", addressBookDescriptor);
-
-    var serde = new ProtobufFileSerde();
-    serde.configure(
-        null,
-        null,
-        descriptorPaths,
-        messageNameMap,
-        keyMessageNameMap
-    );
-
-    var personBytes = serde.serializer("persons", Serde.Target.VALUE)
-        .serialize("{ \"name\": \"My Name\",\"id\": 101, \"email\": \"user1@example.com\" }");
-    assertThat(personBytes).isEqualTo(personMessageBytes);
-
-    var booksBytes = serde.serializer("books", Serde.Target.KEY)
-        .serialize("{\"version\": 1, \"people\": ["
-            + "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"addrBook@example.com\" }]}");
-    assertThat(booksBytes).isEqualTo(addressBookMessageBytes);
-  }
-
-  @Test
-  void testSerializeMultipleProtobuf() {
+  void serializeUsesTopicsMappingToFindMsgDescriptor() {
     var messageNameMap = Map.of(
         "persons", personDescriptor,
         "books", addressBookDescriptor,
-        "sensors", sensorDescriptor
+        "langs", langDescriptionDescriptor
     );
     var keyMessageNameMap = Map.of(
         "books", addressBookDescriptor);
 
     var serde = new ProtobufFileSerde();
     serde.configure(
-        null,
-        null,
-        descriptorPaths,
-        messageNameMap,
-        keyMessageNameMap
+        new Configuration(
+            null,
+            null,
+            descriptorPaths,
+            messageNameMap,
+            keyMessageNameMap
+        )
     );
 
-    var personBytes = serde.serializer("persons", Serde.Target.VALUE)
-        .serialize("{ \"name\": \"My Name\",\"id\": 101, \"email\": \"user1@example.com\" }");
-    assertThat(personBytes).isEqualTo(personMessageBytes);
+    var personBytes = serde.serializer("langs", Serde.Target.VALUE)
+        .serialize(sampleLangDescriptionMsgJson);
+    assertThat(personBytes).isEqualTo(langDescriptionMessageBytes);
 
     var booksBytes = serde.serializer("books", Serde.Target.KEY)
-        .serialize("{\"version\": 1, \"people\": ["
-            + "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"addrBook@example.com\" }]}");
+        .serialize(sampleBookMsgJson);
     assertThat(booksBytes).isEqualTo(addressBookMessageBytes);
-
-    var sensorBytes = serde.serializer("sensors", Serde.Target.VALUE)
-        .serialize("{ \"name\": \"My Sensor\", \"temperature\": 20.5, \"humidity\": 50, \"door\": \"OPEN\" }");
-    assertThat(sensorBytes).isEqualTo(sensorMessageBytes);
   }
 
   @Test
-  void testSerializeDefaults() {
+  void serializeUsesDefaultDescriptorIfTopicMappingNotFound() {
     var serde = new ProtobufFileSerde();
     serde.configure(
-        personDescriptor,
-        addressBookDescriptor,
-        descriptorPaths,
-        Map.of(),
-        Map.of()
+        new Configuration(
+            personDescriptor,
+            addressBookDescriptor,
+            descriptorPaths,
+            Map.of(),
+            Map.of()
+        )
     );
 
     var personBytes = serde.serializer("persons", Serde.Target.VALUE)
-        .serialize("{ \"name\": \"My Name\",\"id\": 101, \"email\": \"user1@example.com\" }");
+        .serialize(samplePersonMsgJson);
     assertThat(personBytes).isEqualTo(personMessageBytes);
 
     var booksBytes = serde.serializer("books", Serde.Target.KEY)
-        .serialize("{\"version\": 1, \"people\": ["
-            + "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"addrBook@example.com\" }]}");
+        .serialize(sampleBookMsgJson);
     assertThat(booksBytes).isEqualTo(addressBookMessageBytes);
   }
 
-  @Test
-  void canBeAutoConfiguredReturnsFalseIfNoProtoFilesHaveBeenProvided() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-
-    var serde = new ProtobufFileSerde();
-    boolean startupSuccessful = serde.canBeAutoConfigured(resolver, resolver);
-    assertThat(startupSuccessful).isFalse();
-  }
-
-  @Test
-  void canBeAutoConfiguredReturnsFalseIfProtoFilesListIsEmpty() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getListProperty("protobufFiles", String.class)).thenReturn(Optional.of(List.of()));
-
-    var serde = new ProtobufFileSerde();
-    boolean startupSuccessful = serde.canBeAutoConfigured(resolver, resolver);
-    assertThat(startupSuccessful).isFalse();
-  }
-
-  @Test
-  void canBeAutoConfiguredReturnsTrueIfNoProtoFileHasBeenProvided() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getProperty("protobufFile", String.class)).thenReturn(Optional.of("file.proto"));
-
-    var serde = new ProtobufFileSerde();
-    boolean startupSuccessful = serde.canBeAutoConfigured(resolver, resolver);
-    assertThat(startupSuccessful).isTrue();
-  }
-
-  @Test
-  void canBeAutoConfiguredReturnsTrueIfProtoFilesHasBeenProvided() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getListProperty("protobufFiles", String.class)).thenReturn(Optional.of(List.of("file.proto")));
-
-    var serde = new ProtobufFileSerde();
-    boolean startupSuccessful = serde.canBeAutoConfigured(resolver, resolver);
-    assertThat(startupSuccessful).isTrue();
-  }
-
-  @Test
-  void canBeAutoConfiguredReturnsTrueIfProtoFileAndProtoFilesHaveBeenProvided() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getProperty("protobufFile", String.class)).thenReturn(Optional.of("file1.proto"));
-    when(resolver.getListProperty("protobufFiles", String.class)).thenReturn(Optional.of(List.of("file2.proto")));
-
-    var serde = new ProtobufFileSerde();
-    boolean startupSuccessful = serde.canBeAutoConfigured(resolver, resolver);
-    assertThat(startupSuccessful).isTrue();
-  }
-
-  @Test
-  void listOfProtobufFilesIsJoined() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getProperty("protobufFile", String.class))
-        .thenReturn(Optional.of(addressBookSchemaPath.toString()));
-    when(resolver.getListProperty("protobufFiles", String.class))
-        .thenReturn(Optional.of(List.of(sensorSchemaPath.toString())));
-    when(resolver.getProperty("protobufMessageName", String.class))
-        .thenReturn(Optional.of("test.AddressBook"));
-
-    Map<String, String> protobufMessageNameByTopic = Map.of(
-        "persons", "test.Person",
-        "books", "test.AddressBook",
-        "sensors", "iot.Sensor");
-    when(resolver.getMapProperty("protobufMessageNameByTopic", String.class, String.class))
-        .thenReturn(Optional.of(protobufMessageNameByTopic));
-
-    var serde = new ProtobufFileSerde();
-    serde.configure(resolver, resolver, resolver);
-
-    var deserializedPerson = serde.deserializer("persons", Serde.Target.VALUE)
-        .deserialize(null, personMessageBytes);
-    assertJsonEquals(samplePersonMsgJson, deserializedPerson.getResult());
-
-    var deserializedSensor = serde.deserializer("sensors", Serde.Target.VALUE)
-        .deserialize(null, sensorMessageBytes);
-    assertJsonEquals(sampleSensorMsgJson, deserializedSensor.getResult());
-  }
-
-  @Test
-  void unknownSchemaAsDefaultThrowsException() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getListProperty("protobufFiles", String.class))
-        .thenReturn(Optional.of(List.of(addressBookSchemaPath.toString(), sensorSchemaPath.toString())));
-    when(resolver.getProperty("protobufMessageName", String.class))
-        .thenReturn(Optional.of("test.NotExistent"));
-
-    var serde = new ProtobufFileSerde();
-    assertThatThrownBy(() -> serde.configure(resolver, resolver, resolver))
-        .isInstanceOf(NullPointerException.class)
-        .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
-  }
-
-  @Test
-  void unknownSchemaAsDefaultForKeyThrowsException() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getListProperty("protobufFiles", String.class))
-        .thenReturn(Optional.of(List.of(addressBookSchemaPath.toString(), sensorSchemaPath.toString())));
-    when(resolver.getProperty("protobufMessageName", String.class))
-        .thenReturn(Optional.of("test.AddressBook"));
-    when(resolver.getProperty("protobufMessageNameForKey", String.class))
-        .thenReturn(Optional.of("test.NotExistent"));
-
-    var serde = new ProtobufFileSerde();
-    assertThatThrownBy(() -> serde.configure(resolver, resolver, resolver))
-        .isInstanceOf(NullPointerException.class)
-        .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
-  }
-
-  @Test
-  void unknownSchemaAsTopicSchemaThrowsException() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getListProperty("protobufFiles", String.class))
-        .thenReturn(Optional.of(List.of(addressBookSchemaPath.toString(), sensorSchemaPath.toString())));
-    when(resolver.getProperty("protobufMessageName", String.class))
-        .thenReturn(Optional.of("test.AddressBook"));
-
-    when(resolver.getMapProperty("protobufMessageNameByTopic", String.class, String.class))
-        .thenReturn(Optional.of(Map.of("persons", "test.NotExistent")));
-
-    var serde = new ProtobufFileSerde();
-    assertThatThrownBy(() -> serde.configure(resolver, resolver, resolver))
-        .isInstanceOf(NullPointerException.class)
-        .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
-  }
-
-  @Test
-  void unknownSchemaAsTopicSchemaForKeyThrowsException() {
-    PropertyResolver resolver = mock(PropertyResolver.class);
-    when(resolver.getListProperty("protobufFiles", String.class))
-        .thenReturn(Optional.of(List.of(addressBookSchemaPath.toString(), sensorSchemaPath.toString())));
-    when(resolver.getProperty("protobufMessageName", String.class))
-        .thenReturn(Optional.of("test.AddressBook"));
-
-    when(resolver.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class))
-        .thenReturn(Optional.of(Map.of("persons", "test.NotExistent")));
-
-    var serde = new ProtobufFileSerde();
-    assertThatThrownBy(() -> serde.configure(resolver, resolver, resolver))
-        .isInstanceOf(NullPointerException.class)
-        .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
-  }
-
   @SneakyThrows
   private void assertJsonEquals(String expectedJson, String actualJson) {
     var mapper = new JsonMapper();

+ 5 - 7
kafka-ui-api/src/test/resources/address-book.proto → kafka-ui-api/src/test/resources/protobuf-serde/address-book.proto

@@ -1,16 +1,10 @@
-// [START declaration]
 syntax = "proto3";
 package test;
 
-// [END declaration]
-
-// [START java_declaration]
 option java_multiple_files = true;
 option java_package = "com.example.tutorial.protos";
 option java_outer_classname = "AddressBookProtos";
-// [END java_declaration]
 
-// [START messages]
 message Person {
   string name = 1;
   int32 id = 2;  // Unique ID number for this person.
@@ -31,9 +25,13 @@ message Person {
 
 }
 
+message AnotherPerson {
+    string name = 1;
+    string surname = 2;
+}
+
 // Our address book file is just one of these.
 message AddressBook {
   int32 version = 1;
   repeated Person people = 2;
 }
-// [END messages]

+ 11 - 0
kafka-ui-api/src/test/resources/protobuf-serde/lang-description.proto

@@ -0,0 +1,11 @@
+syntax = "proto3";
+
+package test;
+
+import "language/language.proto";
+import "google/protobuf/wrappers.proto";
+
+message LanguageDescription {
+    test.lang.Language lang = 1;
+    google.protobuf.StringValue descr = 2;
+}

+ 11 - 0
kafka-ui-api/src/test/resources/protobuf-serde/language/language.proto

@@ -0,0 +1,11 @@
+syntax = "proto3";
+package test.lang;
+
+enum Language {
+    DE = 0;
+    EN = 1;
+    ES = 2;
+    FR = 3;
+    PL = 4;
+    RU = 5;
+}

+ 1 - 1
kafka-ui-api/src/test/resources/sensor.proto → kafka-ui-api/src/test/resources/protobuf-serde/sensor.proto

@@ -1,5 +1,5 @@
 syntax = "proto3";
-package iot;
+package test;
 
 message Sensor {
     string name = 1;