From 152b088771891f7840c72b8608c72ea8d83d2594 Mon Sep 17 00:00:00 2001 From: iliax Date: Wed, 10 May 2023 22:19:08 +0400 Subject: [PATCH] new conversion tests added --- .../util/jsonschema/JsonAvroConversion.java | 50 +- .../builtin/sr/SchemaRegistrySerdeTest.java | 35 +- .../jsonschema/JsonAvroConversionTest.java | 839 +++++++++++------- 3 files changed, 567 insertions(+), 357 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversion.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversion.java index 4f75823f02..e446152052 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversion.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversion.java @@ -189,11 +189,8 @@ public class JsonAvroConversion { yield node; } case MAP -> { - var map = (Map) obj; ObjectNode node = MAPPER.createObjectNode(); - map.forEach((k, v) -> { - node.set(k.toString(), convertAvroToJson(v, avroSchema.getValueType())); - }); + ((Map) obj).forEach((k, v) -> node.set(k.toString(), convertAvroToJson(v, avroSchema.getValueType()))); yield node; } case ARRAY -> { @@ -206,8 +203,6 @@ public class JsonAvroConversion { yield new TextNode(obj.toString()); } case UNION -> { - //TODO: cover with tests - // non-null case ObjectNode node = MAPPER.createObjectNode(); int unionIdx = AvroData.getGenericData().resolveUnion(avroSchema, obj); Schema unionType = avroSchema.getTypes().get(unionIdx); @@ -232,25 +227,17 @@ public class JsonAvroConversion { } yield new IntNode((Integer) obj); } - case FLOAT -> { - yield new FloatNode((Float) obj); - } - case DOUBLE -> { - yield new DoubleNode((Double) obj); - } - case BOOLEAN -> { - yield BooleanNode.valueOf((Boolean) obj); - } - case NULL -> { - yield NullNode.getInstance(); - } + case FLOAT -> new FloatNode((Float) obj); + case DOUBLE -> new DoubleNode((Double) obj); + case BOOLEAN -> BooleanNode.valueOf((Boolean) obj); + case NULL -> NullNode.getInstance(); case BYTES -> { if (isLogicalType(avroSchema)) { yield processLogicalType(obj, avroSchema); } //TODO: check with tests - byte[] bytes = (byte[]) obj; - yield new TextNode(new String(bytes)); //TODO: encoding + ByteBuffer bytes = (ByteBuffer) obj; + yield new TextNode(new String(bytes.array())); } case FIXED -> { if (isLogicalType(avroSchema)) { @@ -263,27 +250,26 @@ public class JsonAvroConversion { } private static Object processLogicalType(JsonNode node, Schema schema) { - String logicalTypeName = schema.getLogicalType().getName(); - var conversion = Stream.of(LogicalTypeConversion.values()) - .filter(t -> t.name.equalsIgnoreCase(logicalTypeName)) - .findFirst(); - return conversion + return findConversion(schema) .map(c -> c.jsonToAvroConversion.apply(node, schema)) .orElseThrow(() -> new JsonToAvroConversionException("'%s' logical type is not supported" - .formatted(logicalTypeName))); + .formatted(schema.getLogicalType().getName()))); } private static JsonNode processLogicalType(Object obj, Schema schema) { - String logicalTypeName = schema.getLogicalType().getName(); - var conversion = Stream.of(LogicalTypeConversion.values()) - .filter(t -> t.name.equalsIgnoreCase(logicalTypeName)) - .findFirst(); - return conversion + return findConversion(schema) .map(c -> c.avroToJsonConversion.apply(obj, schema)) .orElseThrow(() -> new JsonToAvroConversionException("'%s' logical type is not supported" - .formatted(logicalTypeName))); + .formatted(schema.getLogicalType().getName()))); + } + + private static Optional findConversion(Schema schema) { + String logicalTypeName = schema.getLogicalType().getName(); + return Stream.of(LogicalTypeConversion.values()) + .filter(t -> t.name.equalsIgnoreCase(logicalTypeName)) + .findFirst(); } private static boolean isLogicalType(Schema schema) { diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java index 78f7e4819e..fb738efc3c 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java @@ -6,8 +6,8 @@ import com.fasterxml.jackson.databind.json.JsonMapper; import com.provectus.kafka.ui.serde.api.DeserializeResult; import com.provectus.kafka.ui.serde.api.SchemaDescription; import com.provectus.kafka.ui.serde.api.Serde; +import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion; import io.confluent.kafka.schemaregistry.avro.AvroSchema; -import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import java.io.ByteArrayOutputStream; @@ -212,13 +212,13 @@ class SchemaRegistrySerdeTest { GenericDatumWriter writer = new GenericDatumWriter<>(schema.rawSchema()); ByteArrayOutputStream output = new ByteArrayOutputStream(); Encoder encoder = EncoderFactory.get().binaryEncoder(output, null); - writer.write(AvroSchemaUtils.toObject(json, schema), encoder); + writer.write(JsonAvroConversion.convertJsonToAvro(json, schema.rawSchema()), encoder); encoder.flush(); return output.toByteArray(); } @Test - void fieldsRepresentationIsConsistentForSerializationAndDeserialization() throws Exception { + void avroFieldsRepresentationIsConsistentForSerializationAndDeserialization() throws Exception { AvroSchema schema = new AvroSchema( """ { @@ -268,6 +268,10 @@ class SchemaRegistrySerdeTest { { "name": "f_union", "type": ["null", "string", "int" ] + }, + { + "name": "f_optional_to_test_not_filled_case", + "type": [ "null", "string"] } ] }""" @@ -288,18 +292,11 @@ class SchemaRegistrySerdeTest { """; registryClient.register("test-value", schema); - - byte[] serializedBytes = serde.serializer("test", Serde.Target.VALUE).serialize(jsonPayload); - - var deserializedJson = serde.deserializer("test", Serde.Target.VALUE) - .deserialize(null, serializedBytes) - .getResult(); - - assertJsonsEqual(jsonPayload, deserializedJson); + assertSerdeCycle("test", jsonPayload); } @Test - void logicalTypesRepresentationIsConsistentForSerializationAndDeserialization() throws Exception { + void avroLogicalTypesRepresentationIsConsistentForSerializationAndDeserialization() throws Exception { AvroSchema schema = new AvroSchema( """ { @@ -361,14 +358,18 @@ class SchemaRegistrySerdeTest { """; registryClient.register("test-value", schema); + assertSerdeCycle("test", jsonPayload); + } - byte[] serializedBytes = serde.serializer("test", Serde.Target.VALUE).serialize(jsonPayload); - - var deserializedJson = serde.deserializer("test", Serde.Target.VALUE) + // 1. serialize input json to binary + // 2. deserialize from binary + // 3. check that deserialized version equal to input + void assertSerdeCycle(String topic, String jsonInput) { + byte[] serializedBytes = serde.serializer(topic, Serde.Target.VALUE).serialize(jsonInput); + var deserializedJson = serde.deserializer(topic, Serde.Target.VALUE) .deserialize(null, serializedBytes) .getResult(); - - assertJsonsEqual(jsonPayload, deserializedJson); + assertJsonsEqual(jsonInput, deserializedJson); } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversionTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversionTest.java index 9b72684a81..3fabad3f49 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversionTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversionTest.java @@ -1,8 +1,17 @@ package com.provectus.kafka.ui.util.jsonschema; import static com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion.convertJsonToAvro; +import static com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion.convertAvroToJson; import static org.assertj.core.api.Assertions.assertThat; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.node.BooleanNode; +import com.fasterxml.jackson.databind.node.DoubleNode; +import com.fasterxml.jackson.databind.node.FloatNode; +import com.fasterxml.jackson.databind.node.IntNode; +import com.fasterxml.jackson.databind.node.LongNode; +import com.fasterxml.jackson.databind.node.TextNode; import io.confluent.kafka.schemaregistry.avro.AvroSchema; import java.math.BigDecimal; import java.time.Instant; @@ -11,355 +20,569 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.util.List; import java.util.Map; +import java.util.UUID; +import lombok.SneakyThrows; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; class JsonAvroConversionTest { - @Test - void primitiveRoot() { - assertThat(convertJsonToAvro("\"str\"", createSchema("\"string\""))) - .isEqualTo("str"); + // checking conversion from json to KafkaAvroSerializer-compatible avro objects + @Nested + class FromJsonToAvro { - assertThat(convertJsonToAvro("123", createSchema("\"int\""))) - .isEqualTo(123); + @Test + void primitiveRoot() { + assertThat(convertJsonToAvro("\"str\"", createSchema("\"string\""))) + .isEqualTo("str"); - assertThat(convertJsonToAvro("123", createSchema("\"long\""))) - .isEqualTo(123L); + assertThat(convertJsonToAvro("123", createSchema("\"int\""))) + .isEqualTo(123); - assertThat(convertJsonToAvro("123.123", createSchema("\"float\""))) - .isEqualTo(123.123F); + assertThat(convertJsonToAvro("123", createSchema("\"long\""))) + .isEqualTo(123L); - assertThat(convertJsonToAvro("12345.12345", createSchema("\"double\""))) - .isEqualTo(12345.12345); - } + assertThat(convertJsonToAvro("123.123", createSchema("\"float\""))) + .isEqualTo(123.123F); - @Test - void primitiveTypedFields() { - var schema = createSchema( - """ - { - "type": "record", - "name": "TestAvroRecord", - "fields": [ - { - "name": "f_int", - "type": "int" - }, - { - "name": "f_long", - "type": "long" - }, - { - "name": "f_string", - "type": "string" - }, - { - "name": "f_boolean", - "type": "boolean" - }, - { - "name": "f_float", - "type": "float" - }, - { - "name": "f_double", - "type": "double" - }, - { - "name": "f_enum", - "type" : { - "type": "enum", - "name": "Suit", - "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"] + assertThat(convertJsonToAvro("12345.12345", createSchema("\"double\""))) + .isEqualTo(12345.12345); + } + + @Test + void primitiveTypedFields() { + var schema = createSchema( + """ + { + "type": "record", + "name": "TestAvroRecord", + "fields": [ + { + "name": "f_int", + "type": "int" + }, + { + "name": "f_long", + "type": "long" + }, + { + "name": "f_string", + "type": "string" + }, + { + "name": "f_boolean", + "type": "boolean" + }, + { + "name": "f_float", + "type": "float" + }, + { + "name": "f_double", + "type": "double" + }, + { + "name": "f_enum", + "type" : { + "type": "enum", + "name": "Suit", + "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"] + } } - } - ] - }""" - ); + ] + }""" + ); - String jsonPayload = """ - { - "f_int": 123, - "f_long": 4294967294, - "f_string": "string here", - "f_boolean": true, - "f_float": 123.1, - "f_double": 123456.123456, - "f_enum": "SPADES" - } - """; + String jsonPayload = """ + { + "f_int": 123, + "f_long": 4294967294, + "f_string": "string here", + "f_boolean": true, + "f_float": 123.1, + "f_double": 123456.123456, + "f_enum": "SPADES" + } + """; - var converted = convertJsonToAvro(jsonPayload, schema); - assertThat(converted).isInstanceOf(GenericData.Record.class); + var converted = convertJsonToAvro(jsonPayload, schema); + assertThat(converted).isInstanceOf(GenericData.Record.class); - var record = (GenericData.Record) converted; - assertThat(record.get("f_int")).isEqualTo(123); - assertThat(record.get("f_long")).isEqualTo(4294967294L); - assertThat(record.get("f_string")).isEqualTo("string here"); - assertThat(record.get("f_boolean")).isEqualTo(true); - assertThat(record.get("f_float")).isEqualTo(123.1f); - assertThat(record.get("f_double")).isEqualTo(123456.123456); - assertThat(record.get("f_enum")) - .isEqualTo( - new GenericData.EnumSymbol( - schema.getField("f_enum").schema(), - "SPADES" - ) - ); - } + var record = (GenericData.Record) converted; + assertThat(record.get("f_int")).isEqualTo(123); + assertThat(record.get("f_long")).isEqualTo(4294967294L); + assertThat(record.get("f_string")).isEqualTo("string here"); + assertThat(record.get("f_boolean")).isEqualTo(true); + assertThat(record.get("f_float")).isEqualTo(123.1f); + assertThat(record.get("f_double")).isEqualTo(123456.123456); + assertThat(record.get("f_enum")) + .isEqualTo( + new GenericData.EnumSymbol( + schema.getField("f_enum").schema(), + "SPADES" + ) + ); + } - @Test - void unionRoot() { - var sc = createSchema("[ \"null\", \"string\", \"int\" ]"); + @Test + void unionRoot() { + var schema = createSchema("[ \"null\", \"string\", \"int\" ]"); - var converted = convertJsonToAvro("{\"string\":\"string here\"}", sc); - assertThat(converted).isEqualTo("string here"); + var converted = convertJsonToAvro("{\"string\":\"string here\"}", schema); + assertThat(converted).isEqualTo("string here"); - converted = convertJsonToAvro("{\"int\": 123}", sc); - assertThat(converted).isEqualTo(123); + converted = convertJsonToAvro("{\"int\": 123}", schema); + assertThat(converted).isEqualTo(123); - converted = convertJsonToAvro("null", sc); - assertThat(converted).isEqualTo(null); - } + converted = convertJsonToAvro("null", schema); + assertThat(converted).isEqualTo(null); + } - @Test - void unionField() { - var schema = createSchema( - """ - { - "type": "record", - "name": "TestAvroRecord", - "fields": [ - { - "name": "f_union", - "type": [ "null", "int", "TestAvroRecord"] - } - ] - }""" - ); - - String jsonPayload = "{ \"f_union\": null }"; - - var converted = convertJsonToAvro(jsonPayload, schema); - assertThat(converted).isInstanceOf(GenericData.Record.class); - - var record = (GenericData.Record) converted; - assertThat(record.get("f_union")).isNull(); - - - jsonPayload = "{ \"f_union\": { \"int\": 123 } }"; - record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema); - assertThat(record.get("f_union")).isEqualTo(123); - - jsonPayload = "{ \"f_union\": { \"TestAvroRecord\": { \"f_union\": { \"int\": 123 } } } }"; - record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema); - - assertThat(record.get("f_union")).isInstanceOf(GenericData.Record.class); - var innerRec = (GenericData.Record) record.get("f_union"); - assertThat(innerRec.get("f_union")).isEqualTo(123); - } - - @Test - void mapField() { - var schema = createSchema( - """ - { - "type": "record", - "name": "TestAvroRecord", - "fields": [ - { - "name": "long_map", - "type": { - "type": "map", - "values" : "long", - "default": {} + @Test + void unionField() { + var schema = createSchema( + """ + { + "type": "record", + "namespace": "com.test", + "name": "TestAvroRecord", + "fields": [ + { + "name": "f_union", + "type": [ "null", "int", "TestAvroRecord"] } - }, - { - "name": "string_map", - "type": { - "type": "map", - "values" : "string", - "default": {} - } - }, - { - "name": "self_ref_map", - "type": { - "type": "map", - "values" : "TestAvroRecord", - "default": {} - } - } - ] - }""" - ); + ] + }""" + ); - String jsonPayload = """ - { - "long_map": { - "k1": 123, - "k2": 456 - }, - "string_map": { - "k3": "s1", - "k4": "s2" - }, - "self_ref_map": { - "k5" : { - "long_map": { "_k1": 222 }, - "string_map": { "_k2": "_s1" } + String jsonPayload = "{ \"f_union\": null }"; + + var record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema); + assertThat(record.get("f_union")).isNull(); + + jsonPayload = "{ \"f_union\": { \"int\": 123 } }"; + record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema); + assertThat(record.get("f_union")).isEqualTo(123); + + //inner-record's name should be fully-qualified! + jsonPayload = "{ \"f_union\": { \"com.test.TestAvroRecord\": { \"f_union\": { \"int\": 123 } } } }"; + record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema); + assertThat(record.get("f_union")).isInstanceOf(GenericData.Record.class); + var innerRec = (GenericData.Record) record.get("f_union"); + assertThat(innerRec.get("f_union")).isEqualTo(123); + } + + @Test + void mapField() { + var schema = createSchema( + """ + { + "type": "record", + "name": "TestAvroRecord", + "fields": [ + { + "name": "long_map", + "type": { + "type": "map", + "values" : "long", + "default": {} + } + }, + { + "name": "string_map", + "type": { + "type": "map", + "values" : "string", + "default": {} + } + }, + { + "name": "self_ref_map", + "type": { + "type": "map", + "values" : "TestAvroRecord", + "default": {} + } + } + ] + }""" + ); + + String jsonPayload = """ + { + "long_map": { + "k1": 123, + "k2": 456 + }, + "string_map": { + "k3": "s1", + "k4": "s2" + }, + "self_ref_map": { + "k5" : { + "long_map": { "_k1": 222 }, + "string_map": { "_k2": "_s1" } + } } } - } - """; + """; - var converted = convertJsonToAvro(jsonPayload, schema); - assertThat(converted).isInstanceOf(GenericData.Record.class); + var record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema); + assertThat(record.get("long_map")) + .isEqualTo(Map.of("k1", 123L, "k2", 456L)); + assertThat(record.get("string_map")) + .isEqualTo(Map.of("k3", "s1", "k4", "s2")); + assertThat(record.get("self_ref_map")) + .isNotNull(); - var record = (GenericData.Record) converted; - assertThat(record.get("long_map")) - .isEqualTo(Map.of("k1", 123L, "k2", 456L)); - assertThat(record.get("string_map")) - .isEqualTo(Map.of("k3", "s1", "k4", "s2")); - assertThat(record.get("self_ref_map")) - .isNotNull(); + Map selfRefMapField = (Map) record.get("self_ref_map"); + assertThat(selfRefMapField) + .hasSize(1) + .hasEntrySatisfying("k5", v -> { + assertThat(v).isInstanceOf(GenericData.Record.class); + var innerRec = (GenericData.Record) v; + assertThat(innerRec.get("long_map")) + .isEqualTo(Map.of("_k1", 222L)); + assertThat(innerRec.get("string_map")) + .isEqualTo(Map.of("_k2", "_s1")); + }); + } - Map selfRefMapField = (Map) record.get("self_ref_map"); - assertThat(selfRefMapField) - .hasSize(1) - .hasEntrySatisfying("k5", v -> { - assertThat(v).isInstanceOf(GenericData.Record.class); - var innerRec = (GenericData.Record) v; - assertThat(innerRec.get("long_map")) - .isEqualTo(Map.of("_k1", 222L)); - assertThat(innerRec.get("string_map")) - .isEqualTo(Map.of("_k2", "_s1")); - }); + @Test + void arrayField() { + var schema = createSchema( + """ + { + "type": "record", + "name": "TestAvroRecord", + "fields": [ + { + "name": "f_array", + "type": { + "type": "array", + "items" : "string", + "default": [] + } + } + ] + }""" + ); + + String jsonPayload = """ + { + "f_array": [ "e1", "e2" ] + } + """; + + var record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema); + assertThat(record.get("f_array")).isEqualTo(List.of("e1", "e2")); + } + + @Test + void logicalTypesField() { + var schema = createSchema( + """ + { + "type": "record", + "name": "TestAvroRecord", + "fields": [ + { + "name": "lt_date", + "type": { "type": "int", "logicalType": "date" } + }, + { + "name": "lt_uuid", + "type": { "type": "string", "logicalType": "uuid" } + }, + { + "name": "lt_decimal", + "type": { "type": "bytes", "logicalType": "decimal", "precision": 22, "scale":10 } + }, + { + "name": "lt_time_millis", + "type": { "type": "int", "logicalType": "time-millis"} + }, + { + "name": "lt_time_micros", + "type": { "type": "long", "logicalType": "time-micros"} + }, + { + "name": "lt_timestamp_millis", + "type": { "type": "long", "logicalType": "timestamp-millis" } + }, + { + "name": "lt_timestamp_micros", + "type": { "type": "long", "logicalType": "timestamp-micros" } + }, + { + "name": "lt_local_timestamp_millis", + "type": { "type": "long", "logicalType": "local-timestamp-millis" } + }, + { + "name": "lt_local_timestamp_micros", + "type": { "type": "long", "logicalType": "local-timestamp-micros" } + } + ] + }""" + ); + + String jsonPayload = """ + { + "lt_date":"1991-08-14", + "lt_decimal": 2.1617413862327545E11, + "lt_time_millis": "10:15:30.001", + "lt_time_micros": "10:15:30.123456", + "lt_uuid": "a37b75ca-097c-5d46-6119-f0637922e908", + "lt_timestamp_millis": "2007-12-03T10:15:30.123Z", + "lt_timestamp_micros": "2007-12-13T10:15:30.123456Z", + "lt_local_timestamp_millis": "2017-12-03T10:15:30.123", + "lt_local_timestamp_micros": "2017-12-13T10:15:30.123456" + } + """; + + var converted = convertJsonToAvro(jsonPayload, schema); + assertThat(converted).isInstanceOf(GenericData.Record.class); + + var record = (GenericData.Record) converted; + + assertThat(record.get("lt_date")) + .isEqualTo(LocalDate.of(1991, 8, 14)); + assertThat(record.get("lt_decimal")) + .isEqualTo(new BigDecimal("2.1617413862327545E11")); + assertThat(record.get("lt_time_millis")) + .isEqualTo(LocalTime.parse("10:15:30.001")); + assertThat(record.get("lt_time_micros")) + .isEqualTo(LocalTime.parse("10:15:30.123456")); + assertThat(record.get("lt_timestamp_millis")) + .isEqualTo(Instant.parse("2007-12-03T10:15:30.123Z")); + assertThat(record.get("lt_timestamp_micros")) + .isEqualTo(Instant.parse("2007-12-13T10:15:30.123456Z")); + assertThat(record.get("lt_local_timestamp_millis")) + .isEqualTo(LocalDateTime.parse("2017-12-03T10:15:30.123")); + assertThat(record.get("lt_local_timestamp_micros")) + .isEqualTo(LocalDateTime.parse("2017-12-13T10:15:30.123456")); + } } - @Test - void arrayField() { - var schema = createSchema( - """ - { - "type": "record", - "name": "TestAvroRecord", - "fields": [ - { - "name": "f_array", - "type": { - "type": "array", - "items" : "string", - "default": [] - } - } - ] - }""" - ); + // checking conversion of KafkaAvroDeserializer output to JsonNode + @Nested + class FromAvroToJson { - String jsonPayload = """ - { - "f_array": [ "e1", "e2" ] - } - """; + @Test + void primitiveRoot() { + assertThat(convertAvroToJson("str", createSchema("\"string\""))) + .isEqualTo(new TextNode("str")); - var converted = convertJsonToAvro(jsonPayload, schema); - assertThat(converted).isInstanceOf(GenericData.Record.class); + assertThat(convertAvroToJson(123, createSchema("\"int\""))) + .isEqualTo(new IntNode(123)); - var record = (GenericData.Record) converted; - assertThat(record.get("f_array")).isEqualTo(List.of("e1", "e2")); - } + assertThat(convertAvroToJson(123L, createSchema("\"long\""))) + .isEqualTo(new LongNode(123)); - @Test - void logicalTypesField() { - var schema = createSchema( - """ - { - "type": "record", - "name": "TestAvroRecord", - "fields": [ - { - "name": "lt_date", - "type": { "type": "int", "logicalType": "date" } - }, - { - "name": "lt_uuid", - "type": { "type": "string", "logicalType": "uuid" } - }, - { - "name": "lt_decimal", - "type": { "type": "bytes", "logicalType": "decimal", "precision": 22, "scale":10 } - }, - { - "name": "lt_time_millis", - "type": { "type": "int", "logicalType": "time-millis"} - }, - { - "name": "lt_time_micros", - "type": { "type": "long", "logicalType": "time-micros"} - }, - { - "name": "lt_timestamp_millis", - "type": { "type": "long", "logicalType": "timestamp-millis" } - }, - { - "name": "lt_timestamp_micros", - "type": { "type": "long", "logicalType": "timestamp-micros" } - }, - { - "name": "lt_local_timestamp_millis", - "type": { "type": "long", "logicalType": "local-timestamp-millis" } - }, - { - "name": "lt_local_timestamp_micros", - "type": { "type": "long", "logicalType": "local-timestamp-micros" } - } - ] - }""" - ); + assertThat(convertAvroToJson(123.1F, createSchema("\"float\""))) + .isEqualTo(new FloatNode(123.1F)); - String jsonPayload = """ - { - "lt_date":"1991-08-14", - "lt_decimal": 2.1617413862327545E11, - "lt_time_millis": "10:15:30.001", - "lt_time_micros": "10:15:30.123456", - "lt_uuid": "a37b75ca-097c-5d46-6119-f0637922e908", - "lt_timestamp_millis": "2007-12-03T10:15:30.123Z", - "lt_timestamp_micros": "2007-12-13T10:15:30.123456Z", - "lt_local_timestamp_millis": "2017-12-03T10:15:30.123", - "lt_local_timestamp_micros": "2017-12-13T10:15:30.123456" - } - """; + assertThat(convertAvroToJson(123.1, createSchema("\"double\""))) + .isEqualTo(new DoubleNode(123.1)); - var converted = convertJsonToAvro(jsonPayload, schema); - assertThat(converted).isInstanceOf(GenericData.Record.class); + assertThat(convertAvroToJson(true, createSchema("\"boolean\""))) + .isEqualTo(BooleanNode.valueOf(true)); + } - var record = (GenericData.Record) converted; + @SneakyThrows + @Test + void primitiveTypedFields() { + var schema = createSchema( + """ + { + "type": "record", + "name": "TestAvroRecord", + "fields": [ + { + "name": "f_int", + "type": "int" + }, + { + "name": "f_long", + "type": "long" + }, + { + "name": "f_string", + "type": "string" + }, + { + "name": "f_boolean", + "type": "boolean" + }, + { + "name": "f_float", + "type": "float" + }, + { + "name": "f_double", + "type": "double" + }, + { + "name": "f_enum", + "type" : { + "type": "enum", + "name": "Suit", + "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"] + } + } + ] + }""" + ); + + GenericData.Record inputRecord = new GenericData.Record(schema); + inputRecord.put("f_int", 123); + inputRecord.put("f_long", 4294967294L); + inputRecord.put("f_string", "string here"); + inputRecord.put("f_boolean", true); + inputRecord.put("f_float", 123.1f); + inputRecord.put("f_double", 123456.123456); + inputRecord.put("f_enum", new GenericData.EnumSymbol(schema.getField("f_enum").schema(), "SPADES")); + + String expectedJson = """ + { + "f_int": 123, + "f_long": 4294967294, + "f_string": "string here", + "f_boolean": true, + "f_float": 123.1, + "f_double": 123456.123456, + "f_enum": "SPADES" + } + """; + + assertJsonsEqual(expectedJson, convertAvroToJson(inputRecord, schema)); + } + + @Test + void logicalTypesField() { + var schema = createSchema( + """ + { + "type": "record", + "name": "TestAvroRecord", + "fields": [ + { + "name": "lt_date", + "type": { "type": "int", "logicalType": "date" } + }, + { + "name": "lt_uuid", + "type": { "type": "string", "logicalType": "uuid" } + }, + { + "name": "lt_decimal", + "type": { "type": "bytes", "logicalType": "decimal", "precision": 22, "scale":10 } + }, + { + "name": "lt_time_millis", + "type": { "type": "int", "logicalType": "time-millis"} + }, + { + "name": "lt_time_micros", + "type": { "type": "long", "logicalType": "time-micros"} + }, + { + "name": "lt_timestamp_millis", + "type": { "type": "long", "logicalType": "timestamp-millis" } + }, + { + "name": "lt_timestamp_micros", + "type": { "type": "long", "logicalType": "timestamp-micros" } + }, + { + "name": "lt_local_timestamp_millis", + "type": { "type": "long", "logicalType": "local-timestamp-millis" } + }, + { + "name": "lt_local_timestamp_micros", + "type": { "type": "long", "logicalType": "local-timestamp-micros" } + } + ] + }""" + ); + + GenericData.Record inputRecord = new GenericData.Record(schema); + inputRecord.put("lt_date", LocalDate.of(1991, 8, 14)); + inputRecord.put("lt_decimal", new BigDecimal("2.1617413862327545E11")); + inputRecord.put("lt_time_millis", LocalTime.parse("10:15:30.001")); + inputRecord.put("lt_time_micros", LocalTime.parse("10:15:30.123456")); + inputRecord.put("lt_uuid", UUID.fromString("a37b75ca-097c-5d46-6119-f0637922e908")); + inputRecord.put("lt_timestamp_millis", Instant.parse("2007-12-03T10:15:30.123Z")); + inputRecord.put("lt_timestamp_micros", Instant.parse("2007-12-13T10:15:30.123456Z")); + inputRecord.put("lt_local_timestamp_millis", LocalDateTime.parse("2017-12-03T10:15:30.123")); + inputRecord.put("lt_local_timestamp_micros", LocalDateTime.parse("2017-12-13T10:15:30.123456")); + + String expectedJson = """ + { + "lt_date":"1991-08-14", + "lt_decimal": 2.1617413862327545E11, + "lt_time_millis": "10:15:30.001", + "lt_time_micros": "10:15:30.123456", + "lt_uuid": "a37b75ca-097c-5d46-6119-f0637922e908", + "lt_timestamp_millis": "2007-12-03T10:15:30.123Z", + "lt_timestamp_micros": "2007-12-13T10:15:30.123456Z", + "lt_local_timestamp_millis": "2017-12-03T10:15:30.123", + "lt_local_timestamp_micros": "2017-12-13T10:15:30.123456" + } + """; + + assertJsonsEqual(expectedJson, convertAvroToJson(inputRecord, schema)); + } + + @Test + void unionField() { + var schema = createSchema( + """ + { + "type": "record", + "namespace": "com.test", + "name": "TestAvroRecord", + "fields": [ + { + "name": "f_union", + "type": [ "null", "int", "TestAvroRecord"] + } + ] + }""" + ); + + var r = new GenericData.Record(schema); + r.put("f_union", null); + assertJsonsEqual(" {}", convertAvroToJson(r, schema)); + + r = new GenericData.Record(schema); + r.put("f_union", 123); + assertJsonsEqual(" { \"f_union\" : { \"int\" : 123 } }", convertAvroToJson(r, schema)); + + + r = new GenericData.Record(schema); + var innerRec = new GenericData.Record(schema); + innerRec.put("f_union", 123); + r.put("f_union", innerRec); + assertJsonsEqual( + " { \"f_union\" : { \"com.test.TestAvroRecord\" : { \"f_union\" : { \"int\" : 123 } } } }", + convertAvroToJson(r, schema) + ); + } - assertThat(record.get("lt_date")) - .isEqualTo(LocalDate.of(1991, 8, 14)); - assertThat(record.get("lt_decimal")) - .isEqualTo(new BigDecimal("2.1617413862327545E11")); - assertThat(record.get("lt_time_millis")) - .isEqualTo(LocalTime.parse("10:15:30.001")); - assertThat(record.get("lt_time_micros")) - .isEqualTo(LocalTime.parse("10:15:30.123456")); - assertThat(record.get("lt_timestamp_millis")) - .isEqualTo(Instant.parse("2007-12-03T10:15:30.123Z")); - assertThat(record.get("lt_timestamp_micros")) - .isEqualTo(Instant.parse("2007-12-13T10:15:30.123456Z")); - assertThat(record.get("lt_local_timestamp_millis")) - .isEqualTo(LocalDateTime.parse("2017-12-03T10:15:30.123")); - assertThat(record.get("lt_local_timestamp_micros")) - .isEqualTo(LocalDateTime.parse("2017-12-13T10:15:30.123456")); } private Schema createSchema(String schema) { return new AvroSchema(schema).rawSchema(); } + @SneakyThrows + private void assertJsonsEqual(String expected, JsonNode actual) { + var mapper = new JsonMapper(); + assertThat(actual.toPrettyString()) + .isEqualTo(mapper.readTree(expected).toPrettyString()); + } + }