diff --git a/docker/kafka-clusters-only.yaml b/docker/kafka-clusters-only.yaml
index fc51d9d3a1..f37569d4d7 100644
--- a/docker/kafka-clusters-only.yaml
+++ b/docker/kafka-clusters-only.yaml
@@ -95,4 +95,23 @@ services:
       KAFKA_BROKER_ID: ignored
       KAFKA_ZOOKEEPER_CONNECT: ignored
     networks:
-      - default
\ No newline at end of file
+      - default
+
+  schemaregistry0:
+    image: confluentinc/cp-schema-registry:5.1.0
+    depends_on:
+      - zookeeper0
+      - kafka0
+      - kafka01
+    ports:
+      - 8085:8085
+    environment:
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092
+      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183
+      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
+      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
+      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085
+
+      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
+      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
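The registry in this file is published on host port 8085, so it is reachable from outside the compose network. A minimal connectivity check, assuming the port mapping above (the class name and the localhost endpoint are illustrative, not part of this change):

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class RegistrySmokeTest {
    public static void main(String[] args) throws Exception {
        // 8085 is the host port published by the schemaregistry0 service above.
        SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8085", 16);
        System.out.println("Registered subjects: " + client.getAllSubjects());
    }
}
```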
diff --git a/docker/kafka-ui.yaml b/docker/kafka-ui.yaml
index 54adc33706..b008bd1fc7 100644
--- a/docker/kafka-ui.yaml
+++ b/docker/kafka-ui.yaml
@@ -12,6 +12,7 @@ services:
       - zookeeper1
       - kafka0
       - kafka1
+      - schemaregistry0
     command: [ "java", "-jar", "kafka-ui-api.jar", "--spring.profiles.active=sdp"]
   zookeeper0:
@@ -53,3 +54,19 @@ services:
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       JMX_PORT: 9997
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9997
+
+  schemaregistry0:
+    image: confluentinc/cp-schema-registry:5.1.0
+    depends_on:
+      - zookeeper0
+      - kafka0
+    environment:
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
+      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2181
+      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
+      SCHEMA_REGISTRY_HOST_NAME: schemaregistry0
+      SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085
+
+      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
+      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
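With the registry on the UI's network, Avro traffic can be produced for the UI to render. A hedged sketch (the topic "users" and the record schema are assumptions; the broker and registry addresses follow this compose file, so it must run inside the same network):

```java
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Addresses match the service names above.
        props.put("bootstrap.servers", "kafka0:29092");
        props.put("schema.registry.url", "http://schemaregistry0:8085");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", KafkaAvroSerializer.class.getName());

        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\"," +
                "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "alice");

        // KafkaAvroSerializer registers the value schema under "users-value",
        // which matches the default schemaNameTemplate "%s-value" introduced below.
        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("users", user));
        }
    }
}
```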
diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index 5aa24fcb2b..c97c23104b 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -69,6 +69,21 @@
             <artifactId>mapstruct</artifactId>
             <version>${org.mapstruct.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-schema-registry-client</artifactId>
+            <version>${confluent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-avro-serializer</artifactId>
+            <version>${confluent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java
index a19be53800..f77ee01b15 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java
@@ -19,5 +19,7 @@ public class ClustersProperties {
         String name;
         String bootstrapServers;
         String zookeeper;
+        String schemaRegistry;
+        String schemaNameTemplate = "%s-value";
     }
 }
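The default schemaNameTemplate mirrors the Confluent TopicNameStrategy convention, where a topic's value schema is registered under the subject `<topic>-value`. A quick illustration (the topic name is hypothetical):

```java
public class SubjectNameDemo {
    public static void main(String[] args) {
        String schemaNameTemplate = "%s-value";  // default from ClustersProperties above
        // For a topic "users", the deserializer looks up the subject "users-value".
        System.out.println(String.format(schemaNameTemplate, "users"));
    }
}
```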
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java
new file mode 100644
index 0000000000..4811d96031
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java
@@ -0,0 +1,43 @@
+package com.provectus.kafka.ui.cluster.deserialization;
+
+import lombok.RequiredArgsConstructor;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.stereotype.Component;
+
+import com.provectus.kafka.ui.cluster.model.ClustersStorage;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+
+@Component
+@RequiredArgsConstructor
+public class DeserializationService {
+
+    private final ClustersStorage clustersStorage;
+    private Map<String, RecordDeserializer> clusterDeserializers;
+
+    @PostConstruct
+    public void init() {
+        this.clusterDeserializers = clustersStorage.getKafkaClusters().stream()
+                .collect(Collectors.toMap(
+                        KafkaCluster::getName,
+                        this::createRecordDeserializerForCluster
+                ));
+    }
+
+    private RecordDeserializer createRecordDeserializerForCluster(KafkaCluster cluster) {
+        if (StringUtils.isEmpty(cluster.getSchemaRegistry())) {
+            return new SimpleRecordDeserializer();
+        } else {
+            return new SchemaRegistryRecordDeserializer(cluster);
+        }
+    }
+
+    public RecordDeserializer getRecordDeserializerForCluster(KafkaCluster cluster) {
+        return clusterDeserializers.get(cluster.getName());
+    }
+}
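A sketch of the intended call site, resolving the per-cluster deserializer chosen at startup in init(); the wrapping class and method are assumptions, not part of this diff:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;

import com.provectus.kafka.ui.cluster.deserialization.DeserializationService;
import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;

public class RecordDecoder {

    // SimpleRecordDeserializer is used when no schemaRegistry URL is configured
    // for the cluster, SchemaRegistryRecordDeserializer otherwise.
    public Object decode(DeserializationService deserializationService,
                         KafkaCluster cluster,
                         ConsumerRecord<Bytes, Bytes> record) {
        RecordDeserializer deserializer =
                deserializationService.getRecordDeserializerForCluster(cluster);
        return deserializer.deserialize(record);
    }
}
```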
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/RecordDeserializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/RecordDeserializer.java
new file mode 100644
index 0000000000..e79896e878
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/RecordDeserializer.java
@@ -0,0 +1,9 @@
+package com.provectus.kafka.ui.cluster.deserialization;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+
+public interface RecordDeserializer {
+
+    Object deserialize(ConsumerRecord<Bytes, Bytes> record);
+}
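DeserializationService also instantiates a SimpleRecordDeserializer, whose file does not appear in this excerpt. A minimal sketch consistent with this interface (the real implementation may differ):

```java
package com.provectus.kafka.ui.cluster.deserialization;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;

public class SimpleRecordDeserializer implements RecordDeserializer {

    private final StringDeserializer stringDeserializer = new StringDeserializer();

    @Override
    public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
        // Without a schema registry, treat the payload as a plain string.
        return stringDeserializer.deserialize(record.topic(), record.value().get());
    }
}
```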
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java
new file mode 100644
index 0000000000..9ac089084d
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java
@@ -0,0 +1,131 @@
+package com.provectus.kafka.ui.cluster.deserialization;
+
+import lombok.extern.log4j.Log4j2;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+
+import io.confluent.kafka.schemaregistry.SchemaProvider;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+
+@Log4j2
+public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
+
+    private static final int CLIENT_IDENTITY_MAP_CAPACITY = 100;
+
+    private final KafkaCluster cluster;
+    private final SchemaRegistryClient schemaRegistryClient;
+    private KafkaAvroDeserializer avroDeserializer;
+    private ObjectMapper objectMapper;
+    private StringDeserializer stringDeserializer;
+
+    private final Map<String, MessageFormat> topicFormatMap = new ConcurrentHashMap<>();
+
+    public SchemaRegistryRecordDeserializer(KafkaCluster cluster) {
+        this.cluster = cluster;
+
+        List<String> endpoints = Collections.singletonList(cluster.getSchemaRegistry());
+        List<SchemaProvider> providers = Collections.singletonList(new AvroSchemaProvider());
+        this.schemaRegistryClient = new CachedSchemaRegistryClient(endpoints, CLIENT_IDENTITY_MAP_CAPACITY, providers, Collections.emptyMap());
+
+        this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
+        this.objectMapper = new ObjectMapper();
+        this.stringDeserializer = new StringDeserializer();
+    }
+
+    public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
+        MessageFormat format = getMessageFormat(record);
+
+        try {
+            Object parsedValue;
+            switch (format) {
+                case AVRO:
+                    parsedValue = parseAvroRecord(record);
+                    break;
+                case JSON:
+                    parsedValue = parseJsonRecord(record);
+                    break;
+                case STRING:
+                    parsedValue = parseStringRecord(record);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown message format " + format + " for topic " + record.topic());
+            }
+            return parsedValue;
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to parse record from topic " + record.topic(), e);
+        }
+    }
+
+    private MessageFormat getMessageFormat(ConsumerRecord<Bytes, Bytes> record) {
+        return topicFormatMap.computeIfAbsent(record.topic(), k -> detectFormat(record));
+    }
+
+    private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
+        String avroSchema = String.format(cluster.getSchemaNameTemplate(), record.topic());
+        try {
+            schemaRegistryClient.getAllVersions(avroSchema);
+            return MessageFormat.AVRO;
+        } catch (RestClientException | IOException e) {
+            log.info("Failed to get Avro schema for topic {}", record.topic());
+        }
+
+        try {
+            parseJsonRecord(record);
+            return MessageFormat.JSON;
+        } catch (IOException e) {
+            log.info("Failed to parse json from topic {}", record.topic());
+        }
+
+        return MessageFormat.STRING;
+    }
+
+    private Object parseAvroRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
+        String topic = record.topic();
+        byte[] valueBytes = record.value().get();
+        GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes);
+        byte[] bytes = AvroSchemaUtils.toJson(avroRecord);
+        return parseJson(bytes);
+    }
+
+    private Object parseJsonRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
+        byte[] valueBytes = record.value().get();
+        return parseJson(valueBytes);
+    }
+
+    private Object parseJson(byte[] bytes) throws IOException {
+        return objectMapper.readValue(bytes, new TypeReference<Map<String, Object>>() {});
+    }
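End to end, this deserializer is meant to sit behind a plain Bytes consumer: the format detected for each topic is cached in topicFormatMap, so the registry is consulted once per topic. A hedged sketch of that loop (bootstrap address, group id, and topic are assumptions):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;

import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;

public class PreviewLoop {

    // Polls raw Bytes records and lets the cluster's RecordDeserializer decide
    // how to decode each one (a Map for AVRO/JSON payloads; the STRING branch
    // is handled by parseStringRecord, not shown in this excerpt).
    public static void preview(RecordDeserializer deserializer) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumption
        props.put("group.id", "kafka-ui-preview");         // assumption
        props.put("key.deserializer", BytesDeserializer.class.getName());
        props.put("value.deserializer", BytesDeserializer.class.getName());

        try (KafkaConsumer<Bytes, Bytes> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("users"));  // topic is an assumption
            for (ConsumerRecord<Bytes, Bytes> record : consumer.poll(Duration.ofSeconds(1))) {
                System.out.println(deserializer.deserialize(record));
            }
        }
    }
}
```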