new conversion tests added

This commit is contained in:
iliax 2023-05-10 22:19:08 +04:00
parent 6367f44a90
commit 152b088771
3 changed files with 567 additions and 357 deletions

View file

@ -189,11 +189,8 @@ public class JsonAvroConversion {
yield node; yield node;
} }
case MAP -> { case MAP -> {
var map = (Map<Utf8, Object>) obj;
ObjectNode node = MAPPER.createObjectNode(); ObjectNode node = MAPPER.createObjectNode();
map.forEach((k, v) -> { ((Map) obj).forEach((k, v) -> node.set(k.toString(), convertAvroToJson(v, avroSchema.getValueType())));
node.set(k.toString(), convertAvroToJson(v, avroSchema.getValueType()));
});
yield node; yield node;
} }
case ARRAY -> { case ARRAY -> {
@ -206,8 +203,6 @@ public class JsonAvroConversion {
yield new TextNode(obj.toString()); yield new TextNode(obj.toString());
} }
case UNION -> { case UNION -> {
//TODO: cover with tests
// non-null case
ObjectNode node = MAPPER.createObjectNode(); ObjectNode node = MAPPER.createObjectNode();
int unionIdx = AvroData.getGenericData().resolveUnion(avroSchema, obj); int unionIdx = AvroData.getGenericData().resolveUnion(avroSchema, obj);
Schema unionType = avroSchema.getTypes().get(unionIdx); Schema unionType = avroSchema.getTypes().get(unionIdx);
@ -232,25 +227,17 @@ public class JsonAvroConversion {
} }
yield new IntNode((Integer) obj); yield new IntNode((Integer) obj);
} }
case FLOAT -> { case FLOAT -> new FloatNode((Float) obj);
yield new FloatNode((Float) obj); case DOUBLE -> new DoubleNode((Double) obj);
} case BOOLEAN -> BooleanNode.valueOf((Boolean) obj);
case DOUBLE -> { case NULL -> NullNode.getInstance();
yield new DoubleNode((Double) obj);
}
case BOOLEAN -> {
yield BooleanNode.valueOf((Boolean) obj);
}
case NULL -> {
yield NullNode.getInstance();
}
case BYTES -> { case BYTES -> {
if (isLogicalType(avroSchema)) { if (isLogicalType(avroSchema)) {
yield processLogicalType(obj, avroSchema); yield processLogicalType(obj, avroSchema);
} }
//TODO: check with tests //TODO: check with tests
byte[] bytes = (byte[]) obj; ByteBuffer bytes = (ByteBuffer) obj;
yield new TextNode(new String(bytes)); //TODO: encoding yield new TextNode(new String(bytes.array()));
} }
case FIXED -> { case FIXED -> {
if (isLogicalType(avroSchema)) { if (isLogicalType(avroSchema)) {
@ -263,27 +250,26 @@ public class JsonAvroConversion {
} }
private static Object processLogicalType(JsonNode node, Schema schema) { private static Object processLogicalType(JsonNode node, Schema schema) {
String logicalTypeName = schema.getLogicalType().getName(); return findConversion(schema)
var conversion = Stream.of(LogicalTypeConversion.values())
.filter(t -> t.name.equalsIgnoreCase(logicalTypeName))
.findFirst();
return conversion
.map(c -> c.jsonToAvroConversion.apply(node, schema)) .map(c -> c.jsonToAvroConversion.apply(node, schema))
.orElseThrow(() -> .orElseThrow(() ->
new JsonToAvroConversionException("'%s' logical type is not supported" new JsonToAvroConversionException("'%s' logical type is not supported"
.formatted(logicalTypeName))); .formatted(schema.getLogicalType().getName())));
} }
private static JsonNode processLogicalType(Object obj, Schema schema) { private static JsonNode processLogicalType(Object obj, Schema schema) {
String logicalTypeName = schema.getLogicalType().getName(); return findConversion(schema)
var conversion = Stream.of(LogicalTypeConversion.values())
.filter(t -> t.name.equalsIgnoreCase(logicalTypeName))
.findFirst();
return conversion
.map(c -> c.avroToJsonConversion.apply(obj, schema)) .map(c -> c.avroToJsonConversion.apply(obj, schema))
.orElseThrow(() -> .orElseThrow(() ->
new JsonToAvroConversionException("'%s' logical type is not supported" new JsonToAvroConversionException("'%s' logical type is not supported"
.formatted(logicalTypeName))); .formatted(schema.getLogicalType().getName())));
}
private static Optional<LogicalTypeConversion> findConversion(Schema schema) {
String logicalTypeName = schema.getLogicalType().getName();
return Stream.of(LogicalTypeConversion.values())
.filter(t -> t.name.equalsIgnoreCase(logicalTypeName))
.findFirst();
} }
private static boolean isLogicalType(Schema schema) { private static boolean isLogicalType(Schema schema) {

View file

@ -6,8 +6,8 @@ import com.fasterxml.jackson.databind.json.JsonMapper;
import com.provectus.kafka.ui.serde.api.DeserializeResult; import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.SchemaDescription; import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serde.api.Serde; import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion;
import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
@ -212,13 +212,13 @@ class SchemaRegistrySerdeTest {
GenericDatumWriter<Object> writer = new GenericDatumWriter<>(schema.rawSchema()); GenericDatumWriter<Object> writer = new GenericDatumWriter<>(schema.rawSchema());
ByteArrayOutputStream output = new ByteArrayOutputStream(); ByteArrayOutputStream output = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(output, null); Encoder encoder = EncoderFactory.get().binaryEncoder(output, null);
writer.write(AvroSchemaUtils.toObject(json, schema), encoder); writer.write(JsonAvroConversion.convertJsonToAvro(json, schema.rawSchema()), encoder);
encoder.flush(); encoder.flush();
return output.toByteArray(); return output.toByteArray();
} }
@Test @Test
void fieldsRepresentationIsConsistentForSerializationAndDeserialization() throws Exception { void avroFieldsRepresentationIsConsistentForSerializationAndDeserialization() throws Exception {
AvroSchema schema = new AvroSchema( AvroSchema schema = new AvroSchema(
""" """
{ {
@ -268,6 +268,10 @@ class SchemaRegistrySerdeTest {
{ {
"name": "f_union", "name": "f_union",
"type": ["null", "string", "int" ] "type": ["null", "string", "int" ]
},
{
"name": "f_optional_to_test_not_filled_case",
"type": [ "null", "string"]
} }
] ]
}""" }"""
@ -288,18 +292,11 @@ class SchemaRegistrySerdeTest {
"""; """;
registryClient.register("test-value", schema); registryClient.register("test-value", schema);
assertSerdeCycle("test", jsonPayload);
byte[] serializedBytes = serde.serializer("test", Serde.Target.VALUE).serialize(jsonPayload);
var deserializedJson = serde.deserializer("test", Serde.Target.VALUE)
.deserialize(null, serializedBytes)
.getResult();
assertJsonsEqual(jsonPayload, deserializedJson);
} }
@Test @Test
void logicalTypesRepresentationIsConsistentForSerializationAndDeserialization() throws Exception { void avroLogicalTypesRepresentationIsConsistentForSerializationAndDeserialization() throws Exception {
AvroSchema schema = new AvroSchema( AvroSchema schema = new AvroSchema(
""" """
{ {
@ -361,14 +358,18 @@ class SchemaRegistrySerdeTest {
"""; """;
registryClient.register("test-value", schema); registryClient.register("test-value", schema);
assertSerdeCycle("test", jsonPayload);
}
byte[] serializedBytes = serde.serializer("test", Serde.Target.VALUE).serialize(jsonPayload); // 1. serialize input json to binary
// 2. deserialize from binary
var deserializedJson = serde.deserializer("test", Serde.Target.VALUE) // 3. check that deserialized version equal to input
void assertSerdeCycle(String topic, String jsonInput) {
byte[] serializedBytes = serde.serializer(topic, Serde.Target.VALUE).serialize(jsonInput);
var deserializedJson = serde.deserializer(topic, Serde.Target.VALUE)
.deserialize(null, serializedBytes) .deserialize(null, serializedBytes)
.getResult(); .getResult();
assertJsonsEqual(jsonInput, deserializedJson);
assertJsonsEqual(jsonPayload, deserializedJson);
} }
} }

View file

@ -1,8 +1,17 @@
package com.provectus.kafka.ui.util.jsonschema; package com.provectus.kafka.ui.util.jsonschema;
import static com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion.convertJsonToAvro; import static com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion.convertJsonToAvro;
import static com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion.convertAvroToJson;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.BooleanNode;
import com.fasterxml.jackson.databind.node.DoubleNode;
import com.fasterxml.jackson.databind.node.FloatNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.LongNode;
import com.fasterxml.jackson.databind.node.TextNode;
import io.confluent.kafka.schemaregistry.avro.AvroSchema; import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.Instant; import java.time.Instant;
@ -11,355 +20,569 @@ import java.time.LocalDateTime;
import java.time.LocalTime; import java.time.LocalTime;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID;
import lombok.SneakyThrows;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
class JsonAvroConversionTest { class JsonAvroConversionTest {
@Test // checking conversion from json to KafkaAvroSerializer-compatible avro objects
void primitiveRoot() { @Nested
assertThat(convertJsonToAvro("\"str\"", createSchema("\"string\""))) class FromJsonToAvro {
.isEqualTo("str");
assertThat(convertJsonToAvro("123", createSchema("\"int\""))) @Test
.isEqualTo(123); void primitiveRoot() {
assertThat(convertJsonToAvro("\"str\"", createSchema("\"string\"")))
.isEqualTo("str");
assertThat(convertJsonToAvro("123", createSchema("\"long\""))) assertThat(convertJsonToAvro("123", createSchema("\"int\"")))
.isEqualTo(123L); .isEqualTo(123);
assertThat(convertJsonToAvro("123.123", createSchema("\"float\""))) assertThat(convertJsonToAvro("123", createSchema("\"long\"")))
.isEqualTo(123.123F); .isEqualTo(123L);
assertThat(convertJsonToAvro("12345.12345", createSchema("\"double\""))) assertThat(convertJsonToAvro("123.123", createSchema("\"float\"")))
.isEqualTo(12345.12345); .isEqualTo(123.123F);
}
@Test assertThat(convertJsonToAvro("12345.12345", createSchema("\"double\"")))
void primitiveTypedFields() { .isEqualTo(12345.12345);
var schema = createSchema( }
"""
{ @Test
"type": "record", void primitiveTypedFields() {
"name": "TestAvroRecord", var schema = createSchema(
"fields": [ """
{ {
"name": "f_int", "type": "record",
"type": "int" "name": "TestAvroRecord",
}, "fields": [
{ {
"name": "f_long", "name": "f_int",
"type": "long" "type": "int"
}, },
{ {
"name": "f_string", "name": "f_long",
"type": "string" "type": "long"
}, },
{ {
"name": "f_boolean", "name": "f_string",
"type": "boolean" "type": "string"
}, },
{ {
"name": "f_float", "name": "f_boolean",
"type": "float" "type": "boolean"
}, },
{ {
"name": "f_double", "name": "f_float",
"type": "double" "type": "float"
}, },
{ {
"name": "f_enum", "name": "f_double",
"type" : { "type": "double"
"type": "enum", },
"name": "Suit", {
"symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"] "name": "f_enum",
"type" : {
"type": "enum",
"name": "Suit",
"symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
}
} }
} ]
] }"""
}""" );
);
String jsonPayload = """ String jsonPayload = """
{ {
"f_int": 123, "f_int": 123,
"f_long": 4294967294, "f_long": 4294967294,
"f_string": "string here", "f_string": "string here",
"f_boolean": true, "f_boolean": true,
"f_float": 123.1, "f_float": 123.1,
"f_double": 123456.123456, "f_double": 123456.123456,
"f_enum": "SPADES" "f_enum": "SPADES"
} }
"""; """;
var converted = convertJsonToAvro(jsonPayload, schema); var converted = convertJsonToAvro(jsonPayload, schema);
assertThat(converted).isInstanceOf(GenericData.Record.class); assertThat(converted).isInstanceOf(GenericData.Record.class);
var record = (GenericData.Record) converted; var record = (GenericData.Record) converted;
assertThat(record.get("f_int")).isEqualTo(123); assertThat(record.get("f_int")).isEqualTo(123);
assertThat(record.get("f_long")).isEqualTo(4294967294L); assertThat(record.get("f_long")).isEqualTo(4294967294L);
assertThat(record.get("f_string")).isEqualTo("string here"); assertThat(record.get("f_string")).isEqualTo("string here");
assertThat(record.get("f_boolean")).isEqualTo(true); assertThat(record.get("f_boolean")).isEqualTo(true);
assertThat(record.get("f_float")).isEqualTo(123.1f); assertThat(record.get("f_float")).isEqualTo(123.1f);
assertThat(record.get("f_double")).isEqualTo(123456.123456); assertThat(record.get("f_double")).isEqualTo(123456.123456);
assertThat(record.get("f_enum")) assertThat(record.get("f_enum"))
.isEqualTo( .isEqualTo(
new GenericData.EnumSymbol( new GenericData.EnumSymbol(
schema.getField("f_enum").schema(), schema.getField("f_enum").schema(),
"SPADES" "SPADES"
) )
); );
} }
@Test @Test
void unionRoot() { void unionRoot() {
var sc = createSchema("[ \"null\", \"string\", \"int\" ]"); var schema = createSchema("[ \"null\", \"string\", \"int\" ]");
var converted = convertJsonToAvro("{\"string\":\"string here\"}", sc); var converted = convertJsonToAvro("{\"string\":\"string here\"}", schema);
assertThat(converted).isEqualTo("string here"); assertThat(converted).isEqualTo("string here");
converted = convertJsonToAvro("{\"int\": 123}", sc); converted = convertJsonToAvro("{\"int\": 123}", schema);
assertThat(converted).isEqualTo(123); assertThat(converted).isEqualTo(123);
converted = convertJsonToAvro("null", sc); converted = convertJsonToAvro("null", schema);
assertThat(converted).isEqualTo(null); assertThat(converted).isEqualTo(null);
} }
@Test @Test
void unionField() { void unionField() {
var schema = createSchema( var schema = createSchema(
""" """
{ {
"type": "record", "type": "record",
"name": "TestAvroRecord", "namespace": "com.test",
"fields": [ "name": "TestAvroRecord",
{ "fields": [
"name": "f_union", {
"type": [ "null", "int", "TestAvroRecord"] "name": "f_union",
} "type": [ "null", "int", "TestAvroRecord"]
]
}"""
);
String jsonPayload = "{ \"f_union\": null }";
var converted = convertJsonToAvro(jsonPayload, schema);
assertThat(converted).isInstanceOf(GenericData.Record.class);
var record = (GenericData.Record) converted;
assertThat(record.get("f_union")).isNull();
jsonPayload = "{ \"f_union\": { \"int\": 123 } }";
record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
assertThat(record.get("f_union")).isEqualTo(123);
jsonPayload = "{ \"f_union\": { \"TestAvroRecord\": { \"f_union\": { \"int\": 123 } } } }";
record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
assertThat(record.get("f_union")).isInstanceOf(GenericData.Record.class);
var innerRec = (GenericData.Record) record.get("f_union");
assertThat(innerRec.get("f_union")).isEqualTo(123);
}
@Test
void mapField() {
var schema = createSchema(
"""
{
"type": "record",
"name": "TestAvroRecord",
"fields": [
{
"name": "long_map",
"type": {
"type": "map",
"values" : "long",
"default": {}
} }
}, ]
{ }"""
"name": "string_map", );
"type": {
"type": "map",
"values" : "string",
"default": {}
}
},
{
"name": "self_ref_map",
"type": {
"type": "map",
"values" : "TestAvroRecord",
"default": {}
}
}
]
}"""
);
String jsonPayload = """ String jsonPayload = "{ \"f_union\": null }";
{
"long_map": { var record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
"k1": 123, assertThat(record.get("f_union")).isNull();
"k2": 456
}, jsonPayload = "{ \"f_union\": { \"int\": 123 } }";
"string_map": { record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
"k3": "s1", assertThat(record.get("f_union")).isEqualTo(123);
"k4": "s2"
}, //inner-record's name should be fully-qualified!
"self_ref_map": { jsonPayload = "{ \"f_union\": { \"com.test.TestAvroRecord\": { \"f_union\": { \"int\": 123 } } } }";
"k5" : { record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
"long_map": { "_k1": 222 }, assertThat(record.get("f_union")).isInstanceOf(GenericData.Record.class);
"string_map": { "_k2": "_s1" } var innerRec = (GenericData.Record) record.get("f_union");
assertThat(innerRec.get("f_union")).isEqualTo(123);
}
@Test
void mapField() {
var schema = createSchema(
"""
{
"type": "record",
"name": "TestAvroRecord",
"fields": [
{
"name": "long_map",
"type": {
"type": "map",
"values" : "long",
"default": {}
}
},
{
"name": "string_map",
"type": {
"type": "map",
"values" : "string",
"default": {}
}
},
{
"name": "self_ref_map",
"type": {
"type": "map",
"values" : "TestAvroRecord",
"default": {}
}
}
]
}"""
);
String jsonPayload = """
{
"long_map": {
"k1": 123,
"k2": 456
},
"string_map": {
"k3": "s1",
"k4": "s2"
},
"self_ref_map": {
"k5" : {
"long_map": { "_k1": 222 },
"string_map": { "_k2": "_s1" }
}
} }
} }
} """;
""";
var converted = convertJsonToAvro(jsonPayload, schema); var record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
assertThat(converted).isInstanceOf(GenericData.Record.class); assertThat(record.get("long_map"))
.isEqualTo(Map.of("k1", 123L, "k2", 456L));
assertThat(record.get("string_map"))
.isEqualTo(Map.of("k3", "s1", "k4", "s2"));
assertThat(record.get("self_ref_map"))
.isNotNull();
var record = (GenericData.Record) converted; Map<String, Object> selfRefMapField = (Map<String, Object>) record.get("self_ref_map");
assertThat(record.get("long_map")) assertThat(selfRefMapField)
.isEqualTo(Map.of("k1", 123L, "k2", 456L)); .hasSize(1)
assertThat(record.get("string_map")) .hasEntrySatisfying("k5", v -> {
.isEqualTo(Map.of("k3", "s1", "k4", "s2")); assertThat(v).isInstanceOf(GenericData.Record.class);
assertThat(record.get("self_ref_map")) var innerRec = (GenericData.Record) v;
.isNotNull(); assertThat(innerRec.get("long_map"))
.isEqualTo(Map.of("_k1", 222L));
assertThat(innerRec.get("string_map"))
.isEqualTo(Map.of("_k2", "_s1"));
});
}
Map<String, Object> selfRefMapField = (Map<String, Object>) record.get("self_ref_map"); @Test
assertThat(selfRefMapField) void arrayField() {
.hasSize(1) var schema = createSchema(
.hasEntrySatisfying("k5", v -> { """
assertThat(v).isInstanceOf(GenericData.Record.class); {
var innerRec = (GenericData.Record) v; "type": "record",
assertThat(innerRec.get("long_map")) "name": "TestAvroRecord",
.isEqualTo(Map.of("_k1", 222L)); "fields": [
assertThat(innerRec.get("string_map")) {
.isEqualTo(Map.of("_k2", "_s1")); "name": "f_array",
}); "type": {
"type": "array",
"items" : "string",
"default": []
}
}
]
}"""
);
String jsonPayload = """
{
"f_array": [ "e1", "e2" ]
}
""";
var record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
assertThat(record.get("f_array")).isEqualTo(List.of("e1", "e2"));
}
@Test
void logicalTypesField() {
var schema = createSchema(
"""
{
"type": "record",
"name": "TestAvroRecord",
"fields": [
{
"name": "lt_date",
"type": { "type": "int", "logicalType": "date" }
},
{
"name": "lt_uuid",
"type": { "type": "string", "logicalType": "uuid" }
},
{
"name": "lt_decimal",
"type": { "type": "bytes", "logicalType": "decimal", "precision": 22, "scale":10 }
},
{
"name": "lt_time_millis",
"type": { "type": "int", "logicalType": "time-millis"}
},
{
"name": "lt_time_micros",
"type": { "type": "long", "logicalType": "time-micros"}
},
{
"name": "lt_timestamp_millis",
"type": { "type": "long", "logicalType": "timestamp-millis" }
},
{
"name": "lt_timestamp_micros",
"type": { "type": "long", "logicalType": "timestamp-micros" }
},
{
"name": "lt_local_timestamp_millis",
"type": { "type": "long", "logicalType": "local-timestamp-millis" }
},
{
"name": "lt_local_timestamp_micros",
"type": { "type": "long", "logicalType": "local-timestamp-micros" }
}
]
}"""
);
String jsonPayload = """
{
"lt_date":"1991-08-14",
"lt_decimal": 2.1617413862327545E11,
"lt_time_millis": "10:15:30.001",
"lt_time_micros": "10:15:30.123456",
"lt_uuid": "a37b75ca-097c-5d46-6119-f0637922e908",
"lt_timestamp_millis": "2007-12-03T10:15:30.123Z",
"lt_timestamp_micros": "2007-12-13T10:15:30.123456Z",
"lt_local_timestamp_millis": "2017-12-03T10:15:30.123",
"lt_local_timestamp_micros": "2017-12-13T10:15:30.123456"
}
""";
var converted = convertJsonToAvro(jsonPayload, schema);
assertThat(converted).isInstanceOf(GenericData.Record.class);
var record = (GenericData.Record) converted;
assertThat(record.get("lt_date"))
.isEqualTo(LocalDate.of(1991, 8, 14));
assertThat(record.get("lt_decimal"))
.isEqualTo(new BigDecimal("2.1617413862327545E11"));
assertThat(record.get("lt_time_millis"))
.isEqualTo(LocalTime.parse("10:15:30.001"));
assertThat(record.get("lt_time_micros"))
.isEqualTo(LocalTime.parse("10:15:30.123456"));
assertThat(record.get("lt_timestamp_millis"))
.isEqualTo(Instant.parse("2007-12-03T10:15:30.123Z"));
assertThat(record.get("lt_timestamp_micros"))
.isEqualTo(Instant.parse("2007-12-13T10:15:30.123456Z"));
assertThat(record.get("lt_local_timestamp_millis"))
.isEqualTo(LocalDateTime.parse("2017-12-03T10:15:30.123"));
assertThat(record.get("lt_local_timestamp_micros"))
.isEqualTo(LocalDateTime.parse("2017-12-13T10:15:30.123456"));
}
} }
@Test // checking conversion of KafkaAvroDeserializer output to JsonNode
void arrayField() { @Nested
var schema = createSchema( class FromAvroToJson {
"""
{
"type": "record",
"name": "TestAvroRecord",
"fields": [
{
"name": "f_array",
"type": {
"type": "array",
"items" : "string",
"default": []
}
}
]
}"""
);
String jsonPayload = """ @Test
{ void primitiveRoot() {
"f_array": [ "e1", "e2" ] assertThat(convertAvroToJson("str", createSchema("\"string\"")))
} .isEqualTo(new TextNode("str"));
""";
var converted = convertJsonToAvro(jsonPayload, schema); assertThat(convertAvroToJson(123, createSchema("\"int\"")))
assertThat(converted).isInstanceOf(GenericData.Record.class); .isEqualTo(new IntNode(123));
var record = (GenericData.Record) converted; assertThat(convertAvroToJson(123L, createSchema("\"long\"")))
assertThat(record.get("f_array")).isEqualTo(List.of("e1", "e2")); .isEqualTo(new LongNode(123));
}
@Test assertThat(convertAvroToJson(123.1F, createSchema("\"float\"")))
void logicalTypesField() { .isEqualTo(new FloatNode(123.1F));
var schema = createSchema(
"""
{
"type": "record",
"name": "TestAvroRecord",
"fields": [
{
"name": "lt_date",
"type": { "type": "int", "logicalType": "date" }
},
{
"name": "lt_uuid",
"type": { "type": "string", "logicalType": "uuid" }
},
{
"name": "lt_decimal",
"type": { "type": "bytes", "logicalType": "decimal", "precision": 22, "scale":10 }
},
{
"name": "lt_time_millis",
"type": { "type": "int", "logicalType": "time-millis"}
},
{
"name": "lt_time_micros",
"type": { "type": "long", "logicalType": "time-micros"}
},
{
"name": "lt_timestamp_millis",
"type": { "type": "long", "logicalType": "timestamp-millis" }
},
{
"name": "lt_timestamp_micros",
"type": { "type": "long", "logicalType": "timestamp-micros" }
},
{
"name": "lt_local_timestamp_millis",
"type": { "type": "long", "logicalType": "local-timestamp-millis" }
},
{
"name": "lt_local_timestamp_micros",
"type": { "type": "long", "logicalType": "local-timestamp-micros" }
}
]
}"""
);
String jsonPayload = """ assertThat(convertAvroToJson(123.1, createSchema("\"double\"")))
{ .isEqualTo(new DoubleNode(123.1));
"lt_date":"1991-08-14",
"lt_decimal": 2.1617413862327545E11,
"lt_time_millis": "10:15:30.001",
"lt_time_micros": "10:15:30.123456",
"lt_uuid": "a37b75ca-097c-5d46-6119-f0637922e908",
"lt_timestamp_millis": "2007-12-03T10:15:30.123Z",
"lt_timestamp_micros": "2007-12-13T10:15:30.123456Z",
"lt_local_timestamp_millis": "2017-12-03T10:15:30.123",
"lt_local_timestamp_micros": "2017-12-13T10:15:30.123456"
}
""";
var converted = convertJsonToAvro(jsonPayload, schema); assertThat(convertAvroToJson(true, createSchema("\"boolean\"")))
assertThat(converted).isInstanceOf(GenericData.Record.class); .isEqualTo(BooleanNode.valueOf(true));
}
var record = (GenericData.Record) converted; @SneakyThrows
@Test
void primitiveTypedFields() {
var schema = createSchema(
"""
{
"type": "record",
"name": "TestAvroRecord",
"fields": [
{
"name": "f_int",
"type": "int"
},
{
"name": "f_long",
"type": "long"
},
{
"name": "f_string",
"type": "string"
},
{
"name": "f_boolean",
"type": "boolean"
},
{
"name": "f_float",
"type": "float"
},
{
"name": "f_double",
"type": "double"
},
{
"name": "f_enum",
"type" : {
"type": "enum",
"name": "Suit",
"symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
}
}
]
}"""
);
GenericData.Record inputRecord = new GenericData.Record(schema);
inputRecord.put("f_int", 123);
inputRecord.put("f_long", 4294967294L);
inputRecord.put("f_string", "string here");
inputRecord.put("f_boolean", true);
inputRecord.put("f_float", 123.1f);
inputRecord.put("f_double", 123456.123456);
inputRecord.put("f_enum", new GenericData.EnumSymbol(schema.getField("f_enum").schema(), "SPADES"));
String expectedJson = """
{
"f_int": 123,
"f_long": 4294967294,
"f_string": "string here",
"f_boolean": true,
"f_float": 123.1,
"f_double": 123456.123456,
"f_enum": "SPADES"
}
""";
assertJsonsEqual(expectedJson, convertAvroToJson(inputRecord, schema));
}
@Test
void logicalTypesField() {
var schema = createSchema(
"""
{
"type": "record",
"name": "TestAvroRecord",
"fields": [
{
"name": "lt_date",
"type": { "type": "int", "logicalType": "date" }
},
{
"name": "lt_uuid",
"type": { "type": "string", "logicalType": "uuid" }
},
{
"name": "lt_decimal",
"type": { "type": "bytes", "logicalType": "decimal", "precision": 22, "scale":10 }
},
{
"name": "lt_time_millis",
"type": { "type": "int", "logicalType": "time-millis"}
},
{
"name": "lt_time_micros",
"type": { "type": "long", "logicalType": "time-micros"}
},
{
"name": "lt_timestamp_millis",
"type": { "type": "long", "logicalType": "timestamp-millis" }
},
{
"name": "lt_timestamp_micros",
"type": { "type": "long", "logicalType": "timestamp-micros" }
},
{
"name": "lt_local_timestamp_millis",
"type": { "type": "long", "logicalType": "local-timestamp-millis" }
},
{
"name": "lt_local_timestamp_micros",
"type": { "type": "long", "logicalType": "local-timestamp-micros" }
}
]
}"""
);
GenericData.Record inputRecord = new GenericData.Record(schema);
inputRecord.put("lt_date", LocalDate.of(1991, 8, 14));
inputRecord.put("lt_decimal", new BigDecimal("2.1617413862327545E11"));
inputRecord.put("lt_time_millis", LocalTime.parse("10:15:30.001"));
inputRecord.put("lt_time_micros", LocalTime.parse("10:15:30.123456"));
inputRecord.put("lt_uuid", UUID.fromString("a37b75ca-097c-5d46-6119-f0637922e908"));
inputRecord.put("lt_timestamp_millis", Instant.parse("2007-12-03T10:15:30.123Z"));
inputRecord.put("lt_timestamp_micros", Instant.parse("2007-12-13T10:15:30.123456Z"));
inputRecord.put("lt_local_timestamp_millis", LocalDateTime.parse("2017-12-03T10:15:30.123"));
inputRecord.put("lt_local_timestamp_micros", LocalDateTime.parse("2017-12-13T10:15:30.123456"));
String expectedJson = """
{
"lt_date":"1991-08-14",
"lt_decimal": 2.1617413862327545E11,
"lt_time_millis": "10:15:30.001",
"lt_time_micros": "10:15:30.123456",
"lt_uuid": "a37b75ca-097c-5d46-6119-f0637922e908",
"lt_timestamp_millis": "2007-12-03T10:15:30.123Z",
"lt_timestamp_micros": "2007-12-13T10:15:30.123456Z",
"lt_local_timestamp_millis": "2017-12-03T10:15:30.123",
"lt_local_timestamp_micros": "2017-12-13T10:15:30.123456"
}
""";
assertJsonsEqual(expectedJson, convertAvroToJson(inputRecord, schema));
}
@Test
void unionField() {
var schema = createSchema(
"""
{
"type": "record",
"namespace": "com.test",
"name": "TestAvroRecord",
"fields": [
{
"name": "f_union",
"type": [ "null", "int", "TestAvroRecord"]
}
]
}"""
);
var r = new GenericData.Record(schema);
r.put("f_union", null);
assertJsonsEqual(" {}", convertAvroToJson(r, schema));
r = new GenericData.Record(schema);
r.put("f_union", 123);
assertJsonsEqual(" { \"f_union\" : { \"int\" : 123 } }", convertAvroToJson(r, schema));
r = new GenericData.Record(schema);
var innerRec = new GenericData.Record(schema);
innerRec.put("f_union", 123);
r.put("f_union", innerRec);
assertJsonsEqual(
" { \"f_union\" : { \"com.test.TestAvroRecord\" : { \"f_union\" : { \"int\" : 123 } } } }",
convertAvroToJson(r, schema)
);
}
assertThat(record.get("lt_date"))
.isEqualTo(LocalDate.of(1991, 8, 14));
assertThat(record.get("lt_decimal"))
.isEqualTo(new BigDecimal("2.1617413862327545E11"));
assertThat(record.get("lt_time_millis"))
.isEqualTo(LocalTime.parse("10:15:30.001"));
assertThat(record.get("lt_time_micros"))
.isEqualTo(LocalTime.parse("10:15:30.123456"));
assertThat(record.get("lt_timestamp_millis"))
.isEqualTo(Instant.parse("2007-12-03T10:15:30.123Z"));
assertThat(record.get("lt_timestamp_micros"))
.isEqualTo(Instant.parse("2007-12-13T10:15:30.123456Z"));
assertThat(record.get("lt_local_timestamp_millis"))
.isEqualTo(LocalDateTime.parse("2017-12-03T10:15:30.123"));
assertThat(record.get("lt_local_timestamp_micros"))
.isEqualTo(LocalDateTime.parse("2017-12-13T10:15:30.123456"));
} }
private Schema createSchema(String schema) { private Schema createSchema(String schema) {
return new AvroSchema(schema).rawSchema(); return new AvroSchema(schema).rawSchema();
} }
@SneakyThrows
private void assertJsonsEqual(String expected, JsonNode actual) {
var mapper = new JsonMapper();
assertThat(actual.toPrettyString())
.isEqualTo(mapper.readTree(expected).toPrettyString());
}
} }