소스 검색

BE: Implement Hex serde (#4074)

Co-authored-by: iliax <ikuramshin@provectus.com>
Ilya Kuramshin 1 년 전
부모
커밋
2db89593a7

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java

@@ -12,6 +12,7 @@ import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.builtin.AvroEmbeddedSerde;
 import com.provectus.kafka.ui.serdes.builtin.Base64Serde;
 import com.provectus.kafka.ui.serdes.builtin.ConsumerOffsetsSerde;
+import com.provectus.kafka.ui.serdes.builtin.HexSerde;
 import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
 import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
 import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
@@ -47,6 +48,7 @@ public class SerdesInitializer {
             .put(UInt64Serde.name(), UInt64Serde.class)
             .put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class)
             .put(Base64Serde.name(), Base64Serde.class)
+            .put(HexSerde.name(), HexSerde.class)
             .put(UuidBinarySerde.name(), UuidBinarySerde.class)
             .build(),
         new CustomSerdeLoader()

+ 80 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/HexSerde.java

@@ -0,0 +1,80 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.PropertyResolver;
+import com.provectus.kafka.ui.serde.api.SchemaDescription;
+import com.provectus.kafka.ui.serdes.BuiltInSerde;
+import java.util.HexFormat;
+import java.util.Map;
+import java.util.Optional;
+
+public class HexSerde implements BuiltInSerde {
+
+  private HexFormat deserializeHexFormat;
+
+  public static String name() {
+    return "Hex";
+  }
+
+  @Override
+  public void configure(PropertyResolver serdeProperties,
+                        PropertyResolver kafkaClusterProperties,
+                        PropertyResolver globalProperties) {
+    String delim = serdeProperties.getProperty("delimiter", String.class).orElse(" ");
+    boolean uppercase = serdeProperties.getProperty("uppercase", Boolean.class).orElse(true);
+    deserializeHexFormat = HexFormat.ofDelimiter(delim);
+    if (uppercase) {
+      deserializeHexFormat = deserializeHexFormat.withUpperCase();
+    }
+  }
+
+  @Override
+  public Optional<String> getDescription() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<SchemaDescription> getSchema(String topic, Target type) {
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean canDeserialize(String topic, Target type) {
+    return true;
+  }
+
+  @Override
+  public boolean canSerialize(String topic, Target type) {
+    return true;
+  }
+
+  @Override
+  public Serializer serializer(String topic, Target type) {
+    return input -> {
+      input = input.trim();
+      // it is a hack to provide ability to sent empty array as a key/value
+      if (input.length() == 0) {
+        return new byte[] {};
+      }
+      return HexFormat.of().parseHex(prepareInputForParse(input));
+    };
+  }
+
+  // removing most-common delimiters and prefixes
+  private static String prepareInputForParse(String input) {
+    return input
+        .replaceAll(" ", "")
+        .replaceAll("#", "")
+        .replaceAll(":", "");
+  }
+
+  @Override
+  public Deserializer deserializer(String topic, Target type) {
+    return (headers, data) ->
+        new DeserializeResult(
+            deserializeHexFormat.formatHex(data),
+            DeserializeResult.Type.STRING,
+            Map.of()
+        );
+  }
+}

+ 84 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/HexSerdeTest.java

@@ -0,0 +1,84 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
+import com.provectus.kafka.ui.serdes.RecordHeadersImpl;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.EnumSource;
+
+public class HexSerdeTest {
+
+  private static final byte[] TEST_BYTES = "hello world".getBytes();
+  private static final String TEST_BYTES_HEX_ENCODED = "68 65 6C 6C 6F 20 77 6F 72 6C 64";
+
+  private Serde hexSerde;
+
+  @BeforeEach
+  void init() {
+    hexSerde = new HexSerde();
+    hexSerde.configure(
+        PropertyResolverImpl.empty(),
+        PropertyResolverImpl.empty(),
+        PropertyResolverImpl.empty()
+    );
+  }
+
+
+  @ParameterizedTest
+  @CsvSource({
+      "68656C6C6F20776F726C64", // uppercase
+      "68656c6c6f20776f726c64", // lowercase
+      "68:65:6c:6c:6f:20:77:6f:72:6c:64", // ':' delim
+      "68 65 6C 6C 6F 20 77 6F 72 6C 64", // space delim, UC
+      "68 65 6c 6c 6f 20 77 6f 72 6c 64", // space delim, LC
+      "#68 #65 #6C #6C #6F #20 #77 #6F #72 #6C #64"  // '#' prefix, space delim
+  })
+  void serializesInputAsHexString(String hexString) {
+    for (Serde.Target type : Serde.Target.values()) {
+      var serializer = hexSerde.serializer("anyTopic", type);
+      byte[] bytes = serializer.serialize(hexString);
+      assertThat(bytes).isEqualTo(TEST_BYTES);
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void serializesEmptyStringAsEmptyBytesArray(Serde.Target type) {
+    var serializer = hexSerde.serializer("anyTopic", type);
+    byte[] bytes = serializer.serialize("");
+    assertThat(bytes).isEqualTo(new byte[] {});
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void deserializesDataAsHexBytes(Serde.Target type) {
+    var deserializer = hexSerde.deserializer("anyTopic", type);
+    var result = deserializer.deserialize(new RecordHeadersImpl(), TEST_BYTES);
+    assertThat(result.getResult()).isEqualTo(TEST_BYTES_HEX_ENCODED);
+    assertThat(result.getType()).isEqualTo(DeserializeResult.Type.STRING);
+    assertThat(result.getAdditionalProperties()).isEmpty();
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void getSchemaReturnsEmpty(Serde.Target type) {
+    assertThat(hexSerde.getSchema("anyTopic", type)).isEmpty();
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void canDeserializeReturnsTrueForAllInputs(Serde.Target type) {
+    assertThat(hexSerde.canDeserialize("anyTopic", type)).isTrue();
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void canSerializeReturnsTrueForAllInput(Serde.Target type) {
+    assertThat(hexSerde.canSerialize("anyTopic", type)).isTrue();
+  }
+}