Merge branch 'master' into controllers_structure_refactor

This commit is contained in:
Ilya Kuramshin 2023-08-11 14:12:20 +04:00 committed by GitHub
commit 144c5860cc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 169 additions and 0 deletions

View file

@ -16,6 +16,7 @@ import com.provectus.kafka.ui.serdes.builtin.HexSerde;
import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
import com.provectus.kafka.ui.serdes.builtin.ProtobufRawSerde;
import com.provectus.kafka.ui.serdes.builtin.StringSerde;
import com.provectus.kafka.ui.serdes.builtin.UInt32Serde;
import com.provectus.kafka.ui.serdes.builtin.UInt64Serde;
@ -50,6 +51,7 @@ public class SerdesInitializer {
.put(Base64Serde.name(), Base64Serde.class)
.put(HexSerde.name(), HexSerde.class)
.put(UuidBinarySerde.name(), UuidBinarySerde.class)
.put(ProtobufRawSerde.name(), ProtobufRawSerde.class)
.build(),
new CustomSerdeLoader()
);

View file

@ -0,0 +1,59 @@
package com.provectus.kafka.ui.serdes.builtin;
import com.google.protobuf.UnknownFieldSet;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.RecordHeaders;
import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serdes.BuiltInSerde;
import java.util.Map;
import java.util.Optional;
import lombok.SneakyThrows;
public class ProtobufRawSerde implements BuiltInSerde {
public static String name() {
return "ProtobufDecodeRaw";
}
@Override
public Optional<String> getDescription() {
return Optional.empty();
}
@Override
public Optional<SchemaDescription> getSchema(String topic, Target type) {
return Optional.empty();
}
@Override
public boolean canSerialize(String topic, Target type) {
return false;
}
@Override
public boolean canDeserialize(String topic, Target type) {
return true;
}
@Override
public Serializer serializer(String topic, Target type) {
throw new UnsupportedOperationException();
}
@Override
public Deserializer deserializer(String topic, Target type) {
return new Deserializer() {
@SneakyThrows
@Override
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
try {
UnknownFieldSet unknownFields = UnknownFieldSet.parseFrom(data);
return new DeserializeResult(unknownFields.toString(), DeserializeResult.Type.STRING, Map.of());
} catch (Exception e) {
throw new ValidationException(e.getMessage());
}
}
};
}
}

View file

@ -0,0 +1,108 @@
package com.provectus.kafka.ui.serdes.builtin;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.serde.api.Serde;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class ProtobufRawSerdeTest {
private static final String DUMMY_TOPIC = "dummy-topic";
private ProtobufRawSerde serde;
@BeforeEach
void init() {
serde = new ProtobufRawSerde();
}
@SneakyThrows
ProtobufSchema getSampleSchema() {
return new ProtobufSchema(
"""
syntax = "proto3";
message Message1 {
int32 my_field = 1;
}
"""
);
}
@SneakyThrows
private byte[] getProtobufMessage() {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(getSampleSchema().toDescriptor("Message1"));
builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5);
return builder.build().toByteArray();
}
@Test
void deserializeSimpleMessage() {
var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
.deserialize(null, getProtobufMessage());
assertThat(deserialized.getResult()).isEqualTo("1: 5\n");
}
@Test
void deserializeEmptyMessage() {
var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
.deserialize(null, new byte[0]);
assertThat(deserialized.getResult()).isEqualTo("");
}
@Test
void deserializeInvalidMessage() {
var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE);
assertThatThrownBy(() -> deserializer.deserialize(null, new byte[] { 1, 2, 3 }))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Protocol message contained an invalid tag");
}
@Test
void deserializeNullMessage() {
var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE);
assertThatThrownBy(() -> deserializer.deserialize(null, null))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Cannot read the array length");
}
ProtobufSchema getSampleNestedSchema() {
return new ProtobufSchema(
"""
syntax = "proto3";
message Message2 {
int32 my_nested_field = 1;
}
message Message1 {
int32 my_field = 1;
Message2 my_nested_message = 2;
}
"""
);
}
@SneakyThrows
private byte[] getComplexProtobufMessage() {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(getSampleNestedSchema().toDescriptor("Message1"));
builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5);
DynamicMessage.Builder nestedBuilder = DynamicMessage.newBuilder(getSampleNestedSchema().toDescriptor("Message2"));
nestedBuilder.setField(nestedBuilder.getDescriptorForType().findFieldByName("my_nested_field"), 10);
builder.setField(builder.getDescriptorForType().findFieldByName("my_nested_message"), nestedBuilder.build());
return builder.build().toByteArray();
}
@Test
void deserializeNestedMessage() {
var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
.deserialize(null, getComplexProtobufMessage());
assertThat(deserialized.getResult()).isEqualTo("1: 5\n2: {\n 1: 10\n}\n");
}
}