瀏覽代碼

Implement deserializing binary protobuf encoded message keys (#1729)

* Add protobufMessageNameForKeyByTopic option to config. Message keys are
deserialized using a protobuf schema if the config is set. Otherwise
message keys are treated as strings.

Closes #1699

* Add documentation around kafkaui's protobuf support

* Add protobufMessageNameForKey config option

* Update README with info about default types

* Imeplement support for protobufMessageNameForKeyByTopic

* fallback to FALLBACK_FORMATTER

* Add ability to publish message with protobuf key

* Change log levels to debug and add @Nullable annotations

* Attempt at fixing documentation workflow

Co-authored-by: Ilya Kuramshin <ilia-2k@rambler.ru>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Co-authored-by: Roman Zabaluev <github@haarolean.dev>
Joseph DeChicchis 3 年之前
父節點
當前提交
1796fd519b

+ 1 - 1
.github/workflows/documentation.yaml

@@ -18,6 +18,6 @@ jobs:
         uses: urlstechie/urlchecker-action@0.2.31
         with:
           exclude_patterns: localhost,127.0.,192.168.
-          exclude_urls: https://api.server,https://graph.microsoft.com/User.Read,https://dev-a63ggcut.auth0.com
+          exclude_urls: https://api.server,https://graph.microsoft.com/User.Read,https://dev-a63ggcut.auth0.com/
           print_all: false
           file_types: .md

+ 33 - 0
documentation/guides/Protobuf.md

@@ -0,0 +1,33 @@
+# Kafkaui Protobuf Support
+
+Kafkaui supports deserializing protobuf messages in two ways:
+1. Using Confluent Schema Registry's [protobuf support](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-protobuf.html).
+2. Supplying a protobuf file as well as a configuration that maps topic names to protobuf types.
+
+## Configuring Kafkaui with a Protobuf File
+
+To configure Kafkaui to deserialize protobuf messages using a supplied protobuf schema add the following to the config:
+```yaml
+kafka:
+  clusters:
+    - # Cluster configuration omitted.
+      # protobufFile is the path to the protobuf schema.
+      protobufFile: path/to/my.proto
+      # protobufMessageName is the default protobuf type that is used to deserilize
+      # the message's value if the topic is not found in protobufMessageNameByTopic.
+      protobufMessageName: my.Type1
+      # protobufMessageNameByTopic is a mapping of topic names to protobuf types.
+      # This mapping is required and is used to deserialize the Kafka message's value.
+      protobufMessageNameByTopic:
+        topic1: my.Type1
+        topic2: my.Type2
+      # protobufMessageNameForKey is the default protobuf type that is used to deserilize
+      # the message's key if the topic is not found in protobufMessageNameForKeyByTopic.
+      protobufMessageNameForKey: my.Type1
+      # protobufMessageNameForKeyByTopic is a mapping of topic names to protobuf types.
+      # This mapping is optional and is used to deserialize the Kafka message's key.
+      # If a protobuf type is not found for a topic's key, the key is deserialized as a string,
+      # unless protobufMessageNameForKey is specified.
+      protobufMessageNameForKeyByTopic:
+        topic1: my.KeyType1
+```

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -31,6 +31,8 @@ public class ClustersProperties {
     String protobufFile;
     String protobufMessageName;
     Map<String, String> protobufMessageNameByTopic;
+    String protobufMessageNameForKey;
+    Map<String, String> protobufMessageNameForKeyByTopic;
     List<ConnectCluster> kafkaConnect;
     int jmxPort;
     boolean jmxSsl;

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -28,6 +28,8 @@ public class KafkaCluster {
   private final Path protobufFile;
   private final String protobufMessageName;
   private final Map<String, String> protobufMessageNameByTopic;
+  private final String protobufMessageNameForKey;
+  private final Map<String, String> protobufMessageNameForKeyByTopic;
   private final Properties properties;
   private final boolean readOnly;
   private final boolean disableLogDirsCollection;

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java

@@ -32,7 +32,8 @@ public class DeserializationService {
       if (cluster.getProtobufFile() != null) {
         log.info("Using ProtobufFileRecordSerDe for cluster '{}'", cluster.getName());
         return new ProtobufFileRecordSerDe(cluster.getProtobufFile(),
-            cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageName());
+            cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageNameForKeyByTopic(),
+            cluster.getProtobufMessageName(), cluster.getProtobufMessageNameForKey());
       } else if (cluster.getSchemaRegistry() != null) {
         log.info("Using SchemaRegistryAwareRecordSerDe for cluster '{}'", cluster.getName());
         return new SchemaRegistryAwareRecordSerDe(cluster);

+ 106 - 37
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java

@@ -6,6 +6,7 @@ import com.google.protobuf.util.JsonFormat;
 import com.provectus.kafka.ui.model.MessageSchemaDTO;
 import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
 import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat;
+import com.provectus.kafka.ui.serde.schemaregistry.StringMessageFormatter;
 import com.provectus.kafka.ui.util.jsonschema.JsonSchema;
 import com.provectus.kafka.ui.util.jsonschema.ProtobufSchemaConverter;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
@@ -22,20 +23,26 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.utils.Bytes;
 
-//TODO: currently we assume that keys for this serde are always string - need to discuss if it is ok
+@Slf4j
 public class ProtobufFileRecordSerDe implements RecordSerDe {
+  private static final StringMessageFormatter FALLBACK_FORMATTER = new StringMessageFormatter();
+
   private final ProtobufSchema protobufSchema;
   private final Path protobufSchemaPath;
   private final ProtobufSchemaConverter schemaConverter = new ProtobufSchemaConverter();
   private final Map<String, Descriptor> messageDescriptorMap;
+  private final Map<String, Descriptor> keyMessageDescriptorMap;
   private final Descriptor defaultMessageDescriptor;
+  private final Descriptor defaultKeyMessageDescriptor;
 
   public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map<String, String> messageNameMap,
-                                 String defaultMessageName)
+                                 Map<String, String> keyMessageNameMap, String defaultMessageName,
+                                 @Nullable String defaultKeyMessageName)
       throws IOException {
     this.protobufSchemaPath = protobufSchemaPath;
     try (final Stream<String> lines = Files.lines(protobufSchemaPath)) {
@@ -49,35 +56,70 @@ public class ProtobufFileRecordSerDe implements RecordSerDe {
       }
       this.messageDescriptorMap = new HashMap<>();
       if (messageNameMap != null) {
-        for (Map.Entry<String, String> entry : messageNameMap.entrySet()) {
-          var descriptor = Objects.requireNonNull(protobufSchema.toDescriptor(entry.getValue()),
-              "The given message type is not found in protobuf definition: "
-                  + entry.getValue());
-          messageDescriptorMap.put(entry.getKey(), descriptor);
-        }
+        populateDescriptors(messageNameMap, messageDescriptorMap);
+      }
+      this.keyMessageDescriptorMap = new HashMap<>();
+      if (keyMessageNameMap != null) {
+        populateDescriptors(keyMessageNameMap, keyMessageDescriptorMap);
       }
       defaultMessageDescriptor = Objects.requireNonNull(protobufSchema.toDescriptor(),
           "The given message type is not found in protobuf definition: "
               + defaultMessageName);
+      if (defaultKeyMessageName != null) {
+        defaultKeyMessageDescriptor = schema.copy(defaultKeyMessageName).toDescriptor();
+      } else {
+        defaultKeyMessageDescriptor = null;
+      }
+    }
+  }
+
+  private void populateDescriptors(Map<String, String> messageNameMap, Map<String, Descriptor> messageDescriptorMap) {
+    for (Map.Entry<String, String> entry : messageNameMap.entrySet()) {
+      var descriptor = Objects.requireNonNull(protobufSchema.toDescriptor(entry.getValue()),
+          "The given message type is not found in protobuf definition: "
+              + entry.getValue());
+      messageDescriptorMap.put(entry.getKey(), descriptor);
     }
   }
 
   @Override
   public DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> msg) {
-    try {
-      var builder = DeserializedKeyValue.builder();
-      if (msg.key() != null) {
-        builder.key(new String(msg.key().get()));
-        builder.keyFormat(MessageFormat.UNKNOWN);
+    var builder = DeserializedKeyValue.builder();
+
+    if (msg.key() != null) {
+      Descriptor descriptor = getKeyDescriptor(msg.topic());
+      if (descriptor == null) {
+        builder.key(FALLBACK_FORMATTER.format(msg.topic(), msg.key().get()));
+        builder.keyFormat(FALLBACK_FORMATTER.getFormat());
+      } else {
+        try {
+          builder.key(parse(msg.key().get(), descriptor));
+          builder.keyFormat(MessageFormat.PROTOBUF);
+        } catch (Throwable e) {
+          log.debug("Failed to deserialize key as protobuf, falling back to string formatter", e);
+          builder.key(FALLBACK_FORMATTER.format(msg.topic(), msg.key().get()));
+          builder.keyFormat(FALLBACK_FORMATTER.getFormat());
+        }
       }
-      if (msg.value() != null) {
+    }
+
+    if (msg.value() != null) {
+      try {
         builder.value(parse(msg.value().get(), getDescriptor(msg.topic())));
         builder.valueFormat(MessageFormat.PROTOBUF);
+      } catch (Throwable e) {
+        log.debug("Failed to deserialize value as protobuf, falling back to string formatter", e);
+        builder.key(FALLBACK_FORMATTER.format(msg.topic(), msg.value().get()));
+        builder.keyFormat(FALLBACK_FORMATTER.getFormat());
       }
-      return builder.build();
-    } catch (Throwable e) {
-      throw new RuntimeException("Failed to parse record from topic " + msg.topic(), e);
     }
+
+    return builder.build();
+  }
+
+  @Nullable
+  private Descriptor getKeyDescriptor(String topic) {
+    return keyMessageDescriptorMap.getOrDefault(topic, defaultKeyMessageDescriptor);
   }
 
   private Descriptor getDescriptor(String topic) {
@@ -99,40 +141,67 @@ public class ProtobufFileRecordSerDe implements RecordSerDe {
                                                   @Nullable String key,
                                                   @Nullable String data,
                                                   @Nullable Integer partition) {
-    if (data == null) {
-      return new ProducerRecord<>(topic, partition, Objects.requireNonNull(key).getBytes(), null);
+    byte[] keyPayload = null;
+    byte[] valuePayload = null;
+
+    if (key != null) {
+      Descriptor keyDescriptor = getKeyDescriptor(topic);
+      if (keyDescriptor == null) {
+        keyPayload = key.getBytes();
+      } else {
+        DynamicMessage.Builder builder = DynamicMessage.newBuilder(keyDescriptor);
+        try {
+          JsonFormat.parser().merge(key, builder);
+          keyPayload = builder.build().toByteArray();
+        } catch (Throwable e) {
+          throw new RuntimeException("Failed to merge record key for topic " + topic, e);
+        }
+      }
     }
-    DynamicMessage.Builder builder = DynamicMessage.newBuilder(getDescriptor(topic));
-    try {
-      JsonFormat.parser().merge(data, builder);
-      final DynamicMessage message = builder.build();
-      return new ProducerRecord<>(
-          topic,
-          partition,
-          Optional.ofNullable(key).map(String::getBytes).orElse(null),
-          message.toByteArray()
-      );
-    } catch (Throwable e) {
-      throw new RuntimeException("Failed to merge record for topic " + topic, e);
+
+    if (data != null) {
+      DynamicMessage.Builder builder = DynamicMessage.newBuilder(getDescriptor(topic));
+      try {
+        JsonFormat.parser().merge(data, builder);
+        valuePayload = builder.build().toByteArray();
+      } catch (Throwable e) {
+        throw new RuntimeException("Failed to merge record value for topic " + topic, e);
+      }
     }
+
+    return new ProducerRecord<>(
+        topic,
+        partition,
+        keyPayload,
+        valuePayload);
   }
 
   @Override
   public TopicMessageSchemaDTO getTopicSchema(String topic) {
+    JsonSchema keyJsonSchema;
+
+    Descriptor keyDescriptor = getKeyDescriptor(topic);
+    if (keyDescriptor == null) {
+      keyJsonSchema = JsonSchema.stringSchema();
+    } else {
+      keyJsonSchema = schemaConverter.convert(
+          protobufSchemaPath.toUri(),
+          keyDescriptor);
+    }
 
-    final JsonSchema jsonSchema = schemaConverter.convert(
-        protobufSchemaPath.toUri(),
-        getDescriptor(topic)
-    );
     final MessageSchemaDTO keySchema = new MessageSchemaDTO()
         .name(protobufSchema.fullName())
         .source(MessageSchemaDTO.SourceEnum.PROTO_FILE)
-        .schema(JsonSchema.stringSchema().toJson());
+        .schema(keyJsonSchema.toJson());
+
+    final JsonSchema valueJsonSchema = schemaConverter.convert(
+        protobufSchemaPath.toUri(),
+        getDescriptor(topic));
 
     final MessageSchemaDTO valueSchema = new MessageSchemaDTO()
         .name(protobufSchema.fullName())
         .source(MessageSchemaDTO.SourceEnum.PROTO_FILE)
-        .schema(jsonSchema.toJson());
+        .schema(valueJsonSchema.toJson());
 
     return new TopicMessageSchemaDTO()
         .key(keySchema)

+ 33 - 6
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java

@@ -51,8 +51,10 @@ class ProtobufFileRecordSerDeTest {
     var messageNameMap = Map.of(
         "topic1", "test.Person",
         "topic2", "test.AddressBook");
+    var keyMessageNameMap = Map.of(
+        "topic2", "test.Person");
     var deserializer =
-        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null);
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, keyMessageNameMap, null, null);
     var msg1 = deserializer
         .deserialize(new ConsumerRecord<>("topic1", 1, 0, Bytes.wrap("key".getBytes()),
             Bytes.wrap(personMessage)));
@@ -60,8 +62,10 @@ class ProtobufFileRecordSerDeTest {
     assertTrue(msg1.getValue().contains("user1@example.com"));
 
     var msg2 = deserializer
-        .deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap("key".getBytes()),
+        .deserialize(new ConsumerRecord<>("topic2", 1, 1, Bytes.wrap(personMessage),
             Bytes.wrap(addressBookMessage)));
+    assertEquals(MessageFormat.PROTOBUF, msg2.getKeyFormat());
+    assertTrue(msg2.getKey().contains("user1@example.com"));
     assertTrue(msg2.getValue().contains("user2@example.com"));
   }
 
@@ -69,7 +73,7 @@ class ProtobufFileRecordSerDeTest {
   void testNoDefaultMessageName() throws IOException {
     // by default the first message type defined in proto definition is used
     var deserializer =
-        new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null);
+        new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null, null, null);
     var msg = deserializer
         .deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()),
             Bytes.wrap(personMessage)));
@@ -80,19 +84,42 @@ class ProtobufFileRecordSerDeTest {
   void testDefaultMessageName() throws IOException {
     var messageNameMap = Map.of("topic1", "test.Person");
     var deserializer =
-        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook");
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null, "test.AddressBook", null);
     var msg = deserializer
-        .deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap("key".getBytes()),
+        .deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap(addressBookMessage),
             Bytes.wrap(addressBookMessage)));
     assertTrue(msg.getValue().contains("user2@example.com"));
   }
 
+  @Test
+  void testDefaultKeyMessageName() throws IOException {
+    var messageNameMap = Map.of("topic1", "test.Person");
+    var deserializer =
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, messageNameMap, "test.AddressBook",
+            "test.AddressBook");
+    var msg = deserializer
+        .deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap(addressBookMessage),
+            Bytes.wrap(addressBookMessage)));
+    assertTrue(msg.getKey().contains("user2@example.com"));
+  }
+
   @Test
   void testSerialize() throws IOException {
     var messageNameMap = Map.of("topic1", "test.Person");
     var serializer =
-        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook");
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null, "test.AddressBook", null);
     var serialized = serializer.serialize("topic1", "key1", "{\"name\":\"MyName\"}", 0);
     assertNotNull(serialized.value());
   }
+
+  @Test
+  void testSerializeKeyAndValue() throws IOException {
+    var messageNameMap = Map.of("topic1", "test.Person");
+    var serializer =
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, messageNameMap, "test.AddressBook",
+                "test.AddressBook");
+    var serialized = serializer.serialize("topic1", "{\"name\":\"MyName\"}", "{\"name\":\"MyName\"}", 0);
+    assertNotNull(serialized.key());
+    assertNotNull(serialized.value());
+  }
 }