Browse Source

avro -> json conversion implemented

iliax 2 years ago
parent
commit
6367f44a90

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/AvroSchemaRegistrySerializer.java

@@ -37,7 +37,7 @@ class AvroSchemaRegistrySerializer extends SchemaRegistrySerializer<Object> {
   @Override
   protected Object serialize(String value, ParsedSchema schema) {
     try {
-      return JsonAvroConversion.convert(value, ((AvroSchema) schema).rawSchema());
+      return JsonAvroConversion.convertJsonToAvro(value, ((AvroSchema) schema).rawSchema());
     } catch (Throwable e) {
       throw new RuntimeException("Failed to serialize record for topic " + topic, e);
     }

+ 4 - 19
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/MessageFormatter.java

@@ -3,17 +3,16 @@ package com.provectus.kafka.ui.serdes.builtin.sr;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.protobuf.Message;
 import com.google.protobuf.util.JsonFormat;
+import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
 import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
-import java.util.IdentityHashMap;
 import java.util.Map;
-import java.util.UUID;
 import lombok.SneakyThrows;
-import org.apache.avro.generic.GenericData;
 
 interface MessageFormatter {
 
@@ -46,22 +45,8 @@ interface MessageFormatter {
     @Override
     public String format(String topic, byte[] value) {
       Object deserialized = avroDeserializer.deserialize(topic, value);
-      return GenericDataWithFixedUuidJsonConversion.INSTANCE.toString(deserialized);
-    }
-
-    //need to be explicitly overwritten before AVRO-3676 fix released
-    static class GenericDataWithFixedUuidJsonConversion extends GenericData {
-
-      static final GenericData INSTANCE = new GenericDataWithFixedUuidJsonConversion();
-
-      @Override
-      protected void toString(Object datum, StringBuilder buffer, IdentityHashMap<Object, Object> seenObjects) {
-        if (datum instanceof UUID uuid) {
-          super.toString(uuid.toString(), buffer, seenObjects);
-        } else {
-          super.toString(datum, buffer, seenObjects);
-        }
-      }
+      var schema = AvroSchemaUtils.getSchema(deserialized);
+      return JsonAvroConversion.convertAvroToJson(deserialized, schema).toString();
     }
   }
 

+ 153 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversion.java

@@ -3,10 +3,20 @@ package com.provectus.kafka.ui.util.jsonschema;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.DecimalNode;
+import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.FloatNode;
+import com.fasterxml.jackson.databind.node.IntNode;
 import com.fasterxml.jackson.databind.node.JsonNodeType;
+import com.fasterxml.jackson.databind.node.LongNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.TextNode;
 import com.google.common.collect.Lists;
 import com.provectus.kafka.ui.exception.ValidationException;
+import io.confluent.kafka.serializers.AvroData;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
@@ -18,6 +28,7 @@ import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -26,6 +37,7 @@ import java.util.stream.Stream;
 import lombok.SneakyThrows;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
 
 // converts json into Object that is expected input for KafkaAvroSerializer
 // (with AVRO_USE_LOGICAL_TYPE_CONVERTERS flat enabled!)
@@ -34,7 +46,7 @@ public class JsonAvroConversion {
   private static final JsonMapper MAPPER = new JsonMapper();
 
   @SneakyThrows
-  public static Object convert(String jsonString, Schema avroSchema) {
+  public static Object convertJsonToAvro(String jsonString, Schema avroSchema) {
     JsonNode rootNode = MAPPER.readTree(jsonString);
     return convert(rootNode, avroSchema);
   }
@@ -160,13 +172,115 @@ public class JsonAvroConversion {
     };
   }
 
+  public static JsonNode convertAvroToJson(Object obj, Schema avroSchema) {
+    if (obj == null) {
+      return NullNode.getInstance();
+    }
+    return switch (avroSchema.getType()) {
+      case RECORD -> {
+        var rec = (GenericData.Record) obj;
+        ObjectNode node = MAPPER.createObjectNode();
+        for (Schema.Field field : avroSchema.getFields()) {
+          var fieldVal = rec.get(field.name());
+          if (fieldVal != null) {
+            node.set(field.name(), convertAvroToJson(fieldVal, field.schema()));
+          }
+        }
+        yield node;
+      }
+      case MAP -> {
+        var map = (Map<Utf8, Object>) obj;
+        ObjectNode node = MAPPER.createObjectNode();
+        map.forEach((k, v) -> {
+          node.set(k.toString(), convertAvroToJson(v, avroSchema.getValueType()));
+        });
+        yield node;
+      }
+      case ARRAY -> {
+        var list = (List<Object>) obj;
+        ArrayNode node = MAPPER.createArrayNode();
+        list.forEach(e -> node.add(convertAvroToJson(e, avroSchema.getElementType())));
+        yield node;
+      }
+      case ENUM -> {
+        yield new TextNode(obj.toString());
+      }
+      case UNION -> {
+        //TODO: cover with tests
+        // non-null case
+        ObjectNode node = MAPPER.createObjectNode();
+        int unionIdx = AvroData.getGenericData().resolveUnion(avroSchema, obj);
+        Schema unionType = avroSchema.getTypes().get(unionIdx);
+        node.set(unionType.getFullName(), convertAvroToJson(obj, unionType));
+        yield node;
+      }
+      case STRING -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(obj, avroSchema);
+        }
+        yield new TextNode(obj.toString());
+      }
+      case LONG -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(obj, avroSchema);
+        }
+        yield new LongNode((Long) obj);
+      }
+      case INT -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(obj, avroSchema);
+        }
+        yield new IntNode((Integer) obj);
+      }
+      case FLOAT -> {
+        yield new FloatNode((Float) obj);
+      }
+      case DOUBLE -> {
+        yield new DoubleNode((Double) obj);
+      }
+      case BOOLEAN -> {
+        yield BooleanNode.valueOf((Boolean) obj);
+      }
+      case NULL -> {
+        yield NullNode.getInstance();
+      }
+      case BYTES -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(obj, avroSchema);
+        }
+        //TODO: check with tests
+        byte[] bytes = (byte[]) obj;
+        yield new TextNode(new String(bytes)); //TODO: encoding
+      }
+      case FIXED -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(obj, avroSchema);
+        }
+        var fixed = (GenericData.Fixed) obj; //TODO: encoding
+        yield new TextNode(new String(fixed.bytes()));
+      }
+    };
+  }
+
   private static Object processLogicalType(JsonNode node, Schema schema) {
     String logicalTypeName = schema.getLogicalType().getName();
     var conversion = Stream.of(LogicalTypeConversion.values())
         .filter(t -> t.name.equalsIgnoreCase(logicalTypeName))
         .findFirst();
     return conversion
-        .map(c -> c.conversion.apply(node, schema))
+        .map(c -> c.jsonToAvroConversion.apply(node, schema))
+        .orElseThrow(() ->
+            new JsonToAvroConversionException("'%s' logical type is not supported"
+                .formatted(logicalTypeName)));
+  }
+
+  private static JsonNode processLogicalType(Object obj, Schema schema) {
+    String logicalTypeName = schema.getLogicalType().getName();
+    var conversion = Stream.of(LogicalTypeConversion.values())
+        .filter(t -> t.name.equalsIgnoreCase(logicalTypeName))
+        .findFirst();
+    return conversion
+        .map(c -> c.avroToJsonConversion.apply(obj, schema))
         .orElseThrow(() ->
             new JsonToAvroConversionException("'%s' logical type is not supported"
                 .formatted(logicalTypeName)));
@@ -205,6 +319,9 @@ public class JsonAvroConversion {
           assertJsonType(node, JsonNodeType.STRING);
           return java.util.UUID.fromString(node.asText());
         },
+        (obj, schema) -> {
+          return new TextNode(obj.toString());
+        },
         new SimpleFieldSchema(
             new SimpleJsonType(
                 JsonType.Type.STRING,
@@ -223,6 +340,9 @@ public class JsonAvroConversion {
               "node '%s' can't be converted to decimal logical type"
                   .formatted(node));
         },
+        (obj, schema) -> {
+          return new DecimalNode((BigDecimal) obj);
+        },
         new SimpleFieldSchema(new SimpleJsonType(JsonType.Type.NUMBER))
     ),
 
@@ -238,6 +358,9 @@ public class JsonAvroConversion {
                     .formatted(node));
           }
         },
