Explorar o código

Json -> Avro conversion implemented with logical types support

iliax %!s(int64=2) %!d(string=hai) anos
pai
achega
e8beb25560

+ 4 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/AvroSchemaRegistrySerializer.java

@@ -1,12 +1,13 @@
 package com.provectus.kafka.ui.serdes.builtin.sr;
 
+import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
-import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
 import java.util.Map;
 import org.apache.kafka.common.serialization.Serializer;
 
@@ -25,6 +26,7 @@ class AvroSchemaRegistrySerializer extends SchemaRegistrySerializer<Object> {
         Map.of(
             "schema.registry.url", "wontbeused",
             AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false,
+            KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true,
             AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true
         ),
         isKey
@@ -35,7 +37,7 @@ class AvroSchemaRegistrySerializer extends SchemaRegistrySerializer<Object> {
   @Override
   protected Object serialize(String value, ParsedSchema schema) {
     try {
-      return AvroSchemaUtils.toObject(value, (AvroSchema) schema);
+      return JsonAvroConversion.convert(value, ((AvroSchema) schema).rawSchema());
     } catch (Throwable e) {
       throw new RuntimeException("Failed to serialize record for topic " + topic, e);
     }

+ 30 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/MessageFormatter.java

@@ -3,13 +3,17 @@ package com.provectus.kafka.ui.serdes.builtin.sr;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.protobuf.Message;
 import com.google.protobuf.util.JsonFormat;
-import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
 import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
+import java.util.IdentityHashMap;
 import java.util.Map;
+import java.util.UUID;
 import lombok.SneakyThrows;
+import org.apache.avro.generic.GenericData;
 
 interface MessageFormatter {
 
@@ -28,16 +32,36 @@ interface MessageFormatter {
 
     AvroMessageFormatter(SchemaRegistryClient client) {
       this.avroDeserializer = new KafkaAvroDeserializer(client);
+      this.avroDeserializer.configure(
+          Map.of(
+              AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wontbeused",
+              KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false,
+              KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false,
+              KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true
+          ),
+          false
+      );
     }
 
     @Override
-    @SneakyThrows
     public String format(String topic, byte[] value) {
-      // deserialized will have type, that depends on schema type (record or primitive),
-      // AvroSchemaUtils.toJson(...) method will take it into account
       Object deserialized = avroDeserializer.deserialize(topic, value);
-      byte[] jsonBytes = AvroSchemaUtils.toJson(deserialized);
-      return new String(jsonBytes);
+      return GenericDataWithFixedUuidJsonConversion.INSTANCE.toString(deserialized);
+    }
+
+    //need to be explicitly overwritten before AVRO-3676 fix released
+    static class GenericDataWithFixedUuidJsonConversion extends GenericData {
+
+      static final GenericData INSTANCE = new GenericDataWithFixedUuidJsonConversion();
+
+      @Override
+      protected void toString(Object datum, StringBuilder buffer, IdentityHashMap<Object, Object> seenObjects) {
+        if (datum instanceof UUID uuid) {
+          super.toString(uuid.toString(), buffer, seenObjects);
+        } else {
+          super.toString(datum, buffer, seenObjects);
+        }
+      }
     }
   }
 

+ 5 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverter.java

@@ -5,6 +5,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.avro.Schema;
 import reactor.util.function.Tuple2;
@@ -40,6 +41,10 @@ public class AvroJsonSchemaConverter implements JsonSchemaConverter<Schema> {
 
   private FieldSchema convertSchema(Schema schema,
                                     Map<String, FieldSchema> definitions, boolean isRoot) {
+    Optional<FieldSchema> logicalTypeSchema = JsonAvroConversion.LogicalTypeConversion.getJsonSchema(schema);
+    if (logicalTypeSchema.isPresent()) {
+      return logicalTypeSchema.get();
+    }
     if (!schema.isUnion()) {
       JsonType type = convertType(schema);
       switch (type.getType()) {
@@ -66,7 +71,6 @@ public class AvroJsonSchemaConverter implements JsonSchemaConverter<Schema> {
     }
   }
 
-
   // this method formats json-schema field in a way
   // to fit avro-> json encoding rules (https://avro.apache.org/docs/1.11.1/specification/_print/#json-encoding)
   private FieldSchema createUnionSchema(Schema schema, Map<String, FieldSchema> definitions) {

+ 380 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversion.java

@@ -0,0 +1,380 @@
+package com.provectus.kafka.ui.util.jsonschema;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeType;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.google.common.collect.Lists;
+import com.provectus.kafka.ui.exception.ValidationException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.stream.Stream;
+import lombok.SneakyThrows;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+
+// converts json into Object that is expected input for KafkaAvroSerializer
+// (with AVRO_USE_LOGICAL_TYPE_CONVERTERS flat enabled!)
+public class JsonAvroConversion {
+
+  private static final JsonMapper MAPPER = new JsonMapper();
+
+  @SneakyThrows
+  public static Object convert(String jsonString, Schema avroSchema) {
+    JsonNode rootNode = MAPPER.readTree(jsonString);
+    return convert(rootNode, avroSchema);
+  }
+
+  private static Object convert(JsonNode node, Schema avroSchema) {
+    return switch (avroSchema.getType()) {
+      case RECORD -> {
+        assertJsonType(node, JsonNodeType.OBJECT);
+        var rec = new GenericData.Record(avroSchema);
+        for (Schema.Field field : avroSchema.getFields()) {
+          if (node.has(field.name()) && !node.get(field.name()).isNull()) {
+            rec.put(field.name(), convert(node.get(field.name()), field.schema()));
+          }
+        }
+        yield rec;
+      }
+      case MAP -> {
+        assertJsonType(node, JsonNodeType.OBJECT);
+        var map = new LinkedHashMap<String, Object>();
+        var valueSchema = avroSchema.getValueType();
+        node.fields().forEachRemaining(f -> map.put(f.getKey(), convert(f.getValue(), valueSchema)));
+        yield map;
+      }
+      case ARRAY -> {
+        assertJsonType(node, JsonNodeType.ARRAY);
+        var lst = new ArrayList<>();
+        node.elements().forEachRemaining(e -> lst.add(convert(e, avroSchema.getElementType())));
+        yield lst;
+      }
+      case ENUM -> {
+        assertJsonType(node, JsonNodeType.STRING);
+        String symbol = node.textValue();
+        if (!avroSchema.getEnumSymbols().contains(symbol)) {
+          throw new JsonToAvroConversionException("%s is not a part of enum symbols [%s]"
+              .formatted(symbol, avroSchema.getEnumSymbols()));
+        }
+        yield new GenericData.EnumSymbol(avroSchema, symbol);
+      }
+      case UNION -> {
+        // for types from enum (other than null) payload should be an object with single key == name of type
+        // ex: schema = [ "null", "int", "string" ], possible payloads = null, { "string": "str" },  { "int": 123 }
+        if (node.isNull() && avroSchema.getTypes().contains(Schema.create(Schema.Type.NULL))) {
+          yield null;
+        }
+
+        assertJsonType(node, JsonNodeType.OBJECT);
+        var elements = Lists.newArrayList(node.fields());
+        if (elements.size() != 1) {
+          throw new JsonToAvroConversionException(
+              "UNION field value should be an object with single field == type name");
+        }
+        var typeNameToValue = elements.get(0);
+        for (Schema unionType : avroSchema.getTypes()) {
+          if (typeNameToValue.getKey().equals(unionType.getFullName())) {
+            yield convert(typeNameToValue.getValue(), unionType);
+          }
+        }
+        throw new JsonToAvroConversionException(
+            "json value '%s' is cannot be converted to any of union types [%s]"
+                .formatted(node, avroSchema.getTypes()));
+      }
+      case STRING -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(node, avroSchema);
+        }
+        assertJsonType(node, JsonNodeType.STRING);
+        yield node.textValue();
+      }
+      case LONG -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(node, avroSchema);
+        }
+        assertJsonType(node, JsonNodeType.NUMBER);
+        assertJsonNumberType(node, JsonParser.NumberType.LONG, JsonParser.NumberType.INT);
+        yield node.longValue();
+      }
+      case INT -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(node, avroSchema);
+        }
+        assertJsonType(node, JsonNodeType.NUMBER);
+        assertJsonNumberType(node, JsonParser.NumberType.INT);
+        yield node.intValue();
+      }
+      case FLOAT -> {
+        assertJsonType(node, JsonNodeType.NUMBER);
+        assertJsonNumberType(node, JsonParser.NumberType.DOUBLE, JsonParser.NumberType.FLOAT);
+        yield node.floatValue();
+      }
+      case DOUBLE -> {
+        assertJsonType(node, JsonNodeType.NUMBER);
+        assertJsonNumberType(node, JsonParser.NumberType.DOUBLE, JsonParser.NumberType.FLOAT);
+        yield node.doubleValue();
+      }
+      case BOOLEAN -> {
+        assertJsonType(node, JsonNodeType.BOOLEAN);
+        yield node.booleanValue();
+      }
+      case NULL -> {
+        assertJsonType(node, JsonNodeType.NULL);
+        yield null;
+      }
+      case BYTES -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(node, avroSchema);
+        }
+        assertJsonType(node, JsonNodeType.STRING);
+        yield ByteBuffer.wrap(node.textValue().getBytes(StandardCharsets.UTF_8));
+      }
+      case FIXED -> {
+        if (isLogicalType(avroSchema)) {
+          yield processLogicalType(node, avroSchema);
+        }
+        assertJsonType(node, JsonNodeType.STRING);
+        byte[] bytes = node.textValue().getBytes(StandardCharsets.ISO_8859_1);
+        if (bytes.length != avroSchema.getFixedSize()) {
+          throw new JsonToAvroConversionException(
+              "Fixed field has unexpected size %d (should be %d)"
+                  .formatted(bytes.length, avroSchema.getFixedSize()));
+        }
+        yield new GenericData.Fixed(avroSchema, bytes);
+      }
+    };
+  }
+
+  private static Object processLogicalType(JsonNode node, Schema schema) {
+    String logicalTypeName = schema.getLogicalType().getName();
+    var conversion = Stream.of(LogicalTypeConversion.values())
+        .filter(t -> t.name.equalsIgnoreCase(logicalTypeName))
+        .findFirst();
+    return conversion
+        .map(c -> c.conversion.apply(node, schema))
+        .orElseThrow(() ->
+            new JsonToAvroConversionException("'%s' logical type is not supported"
+                .formatted(logicalTypeName)));
+  }
+
+  private static boolean isLogicalType(Schema schema) {
+    return schema.getLogicalType() != null;
+  }
+
+  public static class JsonToAvroConversionException extends ValidationException {
+    public JsonToAvroConversionException(String message) {
+      super(message);
+    }
+  }
+
+  private static void assertJsonType(JsonNode node, JsonNodeType... allowedTypes) {
+    if (Stream.of(allowedTypes).noneMatch(t -> node.getNodeType() == t)) {
+      throw new JsonToAvroConversionException(
+          "%s node has unexpected type, allowed types %s, actual type %s"
+              .formatted(node, Arrays.toString(allowedTypes), node.getNodeType()));
+    }
+  }
+
+  private static void assertJsonNumberType(JsonNode node, JsonParser.NumberType... allowedTypes) {
+    if (Stream.of(allowedTypes).noneMatch(t -> node.numberType() == t)) {
+      throw new JsonToAvroConversionException(
+          "%s node has unexpected numeric type, allowed types %s, actual type %s"
+              .formatted(node, Arrays.toString(allowedTypes), node.numberType()));
+    }
+  }
+
+  enum LogicalTypeConversion {
+
+    UUID("uuid",
+        (node, schema) -> {
+          assertJsonType(node, JsonNodeType.STRING);
+          return java.util.UUID.fromString(node.asText());
+        },
+        new SimpleFieldSchema(
+            new SimpleJsonType(
+                JsonType.Type.STRING,
+                Map.of("format", new TextNode("uuid"))))
+    ),
+
+    DECIMAL("decimal",
+        (node, schema) -> {
+          if (node.isTextual()) {
+            return new BigDecimal(node.asText());
+          } else if (node.isNumber()) {
+            //TODO: ????
+            return new BigDecimal(node.numberValue().toString());
+          }
+          throw new JsonToAvroConversionException(
+              "node '%s' can't be converted to decimal logical type"
+                  .formatted(node));
+        },
+        new SimpleFieldSchema(new SimpleJsonType(JsonType.Type.NUMBER))
+    ),
+
+    DATE("date",
+        (node, schema) -> {
+          if (node.isInt()) {
+            return LocalDate.ofEpochDay(node.intValue());
+          } else if (node.isTextual()) {
+            return LocalDate.parse(node.asText());
+          } else {
+            throw new JsonToAvroConversionException(
+                "node '%s' can't be converted to date logical type"
+                    .formatted(node));
+          }
+        },
+        new SimpleFieldSchema(
+            new SimpleJsonType(
+                JsonType.Type.STRING,
+                Map.of("format", new TextNode("date"))))
+    ),
+
+    TIME_MILLIS("time-millis",
+        (node, schema) -> {
+          if (node.isIntegralNumber()) {
+            return LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(node.longValue()));
+          } else if (node.isTextual()) {
+            return LocalTime.parse(node.asText());
+          } else {
+            throw new JsonToAvroConversionException(
+                "node '%s' can't be converted to time-millis logical type"
+                    .formatted(node));
+          }
+        },
+        new SimpleFieldSchema(
+            new SimpleJsonType(
+                JsonType.Type.STRING,
+                Map.of("format", new TextNode("time"))))
+    ),
+
+    TIME_MICROS("time-micros",
+        (node, schema) -> {
+          if (node.isIntegralNumber()) {
+            return LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos(node.longValue()));
+          } else if (node.isTextual()) {
+            return LocalTime.parse(node.asText());
+          } else {
+            throw new JsonToAvroConversionException(
+                "node '%s' can't be converted to time-micros logical type"
+                    .formatted(node));
+          }
+        },
+        new SimpleFieldSchema(
+            new SimpleJsonType(
+                JsonType.Type.STRING,
+                Map.of("format", new TextNode("time"))))
+    ),
+
+    TIMESTAMP_MILLIS("timestamp-millis",
+        (node, schema) -> {
+          if (node.isIntegralNumber()) {
+            return Instant.ofEpochMilli(node.longValue());
+          } else if (node.isTextual()) {
+            return Instant.parse(node.asText());
+          } else {
+            throw new JsonToAvroConversionException(
+                "node '%s' can't be converted to timestamp-millis logical type"
+                    .formatted(node));
+          }
+        },
+        new SimpleFieldSchema(
+            new SimpleJsonType(
+                JsonType.Type.STRING,
+                Map.of("format", new TextNode("date-time"))))
+    ),
+
+    TIMESTAMP_MICROS("timestamp-micros",
+        (node, schema) -> {
+          if (node.isIntegralNumber()) {
+            // TimeConversions.TimestampMicrosConversion for impl
+            long microsFromEpoch = node.longValue();
+            long epochSeconds = microsFromEpoch / (1_000_000L);
+            long nanoAdjustment = (microsFromEpoch % (1_000_000L)) * 1_000L;
+            return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
+          } else if (node.isTextual()) {
+            return Instant.parse(node.asText());
+          } else {
+            throw new JsonToAvroConversionException(
+                "node '%s' can't be converted to timestamp-millis logical type"
+                    .formatted(node));
+          }
+        },
+        new SimpleFieldSchema(
+            new SimpleJsonType(
+                JsonType.Type.STRING,
+                Map.of("format", new TextNode("date-time"))))
+    ),
+
+    LOCAL_TIMESTAMP_MILLIS("local-timestamp-millis",
+        (node, schema) -> {
+          if (node.isTextual()) {
+            return LocalDateTime.parse(node.asText());
+          }
+          // TimeConversions.TimestampMicrosConversion for impl
+          Instant instant = (Instant) TIMESTAMP_MILLIS.conversion.apply(node, schema);
+          return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
+        },
+        new SimpleFieldSchema(
+            new SimpleJsonType(
+                JsonType.Type.STRING,
+                Map.of("format", new TextNode("date-time"))))
+    ),
+
+    LOCAL_TIMESTAMP_MICROS("local-timestamp-micros",
+        (node, schema) -> {
+          if (node.isTextual()) {
+            return LocalDateTime.parse(node.asText());
+          }
+          Instant instant = (Instant) TIMESTAMP_MICROS.conversion.apply(node, schema);
+          return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
+        },
+        new SimpleFieldSchema(
+            new SimpleJsonType(
+                JsonType.Type.STRING,
+                Map.of("format", new TextNode("date-time"))))
+    );
+
+    private final String name;
+
+    //assume that we have AVRO_USE_LOGICAL_TYPE_CONVERTERS set to true in serializing
+    //so, we need to convert into types that it requires
+    private final BiFunction<JsonNode, Schema, Object> conversion;
+
+    //assume
+    private final FieldSchema jsonSchema;
+
+    LogicalTypeConversion(String name, BiFunction<JsonNode, Schema, Object> conversion, FieldSchema jsonSchema) {
+      this.name = name;
+      this.conversion = conversion;
+      this.jsonSchema = jsonSchema;
+    }
+
+    static Optional<FieldSchema> getJsonSchema(Schema schema) {
+      if (schema.getLogicalType() == null) {
+        return Optional.empty();
+      }
+      String logicalTypeName = schema.getLogicalType().getName();
+      return Stream.of(JsonAvroConversion.LogicalTypeConversion.values())
+          .filter(t -> t.name.equalsIgnoreCase(logicalTypeName))
+          .map(c -> c.jsonSchema)
+          .findFirst();
+    }
+  }
+
+
+}

