Kaynağa Gözat

Add support for multiple Protobuf files (#2874)

* Add support for multiple Protobuf files

* Ensure `protobufFiles` list is not empty

* Address review comments
Jochen Schalanda 2 yıl önce
ebeveyn
işleme
3277186fc0

+ 5 - 1
documentation/guides/Protobuf.md

@@ -11,8 +11,12 @@ To configure Kafkaui to deserialize protobuf messages using a supplied protobuf
 kafka:
   clusters:
     - # Cluster configuration omitted.
-      # protobufFile is the path to the protobuf schema.
+      # protobufFile is the path to the protobuf schema. (deprecated: please use "protobufFiles")
       protobufFile: path/to/my.proto
+      # protobufFiles is the path to one or more protobuf schemas.
+      protobufFiles: 
+          - path/to/my.proto
+          - path/to/another.proto
       # protobufMessageName is the default protobuf type that is used to deserilize
       # the message's value if the topic is not found in protobufMessageNameByTopic.
       protobufMessageName: my.Type1

+ 103 - 42
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufFileSerde.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.serdes.builtin;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Empty;
 import com.google.protobuf.util.JsonFormat;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
 import com.provectus.kafka.ui.serde.api.PropertyResolver;
@@ -13,18 +14,21 @@ import com.provectus.kafka.ui.util.jsonschema.ProtobufSchemaConverter;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import lombok.SneakyThrows;
 
-
 public class ProtobufFileSerde implements BuiltInSerde {
 
   public static String name() {
@@ -33,11 +37,11 @@ public class ProtobufFileSerde implements BuiltInSerde {
 
   private static final ProtobufSchemaConverter SCHEMA_CONVERTER = new ProtobufSchemaConverter();
 
-  private Path protobufSchemaPath;
-
   private Map<String, Descriptor> messageDescriptorMap = new HashMap<>();
   private Map<String, Descriptor> keyMessageDescriptorMap = new HashMap<>();
 
+  private Map<Descriptor, Path> descriptorPaths = new HashMap<>();
+
   private Descriptor defaultMessageDescriptor;
 
   @Nullable
@@ -46,8 +50,10 @@ public class ProtobufFileSerde implements BuiltInSerde {
   @Override
   public boolean initOnStartup(PropertyResolver kafkaClusterProperties,
                                PropertyResolver globalProperties) {
-    return kafkaClusterProperties.getProperty("protobufFile", String.class)
-        .isPresent();
+    Optional<String> protobufFile = kafkaClusterProperties.getProperty("protobufFile", String.class);
+    Optional<List<String>> protobufFiles = kafkaClusterProperties.getListProperty("protobufFiles", String.class);
+
+    return protobufFile.isPresent() || protobufFiles.map(files -> files.isEmpty() ? null : files).isPresent();
   }
 
   @SneakyThrows
@@ -55,55 +61,107 @@ public class ProtobufFileSerde implements BuiltInSerde {
   public void configure(PropertyResolver serdeProperties,
                         PropertyResolver kafkaClusterProperties,
                         PropertyResolver globalProperties) {
-    protobufSchemaPath = Path.of(
-        kafkaClusterProperties.getProperty("protobufFile", String.class)
-            .orElseThrow());
-    ProtobufSchema protobufSchema;
-    try (Stream<String> lines = Files.lines(protobufSchemaPath)) {
-      protobufSchema = new ProtobufSchema(lines.collect(Collectors.joining("\n")));
-    }
+    Map<Path, ProtobufSchema> protobufSchemas = joinPathProperties(kafkaClusterProperties).stream()
+        .map(path -> Map.entry(path, new ProtobufSchema(readFileAsString(path))))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+
+    // Load all referenced message schemas and store their source proto file with the descriptors
+    Map<Descriptor, Path> descriptorPaths = new HashMap<>();
+    Optional<String> protobufMessageName = kafkaClusterProperties.getProperty("protobufMessageName", String.class);
+    protobufMessageName.ifPresent(messageName -> addProtobufSchema(descriptorPaths, protobufSchemas, messageName));
+
+    Optional<String> protobufMessageNameForKey =
+        kafkaClusterProperties.getProperty("protobufMessageNameForKey", String.class);
+    protobufMessageNameForKey
+        .ifPresent(messageName -> addProtobufSchema(descriptorPaths, protobufSchemas, messageName));
+
+    Optional<Map<String, String>> protobufMessageNameByTopic =
+        kafkaClusterProperties.getMapProperty("protobufMessageNameByTopic", String.class, String.class);
+    protobufMessageNameByTopic
+        .ifPresent(messageNamesByTopic -> addProtobufSchemas(descriptorPaths, protobufSchemas, messageNamesByTopic));
+
+    Optional<Map<String, String>> protobufMessageNameForKeyByTopic =
+        kafkaClusterProperties.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class);
+    protobufMessageNameForKeyByTopic
+        .ifPresent(messageNamesByTopic -> addProtobufSchemas(descriptorPaths, protobufSchemas, messageNamesByTopic));
+
+    // Fill dictionary for descriptor lookup by full message name
+    Map<String, Descriptor> descriptorMap = descriptorPaths.keySet().stream()
+        .collect(Collectors.toMap(Descriptor::getFullName, Function.identity()));
+
     configure(
-        protobufSchemaPath,
-        defaultMessageDescriptor = kafkaClusterProperties.getProperty("protobufMessageName", String.class)
-            .map(msgName -> Objects.requireNonNull(protobufSchema.toDescriptor(msgName),
-                "The given message type not found in protobuf definition: " + msgName))
-            // this is strange logic, but we need it to support serde's backward-compatibility
-            .orElseGet(protobufSchema::toDescriptor),
-        defaultKeyMessageDescriptor = kafkaClusterProperties.getProperty("protobufMessageNameForKey", String.class)
-            .map(msgName -> Objects.requireNonNull(protobufSchema.toDescriptor(msgName),
-                "The given message type not found in protobuf definition: " + msgName))
-            .orElse(null),
-        kafkaClusterProperties.getMapProperty("protobufMessageNameByTopic", String.class, String.class)
-            .map(map -> populateDescriptors(protobufSchema, map))
-            .orElse(Map.of()),
-        kafkaClusterProperties.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class)
-            .map(map -> populateDescriptors(protobufSchema, map))
-            .orElse(Map.of())
+        // this is strange logic, but we need it to support serde's backward-compatibility
+        protobufMessageName.map(descriptorMap::get).orElseGet(Empty::getDescriptor),
+        protobufMessageNameForKey.map(descriptorMap::get).orElse(null),
+        descriptorPaths,
+        protobufMessageNameByTopic.map(map -> populateDescriptors(descriptorMap, map)).orElse(Map.of()),
+        protobufMessageNameForKeyByTopic.map(map -> populateDescriptors(descriptorMap, map)).orElse(Map.of())
     );
   }
 
   @VisibleForTesting
   void configure(
-      Path protobufSchemaPath,
       Descriptor defaultMessageDescriptor,
       @Nullable Descriptor defaultKeyMessageDescriptor,
+      Map<Descriptor, Path> descriptorPaths,
       Map<String, Descriptor> messageDescriptorMap,
       Map<String, Descriptor> keyMessageDescriptorMap) {
-    this.protobufSchemaPath = protobufSchemaPath;
     this.defaultMessageDescriptor = defaultMessageDescriptor;
     this.defaultKeyMessageDescriptor = defaultKeyMessageDescriptor;
+    this.descriptorPaths = descriptorPaths;
     this.messageDescriptorMap = messageDescriptorMap;
     this.keyMessageDescriptorMap = keyMessageDescriptorMap;
   }
 
-  private Map<String, Descriptor> populateDescriptors(ProtobufSchema protobufSchema,
+  private static void addProtobufSchema(Map<Descriptor, Path> descriptorPaths,
+                                 Map<Path, ProtobufSchema> protobufSchemas,
+                                 String messageName) {
+    var descriptorAndPath = getDescriptorAndPath(protobufSchemas, messageName);
+    descriptorPaths.put(descriptorAndPath.getKey(), descriptorAndPath.getValue());
+  }
+
+  private static void addProtobufSchemas(Map<Descriptor, Path> descriptorPaths,
+                                  Map<Path, ProtobufSchema> protobufSchemas,
+                                  Map<String, String> messageNamesByTopic) {
+    messageNamesByTopic.values().stream()
+        .map(msgName -> getDescriptorAndPath(protobufSchemas, msgName))
+        .forEach(entry -> descriptorPaths.put(entry.getKey(), entry.getValue()));
+  }
+
+  private static List<Path> joinPathProperties(PropertyResolver propertyResolver) {
+    return Stream.concat(
+            propertyResolver.getProperty("protobufFile", String.class).map(List::of).stream(),
+            propertyResolver.getListProperty("protobufFiles", String.class).stream())
+        .flatMap(Collection::stream)
+        .distinct()
+        .map(Path::of)
+        .collect(Collectors.toList());
+  }
+
+  private static Map.Entry<Descriptor, Path> getDescriptorAndPath(Map<Path, ProtobufSchema> protobufSchemas,
+                                                                  String msgName) {
+    return protobufSchemas.entrySet().stream()
+            .filter(schema -> schema.getValue() != null && schema.getValue().toDescriptor(msgName) != null)
+            .map(schema -> Map.entry(schema.getValue().toDescriptor(msgName), schema.getKey()))
+            .findFirst()
+            .orElseThrow(() -> new NullPointerException(
+                    "The given message type not found in protobuf definition: " + msgName));
+  }
+
+  private static String readFileAsString(Path path) {
+    try {
+      return Files.readString(path);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  private Map<String, Descriptor> populateDescriptors(Map<String, Descriptor> descriptorMap,
                                                       Map<String, String> messageNameMap) {
     Map<String, Descriptor> descriptors = new HashMap<>();
     for (Map.Entry<String, String> entry : messageNameMap.entrySet()) {
-      var descriptor = Objects.requireNonNull(protobufSchema.toDescriptor(entry.getValue()),
-          "The given message type is not found in protobuf definition: "
-              + entry.getValue());
-      descriptors.put(entry.getKey(), descriptor);
+      descriptors.put(entry.getKey(), descriptorMap.get(entry.getValue()));
     }
     return descriptors;
   }
@@ -168,11 +226,14 @@ public class ProtobufFileSerde implements BuiltInSerde {
 
   @Override
   public Optional<SchemaDescription> getSchema(String topic, Target type) {
-    return descriptorFor(topic, type)
-        .map(descriptor ->
-            new SchemaDescription(
-                SCHEMA_CONVERTER.convert(protobufSchemaPath.toUri(), descriptor).toJson(),
-                Map.of("messageName", descriptor.getFullName())
-            ));
+    return descriptorFor(topic, type).map(this::toSchemaDescription);
+  }
+
+  private SchemaDescription toSchemaDescription(Descriptor descriptor) {
+    Path path = descriptorPaths.get(descriptor);
+    return new SchemaDescription(
+        SCHEMA_CONVERTER.convert(path.toUri(), descriptor).toJson(),
+        Map.of("messageName", descriptor.getFullName())
+    );
   }
 }

+ 265 - 31
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufFileSerdeTest.java

@@ -1,19 +1,25 @@
 package com.provectus.kafka.ui.serdes.builtin;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import com.fasterxml.jackson.databind.json.JsonMapper;
-import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Descriptors;
 import com.google.protobuf.util.JsonFormat;
+import com.provectus.kafka.ui.serde.api.PropertyResolver;
 import com.provectus.kafka.ui.serde.api.Serde;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import lombok.SneakyThrows;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.springframework.util.ResourceUtils;
 
 class ProtobufFileSerdeTest {
 
@@ -23,43 +29,93 @@ class ProtobufFileSerdeTest {
   private static final String sampleBookMsgJson = "{\"version\": 1, \"people\": ["
       + "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"addrBook@example.com\", \"phones\":[]}]}";
 
+  private static final String sampleSensorMsgJson = "{ \"name\": \"My Sensor\", "
+      + "\"temperature\": 20.5, \"humidity\": 50, \"door\": \"OPEN\" }";
+
   // Sample message of type `test.Person`
-  private static byte[] personMessageBytes;
+  private byte[] personMessageBytes;
   // Sample message of type `test.AddressBook`
-  private static byte[] addressBookMessageBytes;
-  private static Path protobufSchemaPath;
-  private static ProtobufSchema protobufSchema;
+  private byte[] addressBookMessageBytes;
+  private byte[] sensorMessageBytes;
+  private Path addressBookSchemaPath;
+  private Path sensorSchemaPath;
+
+  private Descriptors.Descriptor personDescriptor;
+  private Descriptors.Descriptor addressBookDescriptor;
+  private Descriptors.Descriptor sensorDescriptor;
+  private Map<Descriptors.Descriptor, Path> descriptorPaths;
 
-  @BeforeAll
-  static void setUp() throws Exception {
-    protobufSchemaPath = Paths.get(ProtobufFileSerdeTest.class.getClassLoader()
-        .getResource("address-book.proto").toURI());
-    protobufSchema = new ProtobufSchema(Files.readString(protobufSchemaPath));
+  @BeforeEach
+  void setUp() throws Exception {
+    addressBookSchemaPath = ResourceUtils.getFile("classpath:address-book.proto").toPath();
+    sensorSchemaPath = ResourceUtils.getFile("classpath:sensor.proto").toPath();
 
-    DynamicMessage.Builder builder = protobufSchema.newMessageBuilder("test.Person");
+    ProtobufSchema addressBookSchema = new ProtobufSchema(Files.readString(addressBookSchemaPath));
+    var builder = addressBookSchema.newMessageBuilder("test.Person");
     JsonFormat.parser().merge(samplePersonMsgJson, builder);
     personMessageBytes = builder.build().toByteArray();
 
-    builder = protobufSchema.newMessageBuilder("test.AddressBook");
+    builder = addressBookSchema.newMessageBuilder("test.AddressBook");
     JsonFormat.parser().merge(sampleBookMsgJson, builder);
     addressBookMessageBytes = builder.build().toByteArray();
+    personDescriptor = addressBookSchema.toDescriptor("test.Person");
+    addressBookDescriptor = addressBookSchema.toDescriptor("test.AddressBook");
+
+    ProtobufSchema sensorSchema = new ProtobufSchema(Files.readString(sensorSchemaPath));
+    builder = sensorSchema.newMessageBuilder("iot.Sensor");
+    JsonFormat.parser().merge(sampleSensorMsgJson, builder);
+    sensorMessageBytes = builder.build().toByteArray();
+    sensorDescriptor = sensorSchema.toDescriptor("iot.Sensor");
+
+    descriptorPaths = Map.of(
+        personDescriptor, addressBookSchemaPath,
+        addressBookDescriptor, addressBookSchemaPath,
+        sensorDescriptor, sensorSchemaPath
+    );
   }
 
 
   @Test
   void testDeserialize() {
     var messageNameMap = Map.of(
-        "persons", protobufSchema.toDescriptor("test.Person"),
-        "books", protobufSchema.toDescriptor("test.AddressBook")
+        "persons", personDescriptor,
+        "books", addressBookDescriptor
     );
     var keyMessageNameMap = Map.of(
-        "books", protobufSchema.toDescriptor("test.AddressBook"));
+        "books", addressBookDescriptor);
+
+    var serde = new ProtobufFileSerde();
+    serde.configure(
+        null,
+        null,
+        descriptorPaths,
+        messageNameMap,
+        keyMessageNameMap
+    );
+
+    var deserializedPerson = serde.deserializer("persons", Serde.Target.VALUE)
+        .deserialize(null, personMessageBytes);
+    assertJsonEquals(samplePersonMsgJson, deserializedPerson.getResult());
+
+    var deserializedBook = serde.deserializer("books", Serde.Target.KEY)
+        .deserialize(null, addressBookMessageBytes);
+    assertJsonEquals(sampleBookMsgJson, deserializedBook.getResult());
+  }
 
+  @Test
+  void testDeserializeMultipleProtobuf() {
+    var messageNameMap = Map.of(
+        "persons", personDescriptor,
+        "books", addressBookDescriptor,
+        "sensors", sensorDescriptor
+    );
+    var keyMessageNameMap = Map.of(
+        "books", addressBookDescriptor);
     var serde = new ProtobufFileSerde();
     serde.configure(
-        protobufSchemaPath,
         null,
         null,
+        descriptorPaths,
         messageNameMap,
         keyMessageNameMap
     );
@@ -71,15 +127,19 @@ class ProtobufFileSerdeTest {
     var deserializedBook = serde.deserializer("books", Serde.Target.KEY)
         .deserialize(null, addressBookMessageBytes);
     assertJsonEquals(sampleBookMsgJson, deserializedBook.getResult());
+
+    var deserializedSensor = serde.deserializer("sensors", Serde.Target.VALUE)
+        .deserialize(null, sensorMessageBytes);
+    assertJsonEquals(sampleSensorMsgJson, deserializedSensor.getResult());
   }
 
   @Test
   void testDefaultMessageName() {
     var serde = new ProtobufFileSerde();
     serde.configure(
-        protobufSchemaPath,
-        protobufSchema.toDescriptor("test.Person"),
-        protobufSchema.toDescriptor("test.AddressBook"),
+        personDescriptor,
+        addressBookDescriptor,
+        descriptorPaths,
         Map.of(),
         Map.of()
     );
@@ -93,21 +153,20 @@ class ProtobufFileSerdeTest {
     assertJsonEquals(sampleBookMsgJson, deserializedBook.getResult());
   }
 
-
   @Test
   void testSerialize() {
     var messageNameMap = Map.of(
-        "persons", protobufSchema.toDescriptor("test.Person"),
-        "books", protobufSchema.toDescriptor("test.AddressBook")
+        "persons", personDescriptor,
+        "books", addressBookDescriptor
     );
     var keyMessageNameMap = Map.of(
-        "books", protobufSchema.toDescriptor("test.AddressBook"));
+        "books", addressBookDescriptor);
 
     var serde = new ProtobufFileSerde();
     serde.configure(
-        protobufSchemaPath,
         null,
         null,
+        descriptorPaths,
         messageNameMap,
         keyMessageNameMap
     );
@@ -122,13 +181,46 @@ class ProtobufFileSerdeTest {
     assertThat(booksBytes).isEqualTo(addressBookMessageBytes);
   }
 
+  @Test
+  void testSerializeMultipleProtobuf() {
+    var messageNameMap = Map.of(
+        "persons", personDescriptor,
+        "books", addressBookDescriptor,
+        "sensors", sensorDescriptor
+    );
+    var keyMessageNameMap = Map.of(
+        "books", addressBookDescriptor);
+
+    var serde = new ProtobufFileSerde();
+    serde.configure(
+        null,
+        null,
+        descriptorPaths,
+        messageNameMap,
+        keyMessageNameMap
+    );
+
+    var personBytes = serde.serializer("persons", Serde.Target.VALUE)
+        .serialize("{ \"name\": \"My Name\",\"id\": 101, \"email\": \"user1@example.com\" }");
+    assertThat(personBytes).isEqualTo(personMessageBytes);
+
+    var booksBytes = serde.serializer("books", Serde.Target.KEY)
+        .serialize("{\"version\": 1, \"people\": ["
+            + "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"addrBook@example.com\" }]}");
+    assertThat(booksBytes).isEqualTo(addressBookMessageBytes);
+
+    var sensorBytes = serde.serializer("sensors", Serde.Target.VALUE)
+        .serialize("{ \"name\": \"My Sensor\", \"temperature\": 20.5, \"humidity\": 50, \"door\": \"OPEN\" }");
+    assertThat(sensorBytes).isEqualTo(sensorMessageBytes);
+  }
+
   @Test
   void testSerializeDefaults() {
     var serde = new ProtobufFileSerde();
     serde.configure(
-        protobufSchemaPath,
-        protobufSchema.toDescriptor("test.Person"),
-        protobufSchema.toDescriptor("test.AddressBook"),
+        personDescriptor,
+        addressBookDescriptor,
+        descriptorPaths,
         Map.of(),
         Map.of()
     );
@@ -143,10 +235,152 @@ class ProtobufFileSerdeTest {
     assertThat(booksBytes).isEqualTo(addressBookMessageBytes);
   }
 
+  @Test
+  void initOnStartupReturnsFalseIfNoProtoFilesHaveBeenProvided() {
+    PropertyResolver resolver = mock(PropertyResolver.class);
+
+    var serde = new ProtobufFileSerde();
+    boolean startupSuccessful = serde.initOnStartup(resolver, resolver);
+    assertThat(startupSuccessful).isFalse();
+  }
+
+  @Test
+  void initOnStartupReturnsFalseIfProtoFilesListIsEmpty() {
+    PropertyResolver resolver = mock(PropertyResolver.class);
+    when(resolver.getListProperty("protobufFiles", String.class)).thenReturn(Optional.of(List.of()));
+
+    var serde = new ProtobufFileSerde();
+    boolean startupSuccessful = serde.initOnStartup(resolver, resolver);
+    assertThat(startupSuccessful).isFalse();
+  }
+
+  @Test
+  void initOnStartupReturnsTrueIfNoProtoFileHasBeenProvided() {
+    PropertyResolver resolver = mock(PropertyResolver.class);
+    when(resolver.getProperty("protobufFile", String.class)).thenReturn(Optional.of("file.proto"));
+
+    var serde = new ProtobufFileSerde();
+    boolean startupSuccessful = serde.initOnStartup(resolver, resolver);
+    assertThat(startupSuccessful).isTrue();
+  }
+
+  @Test
+  void initOnStartupReturnsTrueIfProtoFilesHasBeenProvided() {
+    PropertyResolver resolver = mock(PropertyResolver.class);
+    when(resolver.getListProperty("protobufFiles", String.class)).thenReturn(Optional.of(List.of("file.proto")));
+
+    var serde = new ProtobufFileSerde();
+    boolean startupSuccessful = serde.initOnStartup(resolver, resolver);
+    assertThat(startupSuccessful).isTrue();
+  }
+
+  @Test
+  void initOnStartupReturnsTrueIfProtoFileAndProtoFilesHaveBeenProvided() {
+    PropertyResolver resolver = mock(PropertyResolver.class);
+    when(resolver.getProperty("protobufFile", String.class)).thenReturn(Optional.of("file1.proto"));
+    when(resolver.getListProperty("protobufFiles", String.class)).thenReturn(Optional.of(List.of("file2.proto")));
+
+    var serde = new ProtobufFileSerde();
+    boolean startupSuccessful = serde.initOnStartup(resolver, resolver);
+    assertThat(startupSuccessful).isTrue();
+  }
+
+  @Test
+  void listOfProtobufFilesIsJoined() {
+    PropertyResolver resolver = mock(PropertyResolver.class);
+    when(resolver.getProperty("protobufFile", String.class))
+        .thenReturn(Optional.of(addressBookSchemaPath.toString()));
+    when(resolver.getListProperty("protobufFiles", String.class))
+        .thenReturn(Optional.of(List.of(sensorSchemaPath.toString())));
+    when(resolver.getProperty("protobufMessageName", String.class))
+        .thenReturn(Optional.of("test.AddressBook"));
+
+    Map<String, String> protobufMessageNameByTopic = Map.of(
+        "persons", "test.Person",
+        "books", "test.AddressBook",
+        "sensors", "iot.Sensor");
+    when(resolver.getMapProperty("protobufMessageNameByTopic", String.class, String.class))
+        .thenReturn(Optional.of(protobufMessageNameByTopic));
+
+    var serde = new ProtobufFileSerde();
+    serde.configure(resolver, resolver, resolver);
+
+    var deserializedPerson = serde.deserializer("persons", Serde.Target.VALUE)
+        .deserialize(null, personMessageBytes);
+    assertJsonEquals(samplePersonMsgJson, deserializedPerson.getResult());
+
+    var deserializedSensor = serde.deserializer("sensors", Serde.Target.VALUE)
+        .deserialize(null, sensorMessageBytes);
+    assertJsonEquals(sampleSensorMsgJson, deserializedSensor.getResult());
+  }
+
+  @Test
+  void unknownSchemaAsDefaultThrowsException() {
+    PropertyResolver resolver = mock(PropertyResolver.class);
+    when(resolver.getListProperty("protobufFiles", String.class))
+        .thenReturn(Optional.of(List.of(addressBookSchemaPath.toString(), sensorSchemaPath.toString())));
+    when(resolver.getProperty("protobufMessageName", String.class))
+        .thenReturn(Optional.of("test.NotExistent"));
+
+    var serde = new ProtobufFileSerde();
+    assertThatThrownBy(() -> serde.configure(resolver, resolver, resolver))
+        .isInstanceOf(NullPointerException.class)
+        .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
+  }
+
+  @Test
+  void unknownSchemaAsDefaultForKeyThrowsException() {
+    PropertyResolver resolver = mock(PropertyResolver.class);
+    when(resolver.getListProperty("protobufFiles", String.class))
+        .thenReturn(Optional.of(List.of(addressBookSchemaPath.toString(), sensorSchemaPath.toString())));
+    when(resolver.getProperty("protobufMessageName", String.class))
+        .thenReturn(Optional.of("test.AddressBook"));
+    when(resolver.getProperty("protobufMessageNameForKey", String.class))
+        .thenReturn(Optional.of("test.NotExistent"));
+
+    var serde = new ProtobufFileSerde();
+    assertThatThrownBy(() -> serde.configure(resolver, resolver, resolver))
+        .isInstanceOf(NullPointerException.class)
+        .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
+  }
+
+  @Test
+  void unknownSchemaAsTopicSchemaThrowsException() {
+    PropertyResolver resolver = mock(PropertyResolver.class);
+    when(resolver.getListProperty("protobufFiles", String.class))
+        .thenReturn(Optional.of(List.of(addressBookSchemaPath.toString(), sensorSchemaPath.toString())));
+    when(resolver.getProperty("protobufMessageName", String.class))
+        .thenReturn(Optional.of("test.AddressBook"));
+
+    when(resolver.getMapProperty("protobufMessageNameByTopic", String.class, String.class))
+        .thenReturn(Optional.of(Map.of("persons", "test.NotExistent")));
+
+    var serde = new ProtobufFileSerde();
+    assertThatThrownBy(() -> serde.configure(resolver, resolver, resolver))
+        .isInstanceOf(NullPointerException.class)
+        .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
+  }
+
+  @Test
+  void unknownSchemaAsTopicSchemaForKeyThrowsException() {
+    PropertyResolver resolver = mock(PropertyResolver.class);
+    when(resolver.getListProperty("protobufFiles", String.class))
+        .thenReturn(Optional.of(List.of(addressBookSchemaPath.toString(), sensorSchemaPath.toString())));
+    when(resolver.getProperty("protobufMessageName", String.class))
+        .thenReturn(Optional.of("test.AddressBook"));
+
+    when(resolver.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class))
+        .thenReturn(Optional.of(Map.of("persons", "test.NotExistent")));
+
+    var serde = new ProtobufFileSerde();
+    assertThatThrownBy(() -> serde.configure(resolver, resolver, resolver))
+        .isInstanceOf(NullPointerException.class)
+        .hasMessage("The given message type not found in protobuf definition: test.NotExistent");
+  }
+
   @SneakyThrows
   private void assertJsonEquals(String expectedJson, String actualJson) {
     var mapper = new JsonMapper();
     assertThat(mapper.readTree(actualJson)).isEqualTo(mapper.readTree(expectedJson));
   }
-
-}
+}

+ 14 - 0
kafka-ui-api/src/test/resources/sensor.proto

@@ -0,0 +1,14 @@
+syntax = "proto3";
+package iot;
+
+message Sensor {
+    string name = 1;
+    double temperature = 2;
+    int32 humidity = 3;
+
+    enum SwitchLevel {
+        CLOSED = 0;
+        OPEN = 1;
+    }
+    SwitchLevel door = 5;
+}