Browse Source

ISSUE-897: SchemaRegistryAwareRecordSerDe refactor (#1593)

* SchemaRegistryAwareRecordSerDe refactor

Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Ilya Kuramshin 3 năm trước cách đây
mục cha
commit
b3ef8da446

+ 6 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java

@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.serde;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.serde.schemaregistry.SchemaRegistryAwareRecordSerDe;
 import com.provectus.kafka.ui.service.ClustersStorage;
@@ -17,10 +16,8 @@ import org.springframework.stereotype.Component;
 public class DeserializationService {
 
   private final ClustersStorage clustersStorage;
-  private final ObjectMapper objectMapper;
   private Map<String, RecordSerDe> clusterDeserializers;
 
-
   @PostConstruct
   public void init() {
     this.clusterDeserializers = clustersStorage.getKafkaClusters().stream()
@@ -35,11 +32,13 @@ public class DeserializationService {
       if (cluster.getProtobufFile() != null) {
         log.info("Using ProtobufFileRecordSerDe for cluster '{}'", cluster.getName());
         return new ProtobufFileRecordSerDe(cluster.getProtobufFile(),
-            cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageName(),
-            objectMapper);
-      } else {
+            cluster.getProtobufMessageNameByTopic(), cluster.getProtobufMessageName());
+      } else if (cluster.getSchemaRegistry() != null) {
         log.info("Using SchemaRegistryAwareRecordSerDe for cluster '{}'", cluster.getName());
-        return new SchemaRegistryAwareRecordSerDe(cluster, objectMapper);
+        return new SchemaRegistryAwareRecordSerDe(cluster);
+      } else {
+        log.info("Using SimpleRecordSerDe for cluster '{}'", cluster.getName());
+        return new SimpleRecordSerDe();
       }
     } catch (Throwable e) {
       throw new RuntimeException("Can't init deserializer", e);

+ 3 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDe.java

@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.serde;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.util.JsonFormat;
@@ -30,16 +29,14 @@ import org.apache.kafka.common.utils.Bytes;
 //TODO: currently we assume that keys for this serde are always string - need to discuss if it is ok
 public class ProtobufFileRecordSerDe implements RecordSerDe {
   private final ProtobufSchema protobufSchema;
-  private final ObjectMapper objectMapper;
   private final Path protobufSchemaPath;
   private final ProtobufSchemaConverter schemaConverter = new ProtobufSchemaConverter();
   private final Map<String, Descriptor> messageDescriptorMap;
   private final Descriptor defaultMessageDescriptor;
 
   public ProtobufFileRecordSerDe(Path protobufSchemaPath, Map<String, String> messageNameMap,
-                                 String defaultMessageName, ObjectMapper objectMapper)
+                                 String defaultMessageName)
       throws IOException {
-    this.objectMapper = objectMapper;
     this.protobufSchemaPath = protobufSchemaPath;
     try (final Stream<String> lines = Files.lines(protobufSchemaPath)) {
       var schema = new ProtobufSchema(
@@ -130,12 +127,12 @@ public class ProtobufFileRecordSerDe implements RecordSerDe {
     final MessageSchemaDTO keySchema = new MessageSchemaDTO()
         .name(protobufSchema.fullName())
         .source(MessageSchemaDTO.SourceEnum.PROTO_FILE)
-        .schema(JsonSchema.stringSchema().toJson(objectMapper));
+        .schema(JsonSchema.stringSchema().toJson());
 
     final MessageSchemaDTO valueSchema = new MessageSchemaDTO()
         .name(protobufSchema.fullName())
         .source(MessageSchemaDTO.SourceEnum.PROTO_FILE)
-        .schema(jsonSchema.toJson(objectMapper));
+        .schema(jsonSchema.toJson());
 
     return new TopicMessageSchemaDTO()
         .key(keySchema)

+ 7 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/SimpleRecordSerDe.java

@@ -1,10 +1,8 @@
 package com.provectus.kafka.ui.serde;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.provectus.kafka.ui.model.MessageSchemaDTO;
 import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
-import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat;
-import com.provectus.kafka.ui.util.ConsumerRecordUtil;
+import com.provectus.kafka.ui.serde.schemaregistry.StringMessageFormatter;
 import com.provectus.kafka.ui.util.jsonschema.JsonSchema;
 import javax.annotation.Nullable;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -13,18 +11,18 @@ import org.apache.kafka.common.utils.Bytes;
 
 public class SimpleRecordSerDe implements RecordSerDe {
 
-  private static final ObjectMapper objectMapper = new ObjectMapper();
+  private static final StringMessageFormatter FORMATTER = new StringMessageFormatter();
 
   @Override
   public DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> msg) {
     var builder = DeserializedKeyValue.builder();
     if (msg.key() != null) {
-      builder.key(new String(msg.key().get()))
-          .keyFormat(MessageFormat.UNKNOWN);
+      builder.key(FORMATTER.format(msg.topic(), msg.key().get()));
+      builder.keyFormat(FORMATTER.getFormat());
     }
     if (msg.value() != null) {
-      builder.value(new String(msg.value().get()))
-          .valueFormat(MessageFormat.UNKNOWN);
+      builder.value(FORMATTER.format(msg.topic(), msg.value().get()));
+      builder.valueFormat(FORMATTER.getFormat());
     }
     return builder.build();
   }
@@ -47,7 +45,7 @@ public class SimpleRecordSerDe implements RecordSerDe {
     final MessageSchemaDTO schema = new MessageSchemaDTO()
         .name("unknown")
         .source(MessageSchemaDTO.SourceEnum.UNKNOWN)
-        .schema(JsonSchema.stringSchema().toJson(objectMapper));
+        .schema(JsonSchema.stringSchema().toJson());
     return new TopicMessageSchemaDTO()
         .key(schema)
         .value(schema);

+ 8 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageFormat.java

@@ -1,8 +1,15 @@
 package com.provectus.kafka.ui.serde.schemaregistry;
 
+import java.util.Optional;
+import org.apache.commons.lang3.EnumUtils;
+
 public enum MessageFormat {
   AVRO,
   JSON,
   PROTOBUF,
-  UNKNOWN
+  UNKNOWN;
+
+  public static Optional<MessageFormat> fromString(String typeString) {
+    return Optional.ofNullable(EnumUtils.getEnum(MessageFormat.class, typeString));
+  }
 }

+ 75 - 135
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java

@@ -4,12 +4,13 @@ package com.provectus.kafka.ui.serde.schemaregistry;
 import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
 import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.MessageSchemaDTO;
 import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
 import com.provectus.kafka.ui.serde.RecordSerDe;
+import com.provectus.kafka.ui.serde.RecordSerDe.DeserializedKeyValue.DeserializedKeyValueBuilder;
 import com.provectus.kafka.ui.util.jsonschema.AvroJsonSchemaConverter;
 import com.provectus.kafka.ui.util.jsonschema.JsonSchema;
 import com.provectus.kafka.ui.util.jsonschema.ProtobufSchemaConverter;
@@ -29,10 +30,8 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -43,32 +42,17 @@ import org.apache.kafka.common.utils.Bytes;
 @Slf4j
 public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
 
-  private static final int CLIENT_IDENTITY_MAP_CAPACITY = 100;
+  private static final StringMessageFormatter FALLBACK_FORMATTER = new StringMessageFormatter();
 
-  private static final StringMessageFormatter stringFormatter = new StringMessageFormatter();
   private static final ProtobufSchemaConverter protoSchemaConverter = new ProtobufSchemaConverter();
   private static final AvroJsonSchemaConverter avroSchemaConverter = new AvroJsonSchemaConverter();
 
   private final KafkaCluster cluster;
-  private final Map<String, MessageFormatter> valueFormatMap = new ConcurrentHashMap<>();
-  private final Map<String, MessageFormatter> keyFormatMap = new ConcurrentHashMap<>();
-
-  @Nullable
   private final SchemaRegistryClient schemaRegistryClient;
-  @Nullable
-  private final AvroMessageFormatter avroFormatter;
-  @Nullable
-  private final ProtobufMessageFormatter protobufFormatter;
-  @Nullable
-  private final JsonSchemaMessageFormatter jsonSchemaMessageFormatter;
-
-  private final ObjectMapper objectMapper;
-
-  private SchemaRegistryClient createSchemaRegistryClient(KafkaCluster cluster) {
-    if (cluster.getSchemaRegistry() == null) {
-      throw new ValidationException("schemaRegistry is not specified");
-    }
 
+  private final Map<MessageFormat, MessageFormatter> schemaRegistryFormatters;
+
+  private static SchemaRegistryClient createSchemaRegistryClient(KafkaCluster cluster) {
     List<SchemaProvider> schemaProviders =
         List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider(), new JsonSchemaProvider());
 
@@ -88,51 +72,35 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
     }
     return new CachedSchemaRegistryClient(
         cluster.getSchemaRegistry().getUrl(),
-        CLIENT_IDENTITY_MAP_CAPACITY,
+        1_000,
         schemaProviders,
         configs
     );
   }
 
-  public SchemaRegistryAwareRecordSerDe(KafkaCluster cluster, ObjectMapper objectMapper) {
+  public SchemaRegistryAwareRecordSerDe(KafkaCluster cluster) {
+    this(cluster, createSchemaRegistryClient(cluster));
+  }
+
+  @VisibleForTesting
+  SchemaRegistryAwareRecordSerDe(KafkaCluster cluster, SchemaRegistryClient schemaRegistryClient) {
     this.cluster = cluster;
-    this.objectMapper = objectMapper;
-    this.schemaRegistryClient = cluster.getSchemaRegistry() != null
-        ? createSchemaRegistryClient(cluster)
-        : null;
-    if (schemaRegistryClient != null) {
-      this.avroFormatter = new AvroMessageFormatter(schemaRegistryClient);
-      this.protobufFormatter = new ProtobufMessageFormatter(schemaRegistryClient);
-      this.jsonSchemaMessageFormatter = new JsonSchemaMessageFormatter(schemaRegistryClient);
-    } else {
-      this.avroFormatter = null;
-      this.protobufFormatter = null;
-      this.jsonSchemaMessageFormatter = null;
-    }
+    this.schemaRegistryClient = schemaRegistryClient;
+    this.schemaRegistryFormatters = Map.of(
+        MessageFormat.AVRO, new AvroMessageFormatter(schemaRegistryClient),
+        MessageFormat.JSON, new JsonSchemaMessageFormatter(schemaRegistryClient),
+        MessageFormat.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient)
+    );
   }
 
   public DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> msg) {
     try {
-      var builder = DeserializedKeyValue.builder();
+      DeserializedKeyValueBuilder builder = DeserializedKeyValue.builder();
       if (msg.key() != null) {
-        MessageFormatter messageFormatter = getMessageFormatter(msg, true);
-        builder.key(messageFormatter.format(msg.topic(), msg.key().get()));
-        builder.keyFormat(messageFormatter.getFormat());
-        builder.keySchemaId(
-            getSchemaId(msg.key(), messageFormatter.getFormat())
-                .map(String::valueOf)
-                .orElse(null)
-        );
+        fillDeserializedKvBuilder(msg, true, builder);
       }
       if (msg.value() != null) {
-        MessageFormatter messageFormatter = getMessageFormatter(msg, false);
-        builder.value(messageFormatter.format(msg.topic(), msg.value().get()));
-        builder.valueFormat(messageFormatter.getFormat());
-        builder.valueSchemaId(
-            getSchemaId(msg.value(), messageFormatter.getFormat())
-                .map(String::valueOf)
-                .orElse(null)
-        );
+        fillDeserializedKvBuilder(msg, false, builder);
       }
       return builder.build();
     } catch (Throwable e) {
@@ -140,6 +108,41 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
     }
   }
 
+  private void fillDeserializedKvBuilder(ConsumerRecord<Bytes, Bytes> rec,
+                                         boolean isKey,
+                                         DeserializedKeyValueBuilder builder) {
+    Optional<Integer> schemaId = extractSchemaIdFromMsg(rec, isKey);
+    Optional<MessageFormat> format = schemaId.flatMap(this::getMessageFormatBySchemaId);
+    if (format.isPresent() && schemaRegistryFormatters.containsKey(format.get())) {
+      var formatter = schemaRegistryFormatters.get(format.get());
+      try {
+        var deserialized = formatter.format(rec.topic(), isKey ? rec.key().get() : rec.value().get());
+        if (isKey) {
+          builder.key(deserialized);
+          builder.keyFormat(formatter.getFormat());
+          builder.keySchemaId(String.valueOf(schemaId.get()));
+        } else {
+          builder.value(deserialized);
+          builder.valueFormat(formatter.getFormat());
+          builder.valueSchemaId(String.valueOf(schemaId.get()));
+        }
+        return;
+      } catch (Exception e) {
+        log.trace("Can't deserialize record {} with formatter {}",
+            rec, formatter.getClass().getSimpleName(), e);
+      }
+    }
+
+    // fallback
+    if (isKey) {
+      builder.key(FALLBACK_FORMATTER.format(rec.topic(), isKey ? rec.key().get() : rec.value().get()));
+      builder.keyFormat(FALLBACK_FORMATTER.getFormat());
+    } else {
+      builder.value(FALLBACK_FORMATTER.format(rec.topic(), isKey ? rec.key().get() : rec.value().get()));
+      builder.valueFormat(FALLBACK_FORMATTER.getFormat());
+    }
+  }
+
   @Override
   public ProducerRecord<byte[], byte[]> serialize(String topic,
                                                   @Nullable String key,
@@ -192,10 +195,10 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
     final Optional<SchemaMetadata> maybeKeySchema = getSchemaBySubject(topic, true);
 
     String sourceValueSchema = maybeValueSchema.map(this::convertSchema)
-        .orElseGet(() -> JsonSchema.stringSchema().toJson(objectMapper));
+        .orElseGet(() -> JsonSchema.stringSchema().toJson());
 
     String sourceKeySchema = maybeKeySchema.map(this::convertSchema)
-        .orElseGet(() -> JsonSchema.stringSchema().toJson(objectMapper));
+        .orElseGet(() -> JsonSchema.stringSchema().toJson());
 
     final MessageSchemaDTO keySchema = new MessageSchemaDTO()
         .name(maybeKeySchema.map(
@@ -222,110 +225,47 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
     String jsonSchema;
     URI basePath = new URI(cluster.getSchemaRegistry().getFirstUrl())
         .resolve(Integer.toString(schema.getId()));
-    final ParsedSchema schemaById = Objects.requireNonNull(schemaRegistryClient)
-        .getSchemaById(schema.getId());
+    final ParsedSchema schemaById = schemaRegistryClient.getSchemaById(schema.getId());
 
     if (schema.getSchemaType().equals(MessageFormat.PROTOBUF.name())) {
       final ProtobufSchema protobufSchema = (ProtobufSchema) schemaById;
       jsonSchema = protoSchemaConverter
           .convert(basePath, protobufSchema.toDescriptor())
-          .toJson(objectMapper);
+          .toJson();
     } else if (schema.getSchemaType().equals(MessageFormat.AVRO.name())) {
       final AvroSchema avroSchema = (AvroSchema) schemaById;
       jsonSchema = avroSchemaConverter
           .convert(basePath, avroSchema.rawSchema())
-          .toJson(objectMapper);
+          .toJson();
     } else if (schema.getSchemaType().equals(MessageFormat.JSON.name())) {
       jsonSchema = schema.getSchema();
     } else {
-      jsonSchema = JsonSchema.stringSchema().toJson(objectMapper);
+      jsonSchema = JsonSchema.stringSchema().toJson();
     }
 
     return jsonSchema;
   }
 
-  private MessageFormatter getMessageFormatter(ConsumerRecord<Bytes, Bytes> msg, boolean isKey) {
-    if (isKey) {
-      return keyFormatMap.computeIfAbsent(msg.topic(), k -> detectFormat(msg, true));
-    } else {
-      return valueFormatMap.computeIfAbsent(msg.topic(), k -> detectFormat(msg, false));
-    }
-  }
-
-  private MessageFormatter detectFormat(ConsumerRecord<Bytes, Bytes> msg, boolean isKey) {
-    if (schemaRegistryClient != null) {
-      try {
-        final Optional<String> type = getSchemaFromMessage(msg, isKey)
-            .or(() -> getSchemaBySubject(msg.topic(), isKey).map(SchemaMetadata::getSchemaType));
-        if (type.isPresent()) {
-          if (type.get().equals(MessageFormat.PROTOBUF.name())) {
-            if (tryFormatter(protobufFormatter, msg, isKey).isPresent()) {
-              return protobufFormatter;
-            }
-          } else if (type.get().equals(MessageFormat.AVRO.name())) {
-            if (tryFormatter(avroFormatter, msg, isKey).isPresent()) {
-              return avroFormatter;
-            }
-          } else if (type.get().equals(MessageFormat.JSON.name())) {
-            if (tryFormatter(jsonSchemaMessageFormatter, msg, isKey).isPresent()) {
-              return jsonSchemaMessageFormatter;
-            }
-          } else {
-            throw new IllegalStateException("Unsupported schema type: " + type.get());
-          }
-        }
-      } catch (Exception e) {
-        log.warn("Failed to get Schema for topic {}", msg.topic(), e);
-      }
-    }
-    return stringFormatter;
+  private Optional<MessageFormat> getMessageFormatBySchemaId(int schemaId) {
+    return wrapClientCall(() -> schemaRegistryClient.getSchemaById(schemaId))
+        .map(ParsedSchema::schemaType)
+        .flatMap(MessageFormat::fromString);
   }
 
-  private Optional<MessageFormatter> tryFormatter(
-      MessageFormatter formatter, ConsumerRecord<Bytes, Bytes> msg, boolean isKey) {
-    try {
-      formatter.format(msg.topic(), isKey ? msg.key().get() : msg.value().get());
-      return Optional.of(formatter);
-    } catch (Throwable e) {
-      log.warn("Failed to parse by {} from topic {}", formatter.getClass(), msg.topic(), e);
+  private Optional<Integer> extractSchemaIdFromMsg(ConsumerRecord<Bytes, Bytes> msg, boolean isKey) {
+    Bytes bytes = isKey ? msg.key() : msg.value();
+    ByteBuffer buffer = ByteBuffer.wrap(bytes.get());
+    if (buffer.get() == 0 && buffer.remaining() > 4) {
+      int id = buffer.getInt();
+      return Optional.of(id);
     }
-
     return Optional.empty();
   }
 
-  @SneakyThrows
-  private Optional<String> getSchemaFromMessage(ConsumerRecord<Bytes, Bytes> msg, boolean isKey) {
-    Optional<String> result = Optional.empty();
-    final Bytes value = isKey ? msg.key() : msg.value();
-    if (value != null) {
-      ByteBuffer buffer = ByteBuffer.wrap(value.get());
-      if (buffer.get() == 0) {
-        int id = buffer.getInt();
-        result =
-            Optional.ofNullable(schemaRegistryClient)
-                .flatMap(client -> wrapClientCall(() -> client.getSchemaById(id)))
-                .map(ParsedSchema::schemaType);
-      }
-    }
-    return result;
-  }
-
-  private Optional<Integer> getSchemaId(Bytes value, MessageFormat format) {
-    if (format != MessageFormat.AVRO
-        && format != MessageFormat.PROTOBUF
-        && format != MessageFormat.JSON) {
-      return Optional.empty();
-    }
-    ByteBuffer buffer = ByteBuffer.wrap(value.get());
-    return buffer.get() == 0 ? Optional.of(buffer.getInt()) : Optional.empty();
-  }
-
   @SneakyThrows
   private Optional<SchemaMetadata> getSchemaBySubject(String topic, boolean isKey) {
-    return Optional.ofNullable(schemaRegistryClient)
-        .flatMap(client ->
-            wrapClientCall(() ->
-                client.getLatestSchemaMetadata(schemaSubject(topic, isKey))));
+    return wrapClientCall(() ->
+        schemaRegistryClient.getLatestSchemaMetadata(schemaSubject(topic, isKey)));
   }
 
   @SneakyThrows

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonSchema.java

@@ -24,7 +24,8 @@ public class JsonSchema {
   private final Map<String, FieldSchema> definitions;
   private final List<String> required;
 
-  public String toJson(ObjectMapper mapper) {
+  public String toJson() {
+    final ObjectMapper mapper = new ObjectMapper();
     final ObjectNode objectNode = mapper.createObjectNode();
     objectNode.set("$id", new TextNode(id.toString()));
     objectNode.set("$schema", new TextNode(schema.toString()));

+ 4 - 8
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/ProtobufFileRecordSerDeTest.java

@@ -4,7 +4,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.util.JsonFormat;
 import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat;
@@ -53,7 +52,7 @@ class ProtobufFileRecordSerDeTest {
         "topic1", "test.Person",
         "topic2", "test.AddressBook");
     var deserializer =
-        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null, new ObjectMapper());
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, null);
     var msg1 = deserializer
         .deserialize(new ConsumerRecord<>("topic1", 1, 0, Bytes.wrap("key".getBytes()),
             Bytes.wrap(personMessage)));
@@ -70,8 +69,7 @@ class ProtobufFileRecordSerDeTest {
   void testNoDefaultMessageName() throws IOException {
     // by default the first message type defined in proto definition is used
     var deserializer =
-        new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null,
-            new ObjectMapper());
+        new ProtobufFileRecordSerDe(protobufSchemaPath, Collections.emptyMap(), null);
     var msg = deserializer
         .deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()),
             Bytes.wrap(personMessage)));
@@ -82,8 +80,7 @@ class ProtobufFileRecordSerDeTest {
   void testDefaultMessageName() throws IOException {
     var messageNameMap = Map.of("topic1", "test.Person");
     var deserializer =
-        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook",
-            new ObjectMapper());
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook");
     var msg = deserializer
         .deserialize(new ConsumerRecord<>("a_random_topic", 1, 0, Bytes.wrap("key".getBytes()),
             Bytes.wrap(addressBookMessage)));
@@ -94,8 +91,7 @@ class ProtobufFileRecordSerDeTest {
   void testSerialize() throws IOException {
     var messageNameMap = Map.of("topic1", "test.Person");
     var serializer =
-        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook",
-            new ObjectMapper());
+        new ProtobufFileRecordSerDe(protobufSchemaPath, messageNameMap, "test.AddressBook");
     var serialized = serializer.serialize("topic1", "key1", "{\"name\":\"MyName\"}", 0);
     assertNotNull(serialized.value());
   }

+ 4 - 12
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/SchemaRegistryRecordDeserializerTest.java → kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/SimpleRecordSerDeTest.java

@@ -3,27 +3,19 @@ package com.provectus.kafka.ui.serde;
 import static com.provectus.kafka.ui.serde.RecordSerDe.DeserializedKeyValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.serde.schemaregistry.MessageFormat;
-import com.provectus.kafka.ui.serde.schemaregistry.SchemaRegistryAwareRecordSerDe;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.Test;
 
-class SchemaRegistryRecordDeserializerTest {
+class SimpleRecordSerDeTest {
 
-  private final SchemaRegistryAwareRecordSerDe deserializer =
-      new SchemaRegistryAwareRecordSerDe(
-          KafkaCluster.builder()
-              .schemaNameTemplate("%s-value")
-              .build(), new ObjectMapper()
-      );
+  private final SimpleRecordSerDe serde = new SimpleRecordSerDe();
 
   @Test
   public void shouldDeserializeStringValue() {
     var value = "test";
-    var deserializedRecord = deserializer.deserialize(
+    var deserializedRecord = serde.deserialize(
         new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()),
             Bytes.wrap(value.getBytes())));
     DeserializedKeyValue expected = DeserializedKeyValue.builder()
@@ -37,7 +29,7 @@ class SchemaRegistryRecordDeserializerTest {
 
   @Test
   public void shouldDeserializeNullValueRecordToEmptyMap() {
-    var deserializedRecord = deserializer
+    var deserializedRecord = serde
         .deserialize(new ConsumerRecord<>("topic", 1, 0, Bytes.wrap("key".getBytes()), null));
     DeserializedKeyValue expected = DeserializedKeyValue.builder()
         .key("key")

+ 194 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDeTest.java

@@ -0,0 +1,194 @@
+package com.provectus.kafka.ui.serde.schemaregistry;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+class SchemaRegistryAwareRecordSerDeTest {
+
+  private final SchemaRegistryClient registryClient = mock(SchemaRegistryClient.class);
+
+  private final SchemaRegistryAwareRecordSerDe serde = new SchemaRegistryAwareRecordSerDe(
+      KafkaCluster.builder().build(),
+      registryClient
+  );
+
+  @Nested
+  class Deserialize {
+
+    @Test
+    void callsSchemaFormatterWhenValueHasMagicByteAndValidSchemaId() throws Exception {
+      AvroSchema schema = new AvroSchema(
+          "{"
+              + "  \"type\": \"record\","
+              + "  \"name\": \"TestAvroRecord1\","
+              + "  \"fields\": ["
+              + "    {"
+              + "      \"name\": \"field1\","
+              + "      \"type\": \"string\""
+              + "    },"
+              + "    {"
+              + "      \"name\": \"field2\","
+              + "      \"type\": \"int\""
+              + "    }"
+              + "  ]"
+              + "}"
+      );
+
+      String jsonValueForSchema = "{ \"field1\":\"testStr\", \"field2\": 123 }";
+
+      int schemaId = 1234;
+      when(registryClient.getSchemaById(schemaId)).thenReturn(schema);
+
+      var result = serde.deserialize(
+          new ConsumerRecord<>(
+              "test-topic",
+              1,
+              100,
+              Bytes.wrap("key".getBytes()),
+              bytesWithMagicByteAndSchemaId(schemaId, jsonToAvro(jsonValueForSchema, schema))
+          )
+      );
+
+      // called twice: once by serde code, once by formatter (will be cached)
+      verify(registryClient, times(2)).getSchemaById(schemaId);
+
+      assertThat(result.getKeySchemaId()).isNull();
+      assertThat(result.getKeyFormat()).isEqualTo(MessageFormat.UNKNOWN);
+      assertThat(result.getKey()).isEqualTo("key");
+
+      assertThat(result.getValueSchemaId()).isEqualTo(schemaId + "");
+      assertThat(result.getValueFormat()).isEqualTo(MessageFormat.AVRO);
+      assertJsonsEqual(jsonValueForSchema, result.getValue());
+    }
+
+    @Test
+    void fallsBackToStringFormatterIfValueContainsMagicByteButSchemaNotFound() throws Exception {
+      int nonExistingSchemaId = 12341234;
+      when(registryClient.getSchemaById(nonExistingSchemaId))
+          .thenThrow(new RestClientException("not fount", 404, 404));
+
+      Bytes value = bytesWithMagicByteAndSchemaId(nonExistingSchemaId, "somedata".getBytes());
+      var result = serde.deserialize(
+          new ConsumerRecord<>(
+              "test-topic",
+              1,
+              100,
+              Bytes.wrap("key".getBytes()),
+              value
+          )
+      );
+
+      // called to get schema by id - will throw not found
+      verify(registryClient, times(1)).getSchemaById(nonExistingSchemaId);
+
+      assertThat(result.getKeySchemaId()).isNull();
+      assertThat(result.getKeyFormat()).isEqualTo(MessageFormat.UNKNOWN);
+      assertThat(result.getKey()).isEqualTo("key");
+
+      assertThat(result.getValueSchemaId()).isNull();
+      assertThat(result.getValueFormat()).isEqualTo(MessageFormat.UNKNOWN);
+      assertThat(result.getValue()).isEqualTo(new String(value.get()));
+    }
+
+    @Test
+    void fallsBackToStringFormatterIfMagicByteAndSchemaIdFoundButFormatterFailed() throws Exception {
+      int schemaId = 1234;
+      when(registryClient.getSchemaById(schemaId))
+          .thenReturn(new AvroSchema("{ \"type\": \"string\" }"));
+
+      // will cause exception in avro deserializer
+      Bytes nonAvroValue =  bytesWithMagicByteAndSchemaId(schemaId, "123".getBytes());
+      var result = serde.deserialize(
+          new ConsumerRecord<>(
+              "test-topic",
+              1,
+              100,
+              Bytes.wrap("key".getBytes()),
+             nonAvroValue
+          )
+      );
+
+      // called twice: once by serde code, once by formatter (will be cached)
+      verify(registryClient, times(2)).getSchemaById(schemaId);
+
+      assertThat(result.getKeySchemaId()).isNull();
+      assertThat(result.getKeyFormat()).isEqualTo(MessageFormat.UNKNOWN);
+      assertThat(result.getKey()).isEqualTo("key");
+
+      assertThat(result.getValueSchemaId()).isNull();
+      assertThat(result.getValueFormat()).isEqualTo(MessageFormat.UNKNOWN);
+      assertThat(result.getValue()).isEqualTo(new String(nonAvroValue.get()));
+    }
+
+    @Test
+    void useStringFormatterWithoutRegistryManipulationIfMagicByteNotSet() {
+      var result = serde.deserialize(
+          new ConsumerRecord<>(
+              "test-topic",
+              1,
+              100,
+              Bytes.wrap("key".getBytes()),
+              Bytes.wrap("val".getBytes())
+          )
+      );
+
+      verifyZeroInteractions(registryClient);
+
+      assertThat(result.getKeySchemaId()).isNull();
+      assertThat(result.getKeyFormat()).isEqualTo(MessageFormat.UNKNOWN);
+      assertThat(result.getKey()).isEqualTo("key");
+
+      assertThat(result.getValueSchemaId()).isNull();
+      assertThat(result.getValueFormat()).isEqualTo(MessageFormat.UNKNOWN);
+      assertThat(result.getValue()).isEqualTo("val");
+    }
+
+    private void assertJsonsEqual(String expected, String actual) throws JsonProcessingException {
+      var mapper = new JsonMapper();
+      assertThat(mapper.readTree(actual)).isEqualTo(mapper.readTree(expected));
+    }
+
+    private Bytes bytesWithMagicByteAndSchemaId(int schemaId, byte[] body) {
+      return new Bytes(
+          ByteBuffer.allocate(1 + 4 + body.length)
+              .put((byte) 0)
+              .putInt(schemaId)
+              .put(body)
+              .array()
+      );
+    }
+
+    private byte[] jsonToAvro(String json, AvroSchema schema) throws IOException {
+      GenericDatumWriter<Object> writer = new GenericDatumWriter<>(schema.rawSchema());
+      ByteArrayOutputStream output = new ByteArrayOutputStream();
+      Encoder encoder = EncoderFactory.get().binaryEncoder(output, null);
+      writer.write(AvroSchemaUtils.toObject(json, schema), encoder);
+      encoder.flush();
+      return output.toByteArray();
+    }
+  }
+
+
+}

+ 2 - 4
kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/AvroJsonSchemaConverterTest.java

@@ -103,7 +103,7 @@ public class AvroJsonSchemaConverterTest {
     Assertions.assertEquals(
         om.readTree(expected),
         om.readTree(
-            convertRecord.toJson(om)
+            convertRecord.toJson()
         )
     );
 
@@ -160,9 +160,7 @@ public class AvroJsonSchemaConverterTest {
     final JsonSchema convert = converter.convert(basePath, recordSchema);
     Assertions.assertEquals(
         objectMapper.readTree(expected),
-        objectMapper.readTree(
-            convert.toJson(objectMapper)
-        )
+        objectMapper.readTree(convert.toJson())
     );
 
 

+ 1 - 3
kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/jsonschema/ProtobufSchemaConverterTest.java

@@ -64,9 +64,7 @@ public class ProtobufSchemaConverterTest {
     ObjectMapper om = new ObjectMapper();
     Assertions.assertEquals(
         om.readTree(expected),
-        om.readTree(
-            convert.toJson(om)
-        )
+        om.readTree(convert.toJson())
     );
   }
 }