+ 82 - 4
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java

@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.serdes.builtin.sr;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.json.JsonMapper;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
@@ -54,7 +53,8 @@ class SchemaRegistrySerdeTest {
 
     SchemaDescription schemaDescription = schemaOptional.get();
     assertThat(schemaDescription.getSchema())
-        .contains("{\"$id\":\"int\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"type\":\"integer\"}");
+        .contains(
+            "{\"$id\":\"int\",\"$schema\":\"https://json-schema.org/draft/2020-12/schema\",\"type\":\"integer\"}");
     assertThat(schemaDescription.getAdditionalProperties())
         .containsOnlyKeys("subject", "schemaId", "latestVersion", "type")
         .containsEntry("subject", subject)
@@ -84,11 +84,15 @@ class SchemaRegistrySerdeTest {
             + "    {"
             + "      \"name\": \"field2\","
             + "      \"type\": \"int\""
+            + "    },"
+            + "    {"
+            + "      \"name\": \"field3\","
+            + "      \"type\": {\"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 22, \"scale\":10}"
             + "    }"
             + "  ]"
             + "}"
     );
-    String jsonValue = "{ \"field1\":\"testStr\", \"field2\": 123 }";
+    String jsonValue = "{ \"field1\":\"testStr\", \"field2\": 123, \"field3\": 2.1617413862327545E11 }";
     String topic = "test";
 
     int schemaId = registryClient.register(topic + "-value", schema);
@@ -189,7 +193,8 @@ class SchemaRegistrySerdeTest {
     assertThat(serde.canSerialize(topic, Serde.Target.VALUE)).isFalse();
   }
 
-  private void assertJsonsEqual(String expected, String actual) throws JsonProcessingException {
+  @SneakyThrows
+  private void assertJsonsEqual(String expected, String actual) {
     var mapper = new JsonMapper();
     assertThat(mapper.readTree(actual)).isEqualTo(mapper.readTree(expected));
   }
@@ -216,4 +221,77 @@ class SchemaRegistrySerdeTest {
     return output.toByteArray();
   }
 
+  @Test
+  void logicalTypesRepresentationIsConsistentForSerializationAndDeserialization() throws Exception {
+    AvroSchema schema = new AvroSchema(
+        """
+             {
+               "type": "record",
+               "name": "TestAvroRecord",
+               "fields": [
+                 {
+                   "name": "lt_date",
+                   "type": { "type": "int", "logicalType": "date" }
+                 },
+                 {
+                   "name": "lt_uuid",
+                   "type": { "type": "string", "logicalType": "uuid" }
+                 },
+                 {
+                   "name": "lt_decimal",
+                   "type": { "type": "bytes", "logicalType": "decimal", "precision": 22, "scale":10 }
+                 },
+                 {
+                   "name": "lt_time_millis",
+                   "type": { "type": "int", "logicalType": "time-millis"}
+                 },
+                 {
+                   "name": "lt_time_micros",
+                   "type": { "type": "long", "logicalType": "time-micros"}
+                 },
+                 {
+                   "name": "lt_timestamp_millis",
+                   "type": { "type": "long", "logicalType": "timestamp-millis" }
+                 },
+                 {
+                   "name": "lt_timestamp_micros",
+                   "type": { "type": "long", "logicalType": "timestamp-micros" }
+                 },
+                 {
+                   "name": "lt_local_timestamp_millis",
+                   "type": { "type": "long", "logicalType": "local-timestamp-millis" }
+                 },
+                 {
+                   "name": "lt_local_timestamp_micros",
+                   "type": { "type": "long", "logicalType": "local-timestamp-micros" }
+                 }
+               ]
+            }"""
+    );
+
+    String jsonPayload = """
+        {
+          "lt_date":"1991-08-14",
+          "lt_decimal": 2.1617413862327545E11,
+          "lt_time_millis": "10:15:30.001",
+          "lt_time_micros": "10:15:30.123456",
+          "lt_uuid": "a37b75ca-097c-5d46-6119-f0637922e908",
+          "lt_timestamp_millis": "2007-12-03T10:15:30.123Z",
+          "lt_timestamp_micros": "2007-12-03T10:15:30.123456Z",
+          "lt_local_timestamp_millis": "2017-12-03T10:15:30.123",
+          "lt_local_timestamp_micros": "2017-12-03T10:15:30.123456"
+        }
+        """;
+
+    registryClient.register("test-value", schema);
+
+    byte[] serialized = serde.serializer("test", Serde.Target.VALUE).serialize(jsonPayload);
+
+    var deserializedJson = serde.deserializer("test", Serde.Target.VALUE)
+        .deserialize(null, serialized)
+        .getResult();
+
+    assertJsonsEqual(jsonPayload, deserializedJson);
+  }
+
 }

+ 365 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/JsonAvroConversionTest.java

@@ -0,0 +1,365 @@
+package com.provectus.kafka.ui.util.jsonschema;
+
+import static com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion.convert;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.junit.jupiter.api.Test;
+
+class JsonAvroConversionTest {
+
+  @Test
+  void primitiveRoot() {
+    assertThat(convert("\"str\"", createSchema("\"string\"")))
+        .isEqualTo("str");
+
+    assertThat(convert("123", createSchema("\"int\"")))
+        .isEqualTo(123);
+
+    assertThat(convert("123", createSchema("\"long\"")))
+        .isEqualTo(123L);
+
+    assertThat(convert("123.123", createSchema("\"float\"")))
+        .isEqualTo(123.123F);
+
+    assertThat(convert("12345.12345", createSchema("\"double\"")))
+        .isEqualTo(12345.12345);
+  }
+
+  @Test
+  void primitiveTypedFields() {
+    var schema = createSchema(
+        """
+             {
+               "type": "record",
+               "name": "TestAvroRecord",
+               "fields": [
+                 {
+                   "name": "f_int",
+                   "type": "int"
+                 },
+                 {
+                   "name": "f_long",
+                   "type": "long"
+                 },
+                 {
+                   "name": "f_string",
+                   "type": "string"
+                 },
+                 {
+                   "name": "f_boolean",
+                   "type": "boolean"
+                 },
+                 {
+                   "name": "f_float",
+                   "type": "float"
+                 },
+                 {
+                   "name": "f_double",
+                   "type": "double"
+                 },
+                 {
+                   "name": "f_enum",
+                   "type" : {
+                    "type": "enum",
+                    "name": "Suit",
+                    "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
+                   }
+                 }
+               ]
+            }"""
+    );
+
+    String jsonPayload = """
+        {
+          "f_int": 123,
+          "f_long": 4294967294,
+          "f_string": "string here",
+          "f_boolean": true,
+          "f_float": 123.1,
+          "f_double": 123456.123456,
+          "f_enum": "SPADES"
+        }
+        """;
+
+    var converted = convert(jsonPayload, schema);
+    assertThat(converted).isInstanceOf(GenericData.Record.class);
+
+    var record = (GenericData.Record) converted;
+    assertThat(record.get("f_int")).isEqualTo(123);
+    assertThat(record.get("f_long")).isEqualTo(4294967294L);
+    assertThat(record.get("f_string")).isEqualTo("string here");
+    assertThat(record.get("f_boolean")).isEqualTo(true);
+    assertThat(record.get("f_float")).isEqualTo(123.1f);
+    assertThat(record.get("f_double")).isEqualTo(123456.123456);
+    assertThat(record.get("f_enum"))
+        .isEqualTo(
+            new GenericData.EnumSymbol(
+                schema.getField("f_enum").schema(),
+                "SPADES"
+            )
+        );
+  }
+
+  @Test
+  void unionRoot() {
+    var sc = createSchema("[ \"null\", \"string\", \"int\" ]");
+
+    var converted = convert("{\"string\":\"string here\"}", sc);
+    assertThat(converted).isEqualTo("string here");
+
+    converted = convert("{\"int\": 123}", sc);
+    assertThat(converted).isEqualTo(123);
+
+    converted = convert("null", sc);
+    assertThat(converted).isEqualTo(null);
+  }
+
+  @Test
+  void unionField() {
+    var schema = createSchema(
+        """
+             {
+               "type": "record",
+               "name": "TestAvroRecord",
+               "fields": [
+                 {
+                   "name": "f_union",
+                   "type": [ "null", "int", "TestAvroRecord"]
+                 }
+               ]
+            }"""
+    );
+
+    String jsonPayload = "{ \"f_union\": null }";
+
+    var converted = convert(jsonPayload, schema);
+    assertThat(converted).isInstanceOf(GenericData.Record.class);
+
+    var record = (GenericData.Record) converted;
+    assertThat(record.get("f_union")).isNull();
+
+
+    jsonPayload = "{ \"f_union\": { \"int\": 123 } }";
+    record = (GenericData.Record) convert(jsonPayload, schema);
+    assertThat(record.get("f_union")).isEqualTo(123);
+
+    jsonPayload = "{ \"f_union\": { \"TestAvroRecord\": { \"f_union\": { \"int\": 123  } } } }";
+    record = (GenericData.Record) convert(jsonPayload, schema);
+
+    assertThat(record.get("f_union")).isInstanceOf(GenericData.Record.class);
+    var innerRec = (GenericData.Record) record.get("f_union");
+    assertThat(innerRec.get("f_union")).isEqualTo(123);
+  }
+
+  @Test
+  void mapField() {
+    var schema = createSchema(
+        """
+             {
+               "type": "record",
+               "name": "TestAvroRecord",
+               "fields": [
+                 {
+                   "name": "long_map",
+                   "type": {
+                     "type": "map",
+                     "values" : "long",
+                     "default": {}
+                   }
+                 },
+                 {
+                   "name": "string_map",
+                   "type": {
+                     "type": "map",
+                     "values" : "string",
+                     "default": {}
+                   }
+                 },
+                 {
+                   "name": "self_ref_map",
+                   "type": {
+                     "type": "map",
+                     "values" : "TestAvroRecord",
+                     "default": {}
+                   }
+                 }
+               ]
+            }"""
+    );
+
+    String jsonPayload = """
+        {
+          "long_map": {
+            "k1": 123,
+            "k2": 456
+          },
+          "string_map": {
+            "k3": "s1",
+            "k4": "s2"
+          },
+          "self_ref_map": {
+            "k5" : {
+              "long_map": { "_k1": 222 },
+              "string_map": { "_k2": "_s1" }
+            }
+          }
+        }
+        """;
+
+    var converted = convert(jsonPayload, schema);
+    assertThat(converted).isInstanceOf(GenericData.Record.class);
+
+    var record = (GenericData.Record) converted;
+    assertThat(record.get("long_map"))
+        .isEqualTo(Map.of("k1", 123L, "k2", 456L));
+    assertThat(record.get("string_map"))
+        .isEqualTo(Map.of("k3", "s1", "k4", "s2"));
+    assertThat(record.get("self_ref_map"))
+        .isNotNull();
+
+    Map<String, Object> selfRefMapField = (Map<String, Object>) record.get("self_ref_map");
+    assertThat(selfRefMapField)
+        .hasSize(1)
+        .hasEntrySatisfying("k5", v -> {
+          assertThat(v).isInstanceOf(GenericData.Record.class);
+          var innerRec = (GenericData.Record) v;
+          assertThat(innerRec.get("long_map"))
+              .isEqualTo(Map.of("_k1", 222L));
+          assertThat(innerRec.get("string_map"))
+              .isEqualTo(Map.of("_k2", "_s1"));
+        });
+  }
+
+  @Test
+  void arrayField() {
+    var schema = createSchema(
+        """
+             {
+               "type": "record",
+               "name": "TestAvroRecord",
+               "fields": [
+                 {
+                   "name": "f_array",
+                   "type": {
+                      "type": "array",
+                      "items" : "string",
+                      "default": []
+                    }
+                 }
+               ]
+            }"""
+    );
+
+    String jsonPayload = """
+        {
+          "f_array": [ "e1", "e2" ]
+        }
+        """;
+
+    var converted = convert(jsonPayload, schema);
+    assertThat(converted).isInstanceOf(GenericData.Record.class);
+
+    var record = (GenericData.Record) converted;
+    assertThat(record.get("f_array")).isEqualTo(List.of("e1", "e2"));
+  }
+
+  @Test
+  void logicalTypesField() {
+    var schema = createSchema(
+        """
+             {
+               "type": "record",
+               "name": "TestAvroRecord",
+               "fields": [
+                 {
+                   "name": "lt_date",
+                   "type": { "type": "int", "logicalType": "date" }
+                 },
+                 {
+                   "name": "lt_uuid",
+                   "type": { "type": "string", "logicalType": "uuid" }
+                 },
+                 {
+                   "name": "lt_decimal",
+                   "type": { "type": "bytes", "logicalType": "decimal", "precision": 22, "scale":10 }
+                 },
+                 {
+                   "name": "lt_time_millis",
+                   "type": { "type": "int", "logicalType": "time-millis"}
+                 },
+                 {
+                   "name": "lt_time_micros",
+                   "type": { "type": "long", "logicalType": "time-micros"}
+                 },
+                 {
+                   "name": "lt_timestamp_millis",
+                   "type": { "type": "long", "logicalType": "timestamp-millis" }
+                 },
+                 {
+                   "name": "lt_timestamp_micros",
+                   "type": { "type": "long", "logicalType": "timestamp-micros" }
+                 },
+                 {
+                   "name": "lt_local_timestamp_millis",
+                   "type": { "type": "long", "logicalType": "local-timestamp-millis" }
+                 },
+                 {
+                   "name": "lt_local_timestamp_micros",
+                   "type": { "type": "long", "logicalType": "local-timestamp-micros" }
+                 }
+               ]
+            }"""
+    );
+
+    String jsonPayload = """
+        {
+          "lt_date":"1991-08-14",
+          "lt_decimal": 2.1617413862327545E11,
+          "lt_time_millis": "10:15:30.001",
+          "lt_time_micros": "10:15:30.123456",
+          "lt_uuid": "a37b75ca-097c-5d46-6119-f0637922e908",
+          "lt_timestamp_millis": "2007-12-03T10:15:30.123Z",
+          "lt_timestamp_micros": "2007-12-13T10:15:30.123456Z",
+          "lt_local_timestamp_millis": "2017-12-03T10:15:30.123",
+          "lt_local_timestamp_micros": "2017-12-13T10:15:30.123456"
+        }
+        """;
+
+    var converted = convert(jsonPayload, schema);
+    assertThat(converted).isInstanceOf(GenericData.Record.class);
+
+    var record = (GenericData.Record) converted;
+
+    assertThat(record.get("lt_date"))
+        .isEqualTo(LocalDate.of(1991, 8, 14));
+    assertThat(record.get("lt_decimal"))
+        .isEqualTo(new BigDecimal("2.1617413862327545E11"));
+    assertThat(record.get("lt_time_millis"))
+        .isEqualTo(LocalTime.parse("10:15:30.001"));
+    assertThat(record.get("lt_time_micros"))
+        .isEqualTo(LocalTime.parse("10:15:30.123456"));
+    assertThat(record.get("lt_timestamp_millis"))
+        .isEqualTo(Instant.parse("2007-12-03T10:15:30.123Z"));
+    assertThat(record.get("lt_timestamp_micros"))
+        .isEqualTo(Instant.parse("2007-12-13T10:15:30.123456Z"));
+    assertThat(record.get("lt_local_timestamp_millis"))
+        .isEqualTo(LocalDateTime.parse("2017-12-03T10:15:30.123"));
+    assertThat(record.get("lt_local_timestamp_micros"))
+        .isEqualTo(LocalDateTime.parse("2017-12-13T10:15:30.123456"));
+  }
+
+  private Schema createSchema(String schema) {
+    return new AvroSchema(schema).rawSchema();
+  }
+
+}