Pārlūkot izejas kodu

Avro (Embedded) serde implementation (#3266)

* Avro (Embedded) serde implementation

---------

Co-authored-by: iliax <ikuramshin@provectus.com>
Ilya Kuramshin 2 gadi atpakaļ
vecāks
revīzija
f2ec4d76de

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java

@@ -9,6 +9,7 @@ import com.provectus.kafka.ui.config.ClustersProperties.SerdeConfig;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.serde.api.PropertyResolver;
 import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.serdes.builtin.AvroEmbeddedSerde;
 import com.provectus.kafka.ui.serdes.builtin.Base64Serde;
 import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
 import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
@@ -43,6 +44,7 @@ public class SerdesInitializer {
             .put(Int64Serde.name(), Int64Serde.class)
             .put(UInt32Serde.name(), UInt32Serde.class)
             .put(UInt64Serde.name(), UInt64Serde.class)
+            .put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class)
             .put(Base64Serde.name(), Base64Serde.class)
             .put(UuidBinarySerde.name(), UuidBinarySerde.class)
             .build(),

+ 72 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/AvroEmbeddedSerde.java

@@ -0,0 +1,72 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.PropertyResolver;
+import com.provectus.kafka.ui.serde.api.RecordHeaders;
+import com.provectus.kafka.ui.serde.api.SchemaDescription;
+import com.provectus.kafka.ui.serdes.BuiltInSerde;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
+import java.util.Map;
+import java.util.Optional;
+import lombok.SneakyThrows;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableByteArrayInput;
+import org.apache.avro.generic.GenericDatumReader;
+
+public class AvroEmbeddedSerde implements BuiltInSerde {
+
+  public static String name() {
+    return "Avro (Embedded)";
+  }
+
+  @Override
+  public void configure(PropertyResolver serdeProperties,
+                        PropertyResolver kafkaClusterProperties,
+                        PropertyResolver globalProperties) {
+  }
+
+  @Override
+  public Optional<String> getDescription() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<SchemaDescription> getSchema(String topic, Target type) {
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean canDeserialize(String topic, Target type) {
+    return true;
+  }
+
+  @Override
+  public boolean canSerialize(String topic, Target type) {
+    return false;
+  }
+
+  @Override
+  public Serializer serializer(String topic, Target type) {
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public Deserializer deserializer(String topic, Target type) {
+    return new Deserializer() {
+      @SneakyThrows
+      @Override
+      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
+        try (var reader = new DataFileReader<>(new SeekableByteArrayInput(data), new GenericDatumReader<>())) {
+          if (!reader.hasNext()) {
+            // this is very strange situation, when only header present in payload
+            // returning null in this case
+            return new DeserializeResult(null, DeserializeResult.Type.JSON, Map.of());
+          }
+          Object avroObj = reader.next();
+          String jsonValue = new String(AvroSchemaUtils.toJson(avroObj));
+          return new DeserializeResult(jsonValue, DeserializeResult.Type.JSON, Map.of());
+        }
+      }
+    };
+  }
+}

+ 92 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/AvroEmbeddedSerdeTest.java

@@ -0,0 +1,92 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+class AvroEmbeddedSerdeTest {
+
+  private AvroEmbeddedSerde avroEmbeddedSerde;
+
+  @BeforeEach
+  void init() {
+    avroEmbeddedSerde = new AvroEmbeddedSerde();
+    avroEmbeddedSerde.configure(
+        PropertyResolverImpl.empty(),
+        PropertyResolverImpl.empty(),
+        PropertyResolverImpl.empty()
+    );
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void canDeserializeReturnsTrueForAllTargets(Serde.Target target) {
+    assertThat(avroEmbeddedSerde.canDeserialize("anyTopic", target))
+        .isTrue();
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void canSerializeReturnsFalseForAllTargets(Serde.Target target) {
+    assertThat(avroEmbeddedSerde.canSerialize("anyTopic", target))
+        .isFalse();
+  }
+
+  @Test
+  void deserializerParsesAvroDataWithEmbeddedSchema() throws Exception {
+    Schema schema = new Schema.Parser().parse("""
+        {
+          "type": "record",
+          "name": "TestAvroRecord",
+          "fields": [
+            { "name": "field1", "type": "string" },
+            { "name": "field2", "type": "int" }
+          ]
+        }
+        """
+    );
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("field1", "this is test msg");
+    record.put("field2", 100500);
+
+    String jsonRecord = new String(AvroSchemaUtils.toJson(record));
+    byte[] serializedRecordBytes = serializeAvroWithEmbeddedSchema(record);
+
+    var deserializer = avroEmbeddedSerde.deserializer("anyTopic", Serde.Target.KEY);
+    DeserializeResult result = deserializer.deserialize(null, serializedRecordBytes);
+    assertThat(result.getType()).isEqualTo(DeserializeResult.Type.JSON);
+    assertThat(result.getAdditionalProperties()).isEmpty();
+    assertJsonEquals(jsonRecord, result.getResult());
+  }
+
+  private void assertJsonEquals(String expected, String actual) throws IOException {
+    var mapper = new JsonMapper();
+    assertThat(mapper.readTree(actual)).isEqualTo(mapper.readTree(expected));
+  }
+
+  private byte[] serializeAvroWithEmbeddedSchema(GenericRecord record) throws IOException {
+    try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>());
+         ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      writer.create(record.getSchema(), baos);
+      writer.append(record);
+      writer.flush();
+      return baos.toByteArray();
+    }
+  }
+
+}