
resolves #2764, resolves #2943

tom.kaszuba 2 years ago
Commit
3c87f3a203

+ 35 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/MessageFormatter.java

@@ -3,13 +3,19 @@ package com.provectus.kafka.ui.serdes.builtin.sr;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.protobuf.Message;
 import com.google.protobuf.util.JsonFormat;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
 import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
 import java.util.Map;
 import lombok.SneakyThrows;
+import org.apache.avro.Conversions;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericData;
 
 interface MessageFormatter {
 
@@ -27,7 +33,29 @@ interface MessageFormatter {
     private final KafkaAvroDeserializer avroDeserializer;
 
     AvroMessageFormatter(SchemaRegistryClient client) {
-      this.avroDeserializer = new KafkaAvroDeserializer(client);
+      avroDeserializer = new KafkaAvroDeserializer(client);
+      // The isKey flag can be anything: it is ignored during the actual
+      // deserialization later on, but it is required by the configure call
+      final boolean isKey = false;
+
+      GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion());
+      GenericData.get().addLogicalTypeConversion(new Conversions.UUIDConversion());
+      GenericData.get().addLogicalTypeConversion(new TimeConversions.DateConversion());
+      GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
+      GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+      GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
+      GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+      GenericData.get().addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
+      GenericData.get().addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
+
+      avroDeserializer.configure(
+          Map.of(
+              AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wontbeused",
+              KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false,
+              KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false,
+              KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true
+          ),
+          isKey);
     }
 
     @Override
@@ -36,8 +64,12 @@ interface MessageFormatter {
       // deserialized will have type, that depends on schema type (record or primitive),
       // AvroSchemaUtils.toJson(...) method will take it into account
       Object deserialized = avroDeserializer.deserialize(topic, value);
-      byte[] jsonBytes = AvroSchemaUtils.toJson(deserialized);
-      return new String(jsonBytes);
+      if (deserialized instanceof GenericData.Record) {
+        return deserialized.toString();
+      } else {
+        byte[] jsonBytes = AvroSchemaUtils.toJson(deserialized);
+        return new String(jsonBytes);
+      }
     }
   }
 

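For context, here is a minimal, self-contained sketch of what the registered conversions change under the hood. It is illustrative only and not part of the commit: it assumes nothing beyond Apache Avro on the classpath, and the class, schema, and variable names are made up. With TimeConversions.DateConversion registered on the GenericData used for reading, an int field carrying the "date" logical type is materialized as java.time.LocalDate rather than the raw int, which is what lets the configured deserializer render readable dates.

```java
// LogicalTypeConversionSketch: illustrative only, not part of the commit.
// Shows that a GenericDatumReader backed by a GenericData with DateConversion
// registered materializes an {"type":"int","logicalType":"date"} field as
// java.time.LocalDate instead of the raw int.
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class LogicalTypeConversionSketch {

  private static final String SCHEMA_JSON =
      "{\"type\":\"record\",\"name\":\"Example\",\"fields\":["
          + "{\"name\":\"field1\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}}]}";

  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(SCHEMA_JSON);

    // Write a record holding the raw underlying value (days since the epoch),
    // using a plain GenericData with no conversions registered.
    GenericRecord record = new GenericData.Record(schema);
    record.put("field1", 7895);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    var encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema, new GenericData()).write(record, encoder);
    encoder.flush();

    // Read it back through a GenericData that has the date conversion registered.
    GenericData withConversions = new GenericData();
    withConversions.addLogicalTypeConversion(new TimeConversions.DateConversion());
    var reader = new GenericDatumReader<GenericRecord>(schema, schema, withConversions);
    GenericRecord decoded =
        reader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));

    Object field1 = decoded.get("field1");
    System.out.println(field1.getClass().getSimpleName() + " " + field1); // LocalDate 1991-08-14
  }
}
```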
+ 34 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java

@@ -130,6 +130,40 @@ class SchemaRegistrySerdeTest {
         .contains(Map.entry("schemaId", schemaId));
   }
 
+  @Test
+  void deserializeReturnsJsonAvroWithLogicalTypesMsgJsonRepresentation() throws RestClientException, IOException {
+    AvroSchema schema = new AvroSchema(
+        "{"
+            + "  \"type\": \"record\","
+            + "  \"name\": \"TestAvroRecord1\","
+            + "  \"fields\": ["
+            + "    {"
+            + "      \"name\": \"field1\","
+            + "      \"type\": {\"type\": \"int\", \"logicalType\": \"date\"}"
+            + "    },"
+            + "    {"
+            + "      \"name\": \"field2\","
+            + "      \"type\": {\"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 22, \"scale\":10}"
+            + "    }"
+            + "  ]"
+            + "}"
+    );
+    String jsonValueIn = "{ \"field1\": 7895, \"field2\": \"u001aÇØo\\u0080\" }";
+    String jsonValueOut = "{\"field1\":\"1991-08-14\",\"field2\":2.1617413862327545E11}";
+
+    String topic = "test";
+    int schemaId = registryClient.register(topic + "-value", schema);
+
+    byte[] data = toBytesWithMagicByteAndSchemaId(schemaId, jsonValueIn, schema);
+    var result = serde.deserializer(topic, Serde.Target.VALUE).deserialize(null, data);
+
+    assertJsonsEqual(jsonValueOut, result.getResult());
+    assertThat(result.getType()).isEqualTo(DeserializeResult.Type.JSON);
+    assertThat(result.getAdditionalProperties())
+        .contains(Map.entry("type", "AVRO"))
+        .contains(Map.entry("schemaId", schemaId));
+  }
+
   @Nested
   class SerdeWithDisabledSubjectExistenceCheck {
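
A second small sketch, again illustrative and not part of the commit, shows where the expected values in jsonValueOut come from, assuming only Apache Avro on the classpath: field1 holds the number of days since the Unix epoch, and the characters of field2 in jsonValueIn, taken as ISO-8859-1 bytes, form the two's-complement unscaled value of a decimal with scale 10.

```java
// ExpectedValuesSketch: illustrative only, not part of the commit.
// field1: a date logical type stores days since the Unix epoch, so 7895 -> 1991-08-14.
// field2: Avro's JSON encoding represents bytes as an ISO-8859-1 string; those bytes
// are the two's-complement unscaled value of a decimal with scale 10.
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class ExpectedValuesSketch {
  public static void main(String[] args) {
    System.out.println(LocalDate.ofEpochDay(7895)); // 1991-08-14

    Schema decimalSchema =
        LogicalTypes.decimal(22, 10).addToSchema(Schema.create(Schema.Type.BYTES));
    // The same characters as in the jsonValueIn string, written with explicit escapes
    ByteBuffer unscaled = ByteBuffer.wrap(
        "u001a\u00C7\u00D8o\u0080".getBytes(StandardCharsets.ISO_8859_1));
    BigDecimal decimal = new Conversions.DecimalConversion()
        .fromBytes(unscaled, decimalSchema, LogicalTypes.decimal(22, 10));
    System.out.println(decimal); // ≈2.1617413862E11, matching field2 in jsonValueOut
  }
}
```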