Преглед изворни кода

ISSUE-876 Fixed avro json schema generation & deserializing (#877)

* ISSUE-876 Fixed avro json schema generation & deserializing

* Fixed checkstyle

* Fixed proto test
German Osin пре 3 година
родитељ
комит
4c231980aa

+ 6 - 0
kafka-ui-api/pom.xml

@@ -170,6 +170,12 @@
             <version>${assertj.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.github.java-json-tools</groupId>
+            <artifactId>json-schema-validator</artifactId>
+            <version>2.2.14</version>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 

+ 23 - 18
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverter.java

@@ -1,8 +1,10 @@
 package com.provectus.kafka.ui.util.jsonschema;
 
 import java.net.URI;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
@@ -41,13 +43,7 @@ public class AvroJsonSchemaConverter implements JsonSchemaConverter<Schema> {
 
   private FieldSchema convertSchema(String name, Schema schema,
                                     Map<String, FieldSchema> definitions, boolean ref) {
-    if (!schema.isUnion() || (schema.getTypes().size() == 2 && schema.isNullable())) {
-      if (schema.isUnion()) {
-        final Optional<Schema> firstType =
-            schema.getTypes().stream().filter(t -> !t.getType().equals(Schema.Type.NULL))
-                .findFirst();
-        schema = firstType.orElseThrow();
-      }
+    if (!schema.isUnion()) {
       JsonType type = convertType(schema);
       switch (type.getType()) {
         case BOOLEAN:
@@ -68,20 +64,29 @@ public class AvroJsonSchemaConverter implements JsonSchemaConverter<Schema> {
         default: throw new RuntimeException("Unknown type");
       }
     } else {
-      return new OneOfFieldSchema(
-          schema.getTypes().stream()
-              .map(typeSchema ->
-                  convertSchema(
-                      name + UUID.randomUUID().toString(),
-                      typeSchema,
-                      definitions,
-                      true
-                  )
-              ).collect(Collectors.toList())
-      );
+      return createUnionSchema(schema, definitions);
     }
   }
 
+  private FieldSchema createUnionSchema(Schema schema, Map<String, FieldSchema> definitions) {
+    final Map<String, FieldSchema> fields = schema.getTypes().stream()
+        .filter(t -> !t.getType().equals(Schema.Type.NULL))
+        .map(f -> Tuples.of(
+            f.getType().getName().toLowerCase(Locale.ROOT),
+            convertSchema(
+                f.getType().getName().toLowerCase(Locale.ROOT),
+                f, definitions, true
+            )
+        )).collect(Collectors.toMap(
+            Tuple2::getT1,
+            Tuple2::getT2
+        ));
+
+    return new ObjectFieldSchema(
+        fields, Collections.emptyList()
+    );
+  }
+
   private FieldSchema createObjectSchema(String name, Schema schema,
                                          Map<String, FieldSchema> definitions, boolean ref) {
     final Map<String, FieldSchema> fields = schema.getFields().stream()

+ 4 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/EnumJsonType.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.util.jsonschema;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.TextNode;
 import java.util.List;
 import java.util.Map;
 
@@ -17,7 +18,9 @@ public class EnumJsonType extends JsonType {
   @Override
   public Map<String, JsonNode> toJsonNode(ObjectMapper mapper) {
     return Map.of(
-        this.type.getName(),
+        "type",
+        new TextNode(Type.STRING.getName()),
+        Type.ENUM.getName(),
         mapper.valueToTree(values)
     );
   }

+ 99 - 13
kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverterTest.java

@@ -1,15 +1,23 @@
 package com.provectus.kafka.ui.util.jsonschema;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.fge.jsonschema.core.exceptions.ProcessingException;
+import com.github.fge.jsonschema.core.report.ProcessingReport;
+import com.github.fge.jsonschema.main.JsonSchemaFactory;
+import com.provectus.kafka.ui.serde.schemaregistry.AvroMessageFormatter;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 public class AvroJsonSchemaConverterTest {
   @Test
-  public void avroConvertTest() throws URISyntaxException {
+  public void avroConvertTest() throws URISyntaxException, JsonProcessingException {
     final AvroJsonSchemaConverter converter = new AvroJsonSchemaConverter();
     URI basePath = new URI("http://example.com/");
 
@@ -72,20 +80,98 @@ public class AvroJsonSchemaConverterTest {
             + " }"
     );
 
-    String expected =
-            "{\"$id\":\"http://example.com/Message\","
-            + "\"$schema\":\"https://json-schema.org/draft/2020-12/schema\","
-            + "\"type\":\"object\",\"properties\":{\"record\":{\"$ref\":"
-            + "\"#/definitions/RecordInnerMessage\"}},\"required\":[\"record\"],"
-            + "\"definitions\":{\"RecordInnerMessage\":{\"type\":\"object\",\"properties\":"
-            + "{\"long_text\":{\"type\":\"string\"},\"array\":{\"type\":\"array\",\"items\":"
-            + "{\"type\":\"string\"}},\"id\":{\"type\":\"integer\"},\"text\":{\"type\":\"string\"},"
-            + "\"map\":{\"type\":\"object\",\"additionalProperties\":{\"type\":\"integer\"}},"
-            + "\"order\":{\"enum\":[\"SPADES\",\"HEARTS\",\"DIAMONDS\",\"CLUBS\"]}},"
-            + "\"required\":[\"id\",\"text\",\"order\",\"array\",\"map\"]}}}";
+
+    String expected = "{\"$id\":\"http://example.com/Message\","
+        + "\"$schema\":\"https://json-schema.org/draft/2020-12/schema\","
+        + "\"type\":\"object\",\"properties\":{\"record\":"
+        + "{\"$ref\":\"#/definitions/RecordInnerMessage\"}},"
+        + "\"required\":[\"record\"],\"definitions\":"
+        + "{\"RecordInnerMessage\":{\"type\":\"object\",\""
+        + "properties\":{\"long_text\":{\"type\":\"object\","
+        + "\"properties\":{\"string\":{\"type\":\"string\"}}},"
+        + "\"array\":{\"type\":\"array\",\"items\":{\"type\":\"string\"}},"
+        + "\"id\":{\"type\":\"integer\"},\"text\":{\"type\":\"string\"},"
+        + "\"map\":{\"type\":\"object\",\"additionalProperties\":"
+        + "{\"type\":\"integer\"}},\"order\":{\"type\":\"string\","
+        + "\"enum\":[\"SPADES\",\"HEARTS\",\"DIAMONDS\",\"CLUBS\"]}},"
+        + "\"required\":[\"id\",\"text\",\"order\",\"array\",\"map\"]}}}";
 
     final JsonSchema convertRecord = converter.convert(basePath, recordSchema);
-    Assertions.assertEquals(expected, convertRecord.toJson(new ObjectMapper()));
 
+    ObjectMapper om = new ObjectMapper();
+    Assertions.assertEquals(
+        om.readTree(expected),
+        om.readTree(
+            convertRecord.toJson(om)
+        )
+    );
+
+  }
+
+  @Test
+  public void testNullableUnions() throws URISyntaxException, IOException, ProcessingException {
+    final AvroJsonSchemaConverter converter = new AvroJsonSchemaConverter();
+    URI basePath = new URI("http://example.com/");
+    final ObjectMapper objectMapper = new ObjectMapper();
+
+    Schema recordSchema = (new Schema.Parser()).parse(
+        " {"
+            + "     \"type\": \"record\","
+            + "     \"name\": \"Message\","
+            + "     \"namespace\": \"com.provectus.kafka\","
+            + "     \"fields\": ["
+            + "                     {"
+            + "                         \"name\": \"text\","
+            + "                         \"type\": ["
+            + "                             \"null\","
+            + "                             \"string\""
+            + "                         ],"
+            + "                         \"default\": null"
+            + "                     },"
+            + "                     {"
+            + "                         \"name\": \"value\","
+            + "                         \"type\": ["
+            + "                             \"null\","
+            + "                             \"string\","
+            + "                             \"long\""
+            + "                         ],"
+            + "                         \"default\": null"
+            + "                     }"
+            + "     ]"
+            + " }"
+    );
+
+    final GenericData.Record record = new GenericData.Record(recordSchema);
+    record.put("text", "Hello world");
+    record.put("value", 100L);
+    byte[] jsonBytes = AvroSchemaUtils.toJson(record);
+    String serialized = new String(jsonBytes);
+
+    String expected =
+        "{\"$id\":\"http://example.com/Message\","
+        + "\"$schema\":\"https://json-schema.org/draft/2020-12/schema\","
+        + "\"type\":\"object\",\"properties\":{\"text\":"
+        + "{\"type\":\"object\",\"properties\":{\"string\":"
+        + "{\"type\":\"string\"}}},\"value\":{\"type\":\"object\","
+        + "\"properties\":{\"string\":{\"type\":\"string\"},"
+        + "\"long\":{\"type\":\"integer\"}}}}}";
+
+    final JsonSchema convert = converter.convert(basePath, recordSchema);
+    Assertions.assertEquals(
+        objectMapper.readTree(expected),
+        objectMapper.readTree(
+            convert.toJson(objectMapper)
+        )
+    );
+
+
+    final ProcessingReport validate =
+        JsonSchemaFactory.byDefault().getJsonSchema(
+            objectMapper.readTree(expected)
+        ).validate(
+            objectMapper.readTree(serialized)
+        );
+
+    Assertions.assertTrue(validate.isSuccess());
   }
 }

+ 12 - 3
kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/ProtobufSchemaConverterTest.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.util.jsonschema;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.net.URI;
@@ -11,7 +12,7 @@ import org.junit.jupiter.api.Test;
 public class ProtobufSchemaConverterTest {
 
   @Test
-  public void testSimpleProto() throws URISyntaxException {
+  public void testSimpleProto() throws URISyntaxException, JsonProcessingException {
 
     String proto = "syntax = \"proto3\";\n"
         + "package com.acme;\n"
@@ -49,7 +50,8 @@ public class ProtobufSchemaConverterTest {
         + "{\"type\":\"object\",\"properties\":"
         + "{\"optionalField\":{\"oneOf\":[{\"type\":\"string\"},"
         + "{\"type\":\"integer\"}]},\"other_id\":"
-        + "{\"type\":\"integer\"},\"order\":{\"enum\":[\"FIRST\",\"SECOND\"]}}}}}";
+        + "{\"type\":\"integer\"},\"order\":{\"enum\":[\"FIRST\",\"SECOND\"],"
+        + "\"type\":\"string\"}}}}}";
 
     ProtobufSchema protobufSchema = new ProtobufSchema(proto);
 
@@ -58,6 +60,13 @@ public class ProtobufSchemaConverterTest {
 
     final JsonSchema convert =
         converter.convert(basePath, protobufSchema.toDescriptor("MyRecord"));
-    Assertions.assertEquals(expected, convert.toJson(new ObjectMapper()));
+
+    ObjectMapper om = new ObjectMapper();
+    Assertions.assertEquals(
+        om.readTree(expected),
+        om.readTree(
+            convert.toJson(om)
+        )
+    );
   }
 }