Browse Source

Add protobuf raw message deserializer

kostasdizas 2 years ago
parent
commit
2ad229439e

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java

@@ -15,6 +15,7 @@ import com.provectus.kafka.ui.serdes.builtin.ConsumerOffsetsSerde;
 import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
 import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
 import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
+import com.provectus.kafka.ui.serdes.builtin.ProtobufRawSerde;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
 import com.provectus.kafka.ui.serdes.builtin.UInt32Serde;
 import com.provectus.kafka.ui.serdes.builtin.UInt64Serde;
@@ -48,6 +49,7 @@ public class SerdesInitializer {
             .put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class)
             .put(Base64Serde.name(), Base64Serde.class)
             .put(UuidBinarySerde.name(), UuidBinarySerde.class)
+            .put(ProtobufRawSerde.name(), ProtobufRawSerde.class)
             .build(),
         new CustomSerdeLoader()
     );

+ 59 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java

@@ -0,0 +1,59 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import com.google.protobuf.UnknownFieldSet;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.RecordHeaders;
+import com.provectus.kafka.ui.serde.api.SchemaDescription;
+import com.provectus.kafka.ui.serdes.BuiltInSerde;
+import java.util.Map;
+import java.util.Optional;
+import lombok.SneakyThrows;
+
+public class ProtobufRawSerde implements BuiltInSerde {
+
+  public static String name() {
+    return "ProtobufRaw";
+  }
+
+  @Override
+  public Optional<String> getDescription() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<SchemaDescription> getSchema(String topic, Target type) {
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean canSerialize(String topic, Target type) {
+    return false;
+  }
+
+  @Override
+  public boolean canDeserialize(String topic, Target type) {
+    return true;
+  }
+
+  @Override
+  public Serializer serializer(String topic, Target type) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Deserializer deserializer(String topic, Target type) {
+    return new Deserializer() {
+        @SneakyThrows
+        @Override
+        public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
+            try {
+              UnknownFieldSet unknownFields = UnknownFieldSet.parseFrom(data);
+              return new DeserializeResult(unknownFields.toString(), DeserializeResult.Type.STRING, Map.of());
+            } catch (Exception e) {
+              throw new ValidationException(e.getMessage());
+            }
+        }
+    };
+  }
+}

+ 150 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java

@@ -0,0 +1,150 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.serde.api.Serde;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ProtobufRawSerdeTest {
+
+  private static final String DUMMY_TOPIC = "dummy-topic";
+
+  private ProtobufRawSerde serde;
+
+  @BeforeEach
+  void init() {
+    serde = new ProtobufRawSerde();
+  }
+
+  @SneakyThrows
+  Descriptors.Descriptor getDescriptor() {
+    DescriptorProtos.FileDescriptorProto fileDescriptorProto = DescriptorProtos.FileDescriptorProto
+        .newBuilder()
+        .setSyntax("proto3")
+        .setName("test.proto")
+        .addMessageType(
+            DescriptorProtos.DescriptorProto.newBuilder()
+                .setName("MyMessage")
+                .addField(
+                    DescriptorProtos.FieldDescriptorProto.newBuilder()
+                        .setName("my_field")
+                        .setNumber(1)
+                        .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT32)
+                        .build()
+                )
+            .build()
+        )
+        .build();
+
+    Descriptors.FileDescriptor fileDescriptor = Descriptors.FileDescriptor.buildFrom(
+        fileDescriptorProto, new Descriptors.FileDescriptor[0]);
+
+    return fileDescriptor.findMessageTypeByName("MyMessage");
+  }
+
+  @SneakyThrows
+  private byte[] getProtobufMessage() {
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(getDescriptor());
+    builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5);
+    return builder.build().toByteArray();
+  }
+
+  @Test
+  void deserializeSimpleMessage() {
+    var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
+        .deserialize(null, getProtobufMessage());
+    assertThat(deserialized.getResult()).isEqualTo("1: 5\n");
+  }
+
+  @Test
+  void deserializeEmptyMessage() {
+    var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
+        .deserialize(null, new byte[0]);
+    assertThat(deserialized.getResult()).isEqualTo("");
+  }
+
+  @Test
+  void deserializeInvalidMessage() {
+    var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE);
+    assertThatThrownBy(() -> deserializer.deserialize(null, new byte[] { 1, 2, 3 }))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Protocol message contained an invalid tag");
+  }
+  
+  @Test
+  void deserializeNullMessage() {
+    var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE);
+    assertThatThrownBy(() -> deserializer.deserialize(null, null))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Cannot read the array length");
+  }
+
+  @SneakyThrows
+  private byte[] getComplexProtobufMessage() {
+    DescriptorProtos.FileDescriptorProto fileDescriptorProto = DescriptorProtos.FileDescriptorProto
+        .newBuilder()
+        .setSyntax("proto3")
+        .setName("test.proto")
+        .addMessageType(
+            DescriptorProtos.DescriptorProto.newBuilder()
+                .setName("MyMessage")
+                .addField(
+                    DescriptorProtos.FieldDescriptorProto.newBuilder()
+                        .setName("my_field")
+                        .setNumber(1)
+                        .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT32)
+                        .build()
+                )
+                .addField(
+                    DescriptorProtos.FieldDescriptorProto.newBuilder()
+                        .setName("my_nested_message")
+                        .setNumber(2)
+                        .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE)
+                        .setTypeName("MyNestedMessage")
+                        .build()
+                )
+                .build()
+        )
+        .addMessageType(
+            DescriptorProtos.DescriptorProto.newBuilder()
+                .setName("MyNestedMessage")
+                .addField(
+                    DescriptorProtos.FieldDescriptorProto.newBuilder()
+                        .setName("my_nested_field")
+                        .setNumber(1)
+                        .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT32)
+                        .build()
+                )
+                .build()
+        )
+        .build();
+
+    Descriptors.FileDescriptor fileDescriptor = Descriptors.FileDescriptor.buildFrom(
+        fileDescriptorProto, new Descriptors.FileDescriptor[0]);
+
+    Descriptors.Descriptor descriptor = fileDescriptor.findMessageTypeByName("MyMessage");
+    Descriptors.Descriptor nestedDescriptor = fileDescriptor.findMessageTypeByName("MyNestedMessage");
+
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5);
+    DynamicMessage.Builder nestedBuilder = DynamicMessage.newBuilder(nestedDescriptor);
+    nestedBuilder.setField(nestedBuilder.getDescriptorForType().findFieldByName("my_nested_field"), 10);
+    builder.setField(builder.getDescriptorForType().findFieldByName("my_nested_message"), nestedBuilder.build());
+
+    return builder.build().toByteArray();
+  }
+
+  @Test
+  void deserializeNestedMessage() {
+    var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
+        .deserialize(null, getComplexProtobufMessage());
+    assertThat(deserialized.getResult()).isEqualTo("1: 5\n2: {\n  1: 10\n}\n");
+  }
+}