浏览代码

Add protobuf raw message deserializer (#4041)

Implemented a Protobuf Raw deserialiser that works like protoc --decode_raw. This is a no config alternative to the existing ProtobufFileSerde.

Co-authored-by: Ilya Kuramshin <iliax@proton.me>
Kostas Dizas 1 年之前
父节点
当前提交
d915de4fd8

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java

@@ -16,6 +16,7 @@ import com.provectus.kafka.ui.serdes.builtin.HexSerde;
 import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
 import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
 import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
+import com.provectus.kafka.ui.serdes.builtin.ProtobufRawSerde;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
 import com.provectus.kafka.ui.serdes.builtin.UInt32Serde;
 import com.provectus.kafka.ui.serdes.builtin.UInt64Serde;
@@ -50,6 +51,7 @@ public class SerdesInitializer {
             .put(Base64Serde.name(), Base64Serde.class)
             .put(HexSerde.name(), HexSerde.class)
             .put(UuidBinarySerde.name(), UuidBinarySerde.class)
+            .put(ProtobufRawSerde.name(), ProtobufRawSerde.class)
             .build(),
         new CustomSerdeLoader()
     );

+ 59 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java

@@ -0,0 +1,59 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import com.google.protobuf.UnknownFieldSet;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.RecordHeaders;
+import com.provectus.kafka.ui.serde.api.SchemaDescription;
+import com.provectus.kafka.ui.serdes.BuiltInSerde;
+import java.util.Map;
+import java.util.Optional;
+import lombok.SneakyThrows;
+
+public class ProtobufRawSerde implements BuiltInSerde {
+
+  public static String name() {
+    return "ProtobufDecodeRaw";
+  }
+
+  @Override
+  public Optional<String> getDescription() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<SchemaDescription> getSchema(String topic, Target type) {
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean canSerialize(String topic, Target type) {
+    return false;
+  }
+
+  @Override
+  public boolean canDeserialize(String topic, Target type) {
+    return true;
+  }
+
+  @Override
+  public Serializer serializer(String topic, Target type) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Deserializer deserializer(String topic, Target type) {
+    return new Deserializer() {
+        @SneakyThrows
+        @Override
+        public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
+            try {
+              UnknownFieldSet unknownFields = UnknownFieldSet.parseFrom(data);
+              return new DeserializeResult(unknownFields.toString(), DeserializeResult.Type.STRING, Map.of());
+            } catch (Exception e) {
+              throw new ValidationException(e.getMessage());
+            }
+        }
+    };
+  }
+}

+ 108 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java

@@ -0,0 +1,108 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.serde.api.Serde;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ProtobufRawSerdeTest {
+
+  private static final String DUMMY_TOPIC = "dummy-topic";
+
+  private ProtobufRawSerde serde;
+
+  @BeforeEach
+  void init() {
+    serde = new ProtobufRawSerde();
+  }
+
+  @SneakyThrows
+  ProtobufSchema getSampleSchema() {
+    return new ProtobufSchema(
+        """
+          syntax = "proto3";
+          message Message1 {
+            int32 my_field = 1;
+          }
+        """
+    );
+  }
+
+  @SneakyThrows
+  private byte[] getProtobufMessage() {
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(getSampleSchema().toDescriptor("Message1"));
+    builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5);
+    return builder.build().toByteArray();
+  }
+
+  @Test
+  void deserializeSimpleMessage() {
+    var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
+        .deserialize(null, getProtobufMessage());
+    assertThat(deserialized.getResult()).isEqualTo("1: 5\n");
+  }
+
+  @Test
+  void deserializeEmptyMessage() {
+    var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
+        .deserialize(null, new byte[0]);
+    assertThat(deserialized.getResult()).isEqualTo("");
+  }
+
+  @Test
+  void deserializeInvalidMessage() {
+    var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE);
+    assertThatThrownBy(() -> deserializer.deserialize(null, new byte[] { 1, 2, 3 }))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Protocol message contained an invalid tag");
+  }
+  
+  @Test
+  void deserializeNullMessage() {
+    var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE);
+    assertThatThrownBy(() -> deserializer.deserialize(null, null))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Cannot read the array length");
+  }
+
+  ProtobufSchema getSampleNestedSchema() {
+    return new ProtobufSchema(
+      """
+        syntax = "proto3";
+        message Message2 {
+          int32 my_nested_field = 1;
+        }
+        message Message1 {
+          int32 my_field = 1;
+          Message2 my_nested_message = 2;
+        }
+      """
+    );
+  }
+
+  @SneakyThrows
+  private byte[] getComplexProtobufMessage() {
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(getSampleNestedSchema().toDescriptor("Message1"));
+    builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5);
+    DynamicMessage.Builder nestedBuilder = DynamicMessage.newBuilder(getSampleNestedSchema().toDescriptor("Message2"));
+    nestedBuilder.setField(nestedBuilder.getDescriptorForType().findFieldByName("my_nested_field"), 10);
+    builder.setField(builder.getDescriptorForType().findFieldByName("my_nested_message"), nestedBuilder.build());
+
+    return builder.build().toByteArray();
+  }
+
+  @Test
+  void deserializeNestedMessage() {
+    var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
+        .deserialize(null, getComplexProtobufMessage());
+    assertThat(deserialized.getResult()).isEqualTo("1: 5\n2: {\n  1: 10\n}\n");
+  }
+}