Forráskód Böngészése

ISSUE-849: Use a map between topics and message-names when using ProtobufFile (#854)

* Use a map between topics and message-names when using ProtobufFile
* Validate the given message names for the topics in ProtobufFileRecordSerDe
Meysam Zarezadeh 3 éve
szülő
commit
64f957771c

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.config;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import lombok.Data;
 import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -26,6 +27,7 @@ public class ClustersProperties {
     String keySchemaNameTemplate = "%s-key";
     String protobufFile;
     String protobufMessageName;
+    Map<String, String> protobufMessageNameByTopic;
     List<ConnectCluster> kafkaConnect;
     int jmxPort;
     boolean jmxSsl;

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -35,6 +35,7 @@ public class KafkaCluster {
   private final Throwable lastZookeeperException;
   private final Path protobufFile;
   private final String protobufMessageName;
+  private final Map<String, String> protobufMessageNameByTopic;
   private final Properties properties;
   private final Boolean readOnly;
   private final Boolean disableLogDirsCollection;

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java

@@ -35,7 +35,8 @@ public class DeserializationService {
       if (cluster.getProtobufFile() != null) {
         log.info("Using ProtobufFileRecordSerDe for cluster '{}'", cluster.getName());
         return new ProtobufFileRecordSerDe(cluster.getProtobufFile(),
-            cluster.getProtobufMessageName(), objectMapper);
+            cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageName(),
+            objectMapper);
       } else {
         log.info("Using SchemaRegistryAwareRecordSerDe for cluster '{}'", cluster.getName());
         return new SchemaRegistryAwareRecordSerDe(cluster);

+ 38 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.serde;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.util.JsonFormat;
 import com.provectus.kafka.ui.model.MessageSchema;
@@ -14,6 +15,8 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -30,15 +33,35 @@ public class ProtobufFileRecordSerDe implements RecordSerDe {
   private final ObjectMapper objectMapper;
   private final Path protobufSchemaPath;
   private final ProtobufSchemaConverter schemaConverter = new ProtobufSchemaConverter();
+  private final Map<String, Descriptor> messageDescriptorMap;
+  private final Descriptor defaultMessageDescriptor;
 
-  public ProtobufFileRecordSerDe(Path protobufSchemaPath, String messageName,
-                                 ObjectMapper objectMapper) throws IOException {
+  public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map<String, String> messageNameMap,
+                                 String defaultMessageName, ObjectMapper objectMapper)
+      throws IOException {
     this.objectMapper = objectMapper;
     this.protobufSchemaPath = protobufSchemaPath;
     try (final Stream<String> lines = Files.lines(protobufSchemaPath)) {
-      this.protobufSchema = new ProtobufSchema(
+      var schema = new ProtobufSchema(
           lines.collect(Collectors.joining("\n"))
-      ).copy(messageName);
+      );
+      if (defaultMessageName != null) {
+        this.protobufSchema = schema.copy(defaultMessageName);
+      } else {
+        this.protobufSchema = schema;
+      }
+      this.messageDescriptorMap = new HashMap<>();
+      if (messageNameMap != null) {
+        for (Map.Entry<String, String> entry : messageNameMap.entrySet()) {
+          var descriptor = Objects.requireNonNull(protobufSchema.toDescriptor(entry.getValue()),
+              "The given message type is not found in protobuf definition: "
+                  + entry.getValue());
+          messageDescriptorMap.put(entry.getKey(), descriptor);
+        }
+      }
+      defaultMessageDescriptor = Objects.requireNonNull(protobufSchema.toDescriptor(),
+          "The given message type is not found in protobuf definition: "
+              + defaultMessageName);
     }
   }
 
@@ -51,7 +74,7 @@ public class ProtobufFileRecordSerDe implements RecordSerDe {
         builder.keyFormat(MessageFormat.UNKNOWN);
       }
       if (msg.value() != null) {
-        builder.value(parse(msg.value().get()));
+        builder.value(parse(msg.value().get(), getDescriptor(msg.topic())));
         builder.valueFormat(MessageFormat.PROTOBUF);
       }
       return builder.build();
@@ -60,11 +83,15 @@ public class ProtobufFileRecordSerDe implements RecordSerDe {
     }
   }
 
+  private Descriptor getDescriptor(String topic) {
+    return messageDescriptorMap.getOrDefault(topic, defaultMessageDescriptor);
+  }
+
   @SneakyThrows
-  private String parse(byte[] value) {
+  private String parse(byte[] value, Descriptor descriptor) {
     DynamicMessage protoMsg = DynamicMessage.parseFrom(
-        protobufSchema.toDescriptor(),
-        new ByteArrayInputStream(value)
+            descriptor,
+            new ByteArrayInputStream(value)
     );
     byte[] jsonFromProto = ProtobufSchemaUtils.toJson(protoMsg);
     return new String(jsonFromProto);
@@ -78,7 +105,7 @@ public class ProtobufFileRecordSerDe implements RecordSerDe {
     if (data == null) {
       return new ProducerRecord<>(topic, partition, Objects.requireNonNull(key).getBytes(), null);
     }
-    DynamicMessage.Builder builder = protobufSchema.newMessageBuilder();
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(getDescriptor(topic));
     try {
       JsonFormat.parser().merge(data, builder);
       final DynamicMessage message = builder.build();
@@ -97,8 +124,8 @@ public class ProtobufFileRecordSerDe implements RecordSerDe {
   public TopicMessageSchema getTopicSchema(String topic) {
 
     final JsonSchema jsonSchema = schemaConverter.convert(
-        protobufSchemaPath.toUri(),
-        protobufSchema.toDescriptor()
+            protobufSchemaPath.toUri(),
+            getDescriptor(topic)
     );
     final MessageSchema keySchema = new MessageSchema()
         .name(protobufSchema.fullName())

+ 8 - 0
kafka-ui-api/src/main/resources/application-local.yml

@@ -19,6 +19,14 @@ kafka:
   #          address: http://localhost:8083
   #      jmxPort: 9998
   #      read-only: true
+  #    -
+  #      name: localUsingProtobufFile
+  #      bootstrapServers: localhost:9092
+  #      protobufFile: messages.proto
+  #      protobufMessageName: GenericMessage
+  #      protobufMessageNameByTopic:
+  #        input-topic: InputMessage
+  #        output-topic: OutputMessage
   admin-client-timeout: 5000
 zookeeper:
   connection-timeout: 1000

+ 102 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java

@@ -0,0 +1,102 @@
+package com.provectus.kafka.ui.serde;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.util.JsonFormat;
+import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+class ProtobufFileRecordSerDeTest {
+
+  // Sample message of type `test.Person`
+  private static byte[] personMessage;
+  // Sample message of type `test.AddressBook`
+  private static byte[] addressBookMessage;
+  private static Path protobufSchemaPath;
+
+  @BeforeAll
+  static void setUp() throws URISyntaxException, IOException {
+    protobufSchemaPath = Paths.get(ProtobufFileRecordSerDeTest.class.getClassLoader()
+        .getResource("address-book.proto").toURI());
+    ProtobufSchema protobufSchema = new ProtobufSchema(Files.readString(protobufSchemaPath));
+
+    DynamicMessage.Builder builder = protobufSchema.newMessageBuilder("test.Person");
+    JsonFormat.parser().merge(
+        "{ \"name\": \"My Name\",\"id\": 101, \"email\": \"user1@example.com\" }", builder);
+    personMessage = builder.build().toByteArray();
+
+    builder = protobufSchema.newMessageBuilder("test.AddressBook");
+    JsonFormat.parser().merge(
+        "{\"version\": 1, \"people\": ["
+            + "{ \"name\": \"My Name\",\"id\": 102, \"email\": \"user2@example.com\" }]}", builder);
+    addressBookMessage = builder.build().toByteArray();
+  }
+
+  @Test
+  void testDeserialize() throws IOException {
+    var messageNameMap = Map.of(
+        "topic1", "test.Person",
+        "topic2", "test.AddressBook");
+    var deserializer =
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null, new ObjectMapper());
+    var msg1 = deserializer
+        .deserialize(new ConsumerRecord<>("topic1", 1, 0, Bytes.wrap("key".getBytes()),
+            Bytes.wrap(personMessage)));
+    assertEquals(MessageFormat.PROTOBUF, msg1.getValueFormat());
+    assertTrue(msg1.getValue().contains("user1@example.com"));
+
+    var msg2 = deserializer
+        .deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap("key".getBytes()),
+            Bytes.wrap(addressBookMessage)));
+    assertTrue(msg2.getValue().contains("user2@example.com"));
+  }
+
+  @Test
+  void testNoDefaultMessageName() throws IOException {
+    // by default the first message type defined in proto definition is used
+    var deserializer =
+        new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null,
+            new ObjectMapper());
+    var msg = deserializer
+        .deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()),
+            Bytes.wrap(personMessage)));
+    assertTrue(msg.getValue().contains("user1@example.com"));
+  }
+
+  @Test
+  void testDefaultMessageName() throws IOException {
+    var messageNameMap = Map.of("topic1", "test.Person");
+    var deserializer =
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook",
+            new ObjectMapper());
+    var msg = deserializer
+        .deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap("key".getBytes()),
+            Bytes.wrap(addressBookMessage)));
+    assertTrue(msg.getValue().contains("user2@example.com"));
+  }
+
+  @Test
+  void testSerialize() throws IOException {
+    var messageNameMap = Map.of("topic1", "test.Person");
+    var serializer =
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook",
+            new ObjectMapper());
+    var serialized = serializer.serialize("topic1", "key1", "{\"name\":\"MyName\"}", 0);
+    assertNotNull(serialized.value());
+  }
+}

+ 39 - 0
kafka-ui-api/src/test/resources/address-book.proto

@@ -0,0 +1,39 @@
+// [START declaration]
+syntax = "proto3";
+package test;
+
+// [END declaration]
+
+// [START java_declaration]
+option java_multiple_files = true;
+option java_package = "com.example.tutorial.protos";
+option java_outer_classname = "AddressBookProtos";
+// [END java_declaration]
+
+// [START messages]
+message Person {
+  string name = 1;
+  int32 id = 2;  // Unique ID number for this person.
+  string email = 3;
+
+  enum PhoneType {
+    MOBILE = 0;
+    HOME = 1;
+    WORK = 2;
+  }
+
+  message PhoneNumber {
+    string number = 1;
+    PhoneType type = 2;
+  }
+
+  repeated PhoneNumber phones = 4;
+
+}
+
+// Our address book file is just one of these.
+message AddressBook {
+  int32 version = 1;
+  repeated Person people = 2;
+}
+// [END messages]