From 4c231980aac8e54f0f54b59c1a6e19b5504ecfe0 Mon Sep 17 00:00:00 2001 From: German Osin Date: Wed, 15 Sep 2021 11:21:05 +0300 Subject: [PATCH] ISSUE-876 Fixed avro json schema generation & deserializing (#877) * ISSUE-876 Fixed avro json schema generation & deserializing * Fixed checkstyle * Fixed proto test --- kafka-ui-api/pom.xml | 6 + .../jsonschema/AvroJsonSchemaConverter.java | 41 ++++--- .../ui/util/jsonschema/EnumJsonType.java | 5 +- .../AvroJsonSchemaConverterTest.java | 112 ++++++++++++++++-- .../ProtobufSchemaConverterTest.java | 15 ++- 5 files changed, 144 insertions(+), 35 deletions(-) diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml index 001c22143f..0209bc2745 100644 --- a/kafka-ui-api/pom.xml +++ b/kafka-ui-api/pom.xml @@ -170,6 +170,12 @@ ${assertj.version} test + + com.github.java-json-tools + json-schema-validator + 2.2.14 + test + diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverter.java index d354340f08..ac12109d5a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverter.java @@ -1,8 +1,10 @@ package com.provectus.kafka.ui.util.jsonschema; import java.net.URI; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -41,13 +43,7 @@ public class AvroJsonSchemaConverter implements JsonSchemaConverter { private FieldSchema convertSchema(String name, Schema schema, Map definitions, boolean ref) { - if (!schema.isUnion() || (schema.getTypes().size() == 2 && schema.isNullable())) { - if (schema.isUnion()) { - final Optional firstType = - schema.getTypes().stream().filter(t -> !t.getType().equals(Schema.Type.NULL)) - .findFirst(); - schema = firstType.orElseThrow(); - } + if (!schema.isUnion()) { JsonType type = convertType(schema); switch (type.getType()) { case BOOLEAN: @@ -68,20 +64,29 @@ public class AvroJsonSchemaConverter implements JsonSchemaConverter { default: throw new RuntimeException("Unknown type"); } } else { - return new OneOfFieldSchema( - schema.getTypes().stream() - .map(typeSchema -> - convertSchema( - name + UUID.randomUUID().toString(), - typeSchema, - definitions, - true - ) - ).collect(Collectors.toList()) - ); + return createUnionSchema(schema, definitions); } } + private FieldSchema createUnionSchema(Schema schema, Map definitions) { + final Map fields = schema.getTypes().stream() + .filter(t -> !t.getType().equals(Schema.Type.NULL)) + .map(f -> Tuples.of( + f.getType().getName().toLowerCase(Locale.ROOT), + convertSchema( + f.getType().getName().toLowerCase(Locale.ROOT), + f, definitions, true + ) + )).collect(Collectors.toMap( + Tuple2::getT1, + Tuple2::getT2 + )); + + return new ObjectFieldSchema( + fields, Collections.emptyList() + ); + } + private FieldSchema createObjectSchema(String name, Schema schema, Map definitions, boolean ref) { final Map fields = schema.getFields().stream() diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/EnumJsonType.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/EnumJsonType.java index 13ac8c8b52..715f7d5f44 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/EnumJsonType.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/EnumJsonType.java @@ -2,6 +2,7 @@ package com.provectus.kafka.ui.util.jsonschema; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.TextNode; import java.util.List; import java.util.Map; @@ -17,7 +18,9 @@ public class EnumJsonType extends JsonType { @Override public Map toJsonNode(ObjectMapper mapper) { return Map.of( - this.type.getName(), + "type", + new TextNode(Type.STRING.getName()), + Type.ENUM.getName(), mapper.valueToTree(values) ); } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverterTest.java index 036b1c48ed..dbe37f5695 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverterTest.java @@ -1,15 +1,23 @@ package com.provectus.kafka.ui.util.jsonschema; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.fge.jsonschema.core.exceptions.ProcessingException; +import com.github.fge.jsonschema.core.report.ProcessingReport; +import com.github.fge.jsonschema.main.JsonSchemaFactory; +import com.provectus.kafka.ui.serde.schemaregistry.AvroMessageFormatter; +import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; public class AvroJsonSchemaConverterTest { @Test - public void avroConvertTest() throws URISyntaxException { + public void avroConvertTest() throws URISyntaxException, JsonProcessingException { final AvroJsonSchemaConverter converter = new AvroJsonSchemaConverter(); URI basePath = new URI("http://example.com/"); @@ -72,20 +80,98 @@ public class AvroJsonSchemaConverterTest { + " }" ); - String expected = - "{\"$id\":\"http://example.com/Message\"," - + "\"$schema\":\"https://json-schema.org/draft/2020-12/schema\"," - + "\"type\":\"object\",\"properties\":{\"record\":{\"$ref\":" - + "\"#/definitions/RecordInnerMessage\"}},\"required\":[\"record\"]," - + "\"definitions\":{\"RecordInnerMessage\":{\"type\":\"object\",\"properties\":" - + "{\"long_text\":{\"type\":\"string\"},\"array\":{\"type\":\"array\",\"items\":" - + "{\"type\":\"string\"}},\"id\":{\"type\":\"integer\"},\"text\":{\"type\":\"string\"}," - + "\"map\":{\"type\":\"object\",\"additionalProperties\":{\"type\":\"integer\"}}," - + "\"order\":{\"enum\":[\"SPADES\",\"HEARTS\",\"DIAMONDS\",\"CLUBS\"]}}," - + "\"required\":[\"id\",\"text\",\"order\",\"array\",\"map\"]}}}"; + + String expected = "{\"$id\":\"http://example.com/Message\"," + + "\"$schema\":\"https://json-schema.org/draft/2020-12/schema\"," + + "\"type\":\"object\",\"properties\":{\"record\":" + + "{\"$ref\":\"#/definitions/RecordInnerMessage\"}}," + + "\"required\":[\"record\"],\"definitions\":" + + "{\"RecordInnerMessage\":{\"type\":\"object\",\"" + + "properties\":{\"long_text\":{\"type\":\"object\"," + + "\"properties\":{\"string\":{\"type\":\"string\"}}}," + + "\"array\":{\"type\":\"array\",\"items\":{\"type\":\"string\"}}," + + "\"id\":{\"type\":\"integer\"},\"text\":{\"type\":\"string\"}," + + "\"map\":{\"type\":\"object\",\"additionalProperties\":" + + "{\"type\":\"integer\"}},\"order\":{\"type\":\"string\"," + + "\"enum\":[\"SPADES\",\"HEARTS\",\"DIAMONDS\",\"CLUBS\"]}}," + + "\"required\":[\"id\",\"text\",\"order\",\"array\",\"map\"]}}}"; final JsonSchema convertRecord = converter.convert(basePath, recordSchema); - Assertions.assertEquals(expected, convertRecord.toJson(new ObjectMapper())); + + ObjectMapper om = new ObjectMapper(); + Assertions.assertEquals( + om.readTree(expected), + om.readTree( + convertRecord.toJson(om) + ) + ); } + + @Test + public void testNullableUnions() throws URISyntaxException, IOException, ProcessingException { + final AvroJsonSchemaConverter converter = new AvroJsonSchemaConverter(); + URI basePath = new URI("http://example.com/"); + final ObjectMapper objectMapper = new ObjectMapper(); + + Schema recordSchema = (new Schema.Parser()).parse( + " {" + + " \"type\": \"record\"," + + " \"name\": \"Message\"," + + " \"namespace\": \"com.provectus.kafka\"," + + " \"fields\": [" + + " {" + + " \"name\": \"text\"," + + " \"type\": [" + + " \"null\"," + + " \"string\"" + + " ]," + + " \"default\": null" + + " }," + + " {" + + " \"name\": \"value\"," + + " \"type\": [" + + " \"null\"," + + " \"string\"," + + " \"long\"" + + " ]," + + " \"default\": null" + + " }" + + " ]" + + " }" + ); + + final GenericData.Record record = new GenericData.Record(recordSchema); + record.put("text", "Hello world"); + record.put("value", 100L); + byte[] jsonBytes = AvroSchemaUtils.toJson(record); + String serialized = new String(jsonBytes); + + String expected = + "{\"$id\":\"http://example.com/Message\"," + + "\"$schema\":\"https://json-schema.org/draft/2020-12/schema\"," + + "\"type\":\"object\",\"properties\":{\"text\":" + + "{\"type\":\"object\",\"properties\":{\"string\":" + + "{\"type\":\"string\"}}},\"value\":{\"type\":\"object\"," + + "\"properties\":{\"string\":{\"type\":\"string\"}," + + "\"long\":{\"type\":\"integer\"}}}}}"; + + final JsonSchema convert = converter.convert(basePath, recordSchema); + Assertions.assertEquals( + objectMapper.readTree(expected), + objectMapper.readTree( + convert.toJson(objectMapper) + ) + ); + + + final ProcessingReport validate = + JsonSchemaFactory.byDefault().getJsonSchema( + objectMapper.readTree(expected) + ).validate( + objectMapper.readTree(serialized) + ); + + Assertions.assertTrue(validate.isSuccess()); + } } \ No newline at end of file diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/ProtobufSchemaConverterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/ProtobufSchemaConverterTest.java index fa4235d42c..cc90546077 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/ProtobufSchemaConverterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/ProtobufSchemaConverterTest.java @@ -1,5 +1,6 @@ package com.provectus.kafka.ui.util.jsonschema; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import java.net.URI; @@ -11,7 +12,7 @@ import org.junit.jupiter.api.Test; public class ProtobufSchemaConverterTest { @Test - public void testSimpleProto() throws URISyntaxException { + public void testSimpleProto() throws URISyntaxException, JsonProcessingException { String proto = "syntax = \"proto3\";\n" + "package com.acme;\n" @@ -49,7 +50,8 @@ public class ProtobufSchemaConverterTest { + "{\"type\":\"object\",\"properties\":" + "{\"optionalField\":{\"oneOf\":[{\"type\":\"string\"}," + "{\"type\":\"integer\"}]},\"other_id\":" - + "{\"type\":\"integer\"},\"order\":{\"enum\":[\"FIRST\",\"SECOND\"]}}}}}"; + + "{\"type\":\"integer\"},\"order\":{\"enum\":[\"FIRST\",\"SECOND\"]," + + "\"type\":\"string\"}}}}}"; ProtobufSchema protobufSchema = new ProtobufSchema(proto); @@ -58,6 +60,13 @@ public class ProtobufSchemaConverterTest { final JsonSchema convert = converter.convert(basePath, protobufSchema.toDescriptor("MyRecord")); - Assertions.assertEquals(expected, convert.toJson(new ObjectMapper())); + + ObjectMapper om = new ObjectMapper(); + Assertions.assertEquals( + om.readTree(expected), + om.readTree( + convert.toJson(om) + ) + ); } } \ No newline at end of file