+        (obj, schema) -> {
+          return new TextNode(obj.toString());
+        },
         new SimpleFieldSchema(
             new SimpleJsonType(
                 JsonType.Type.STRING,
@@ -256,6 +379,9 @@ public class JsonAvroConversion {
                     .formatted(node));
           }
         },
+        (obj, schema) -> {
+          return new TextNode(obj.toString());
+        },
         new SimpleFieldSchema(
             new SimpleJsonType(
                 JsonType.Type.STRING,
@@ -274,6 +400,9 @@ public class JsonAvroConversion {
                     .formatted(node));
           }
         },
+        (obj, schema) -> {
+          return new TextNode(obj.toString());
+        },
         new SimpleFieldSchema(
             new SimpleJsonType(
                 JsonType.Type.STRING,
@@ -292,6 +421,9 @@ public class JsonAvroConversion {
                     .formatted(node));
           }
         },
+        (obj, schema) -> {
+          return new TextNode(obj.toString());
+        },
         new SimpleFieldSchema(
             new SimpleJsonType(
                 JsonType.Type.STRING,
@@ -314,6 +446,9 @@ public class JsonAvroConversion {
                     .formatted(node));
           }
         },
+        (obj, schema) -> {
+          return new TextNode(obj.toString());
+        },
         new SimpleFieldSchema(
             new SimpleJsonType(
                 JsonType.Type.STRING,
@@ -326,9 +461,12 @@ public class JsonAvroConversion {
             return LocalDateTime.parse(node.asText());
           }
           // TimeConversions.TimestampMicrosConversion for impl
-          Instant instant = (Instant) TIMESTAMP_MILLIS.conversion.apply(node, schema);
+          Instant instant = (Instant) TIMESTAMP_MILLIS.jsonToAvroConversion.apply(node, schema);
           return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
         },
+        (obj, schema) -> {
+          return new TextNode(obj.toString());
+        },
         new SimpleFieldSchema(
             new SimpleJsonType(
                 JsonType.Type.STRING,
@@ -340,9 +478,12 @@ public class JsonAvroConversion {
           if (node.isTextual()) {
             return LocalDateTime.parse(node.asText());
           }
-          Instant instant = (Instant) TIMESTAMP_MICROS.conversion.apply(node, schema);
+          Instant instant = (Instant) TIMESTAMP_MICROS.jsonToAvroConversion.apply(node, schema);
           return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
         },
+        (obj, schema) -> {
+          return new TextNode(obj.toString());
+        },
         new SimpleFieldSchema(
             new SimpleJsonType(
                 JsonType.Type.STRING,
@@ -350,17 +491,17 @@ public class JsonAvroConversion {
     );
 
     private final String name;
-
-    //assume that we have AVRO_USE_LOGICAL_TYPE_CONVERTERS set to true in serializing
-    //so, we need to convert into types that it requires
-    private final BiFunction<JsonNode, Schema, Object> conversion;
-
-    //assume
+    private final BiFunction<JsonNode, Schema, Object> jsonToAvroConversion;
+    private final BiFunction<Object, Schema, JsonNode> avroToJsonConversion;
     private final FieldSchema jsonSchema;
 
-    LogicalTypeConversion(String name, BiFunction<JsonNode, Schema, Object> conversion, FieldSchema jsonSchema) {
+    LogicalTypeConversion(String name,
+                          BiFunction<JsonNode, Schema, Object> jsonToAvroConversion,
+                          BiFunction<Object, Schema, JsonNode> avroToJsonConversion,
+                          FieldSchema jsonSchema) {
       this.name = name;
-      this.conversion = conversion;
+      this.jsonToAvroConversion = jsonToAvroConversion;
+      this.avroToJsonConversion = avroToJsonConversion;
       this.jsonSchema = jsonSchema;
     }
 

+ 6 - 3
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java

@@ -264,6 +264,10 @@ class SchemaRegistrySerdeTest {
                      "values" : "string",
                      "default": {}
                    }
+                 },
+                 {
+                  "name": "f_union",
+                  "type": ["null", "string", "int" ]
                  }
                ]
             }"""
@@ -278,12 +282,11 @@ class SchemaRegistrySerdeTest {
           "f_float": 123.1,
           "f_double": 123456.123456,
           "f_enum": "SPADES",
-          "f_map": { "k1": "string value" }
+          "f_map": { "k1": "string value" },
+          "f_union": { "int": 123 }
         }
         """;
 
-    //TODO: currently "union"-typed values has different representations on read and write
-
     registryClient.register("test-value", schema);
 
     byte[] serializedBytes = serde.serializer("test", Serde.Target.VALUE).serialize(jsonPayload);

+ 16 - 16
kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversionTest.java

@@ -1,6 +1,6 @@
 package com.provectus.kafka.ui.util.jsonschema;
 
-import static com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion.convert;
+import static com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion.convertJsonToAvro;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
@@ -19,19 +19,19 @@ class JsonAvroConversionTest {
 
   @Test
   void primitiveRoot() {
-    assertThat(convert("\"str\"", createSchema("\"string\"")))
+    assertThat(convertJsonToAvro("\"str\"", createSchema("\"string\"")))
         .isEqualTo("str");
 
-    assertThat(convert("123", createSchema("\"int\"")))
+    assertThat(convertJsonToAvro("123", createSchema("\"int\"")))
         .isEqualTo(123);
 
-    assertThat(convert("123", createSchema("\"long\"")))
+    assertThat(convertJsonToAvro("123", createSchema("\"long\"")))
         .isEqualTo(123L);
 
-    assertThat(convert("123.123", createSchema("\"float\"")))
+    assertThat(convertJsonToAvro("123.123", createSchema("\"float\"")))
         .isEqualTo(123.123F);
 
-    assertThat(convert("12345.12345", createSchema("\"double\"")))
+    assertThat(convertJsonToAvro("12345.12345", createSchema("\"double\"")))
         .isEqualTo(12345.12345);
   }
 
@@ -91,7 +91,7 @@ class JsonAvroConversionTest {
         }
         """;
 
-    var converted = convert(jsonPayload, schema);
+    var converted = convertJsonToAvro(jsonPayload, schema);
     assertThat(converted).isInstanceOf(GenericData.Record.class);
 
     var record = (GenericData.Record) converted;
@@ -114,13 +114,13 @@ class JsonAvroConversionTest {
   void unionRoot() {
     var sc = createSchema("[ \"null\", \"string\", \"int\" ]");
 
-    var converted = convert("{\"string\":\"string here\"}", sc);
+    var converted = convertJsonToAvro("{\"string\":\"string here\"}", sc);
     assertThat(converted).isEqualTo("string here");
 
-    converted = convert("{\"int\": 123}", sc);
+    converted = convertJsonToAvro("{\"int\": 123}", sc);
     assertThat(converted).isEqualTo(123);
 
-    converted = convert("null", sc);
+    converted = convertJsonToAvro("null", sc);
     assertThat(converted).isEqualTo(null);
   }
 
@@ -142,7 +142,7 @@ class JsonAvroConversionTest {
 
     String jsonPayload = "{ \"f_union\": null }";
 
-    var converted = convert(jsonPayload, schema);
+    var converted = convertJsonToAvro(jsonPayload, schema);
     assertThat(converted).isInstanceOf(GenericData.Record.class);
 
     var record = (GenericData.Record) converted;
@@ -150,11 +150,11 @@ class JsonAvroConversionTest {
 
 
     jsonPayload = "{ \"f_union\": { \"int\": 123 } }";
-    record = (GenericData.Record) convert(jsonPayload, schema);
+    record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
     assertThat(record.get("f_union")).isEqualTo(123);
 
     jsonPayload = "{ \"f_union\": { \"TestAvroRecord\": { \"f_union\": { \"int\": 123  } } } }";
-    record = (GenericData.Record) convert(jsonPayload, schema);
+    record = (GenericData.Record) convertJsonToAvro(jsonPayload, schema);
 
     assertThat(record.get("f_union")).isInstanceOf(GenericData.Record.class);
     var innerRec = (GenericData.Record) record.get("f_union");
@@ -216,7 +216,7 @@ class JsonAvroConversionTest {
         }
         """;
 
-    var converted = convert(jsonPayload, schema);
+    var converted = convertJsonToAvro(jsonPayload, schema);
     assertThat(converted).isInstanceOf(GenericData.Record.class);
 
     var record = (GenericData.Record) converted;
@@ -266,7 +266,7 @@ class JsonAvroConversionTest {
         }
         """;
 
-    var converted = convert(jsonPayload, schema);
+    var converted = convertJsonToAvro(jsonPayload, schema);
     assertThat(converted).isInstanceOf(GenericData.Record.class);
 
     var record = (GenericData.Record) converted;
@@ -335,7 +335,7 @@ class JsonAvroConversionTest {
         }
         """;
 
-    var converted = convert(jsonPayload, schema);
+    var converted = convertJsonToAvro(jsonPayload, schema);
     assertThat(converted).isInstanceOf(GenericData.Record.class);
 
     var record = (GenericData.Record) converted;