diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java index 4811d96031..1fdeddca24 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java @@ -1,25 +1,24 @@ package com.provectus.kafka.ui.cluster.deserialization; -import lombok.RequiredArgsConstructor; - -import java.util.Map; -import java.util.stream.Collectors; - -import javax.annotation.PostConstruct; - -import org.apache.commons.lang3.StringUtils; -import org.springframework.stereotype.Component; - +import com.fasterxml.jackson.databind.ObjectMapper; import com.provectus.kafka.ui.cluster.model.ClustersStorage; import com.provectus.kafka.ui.cluster.model.KafkaCluster; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.Map; +import java.util.stream.Collectors; @Component @RequiredArgsConstructor public class DeserializationService { private final ClustersStorage clustersStorage; + private final ObjectMapper objectMapper; private Map clusterDeserializers; + @PostConstruct public void init() { this.clusterDeserializers = clustersStorage.getKafkaClusters().stream() @@ -30,11 +29,7 @@ public class DeserializationService { } private RecordDeserializer createRecordDeserializerForCluster(KafkaCluster cluster) { - if (StringUtils.isEmpty(cluster.getSchemaRegistry())) { - return new SimpleRecordDeserializer(); - } else { - return new SchemaRegistryRecordDeserializer(cluster); - } + return new SchemaRegistryRecordDeserializer(cluster, objectMapper); } public RecordDeserializer getRecordDeserializerForCluster(KafkaCluster cluster) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java index 06b2bf1047..5b3393a39d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java @@ -1,55 +1,56 @@ package com.provectus.kafka.ui.cluster.deserialization; -import lombok.RequiredArgsConstructor; -import lombok.extern.log4j.Log4j2; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.utils.Bytes; - -import io.confluent.kafka.schemaregistry.SchemaProvider; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.cluster.model.KafkaCluster; import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import lombok.extern.log4j.Log4j2; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Bytes; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.provectus.kafka.ui.cluster.model.KafkaCluster; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; @Log4j2 -@RequiredArgsConstructor public class SchemaRegistryRecordDeserializer implements RecordDeserializer { private final static int CLIENT_IDENTITY_MAP_CAPACITY = 100; private final KafkaCluster cluster; - private final SchemaRegistryClient schemaRegistryClient; - private KafkaAvroDeserializer avroDeserializer; - private ObjectMapper objectMapper; - private StringDeserializer stringDeserializer; + private final SchemaRegistryClient schemaRegistryClient; + private final KafkaAvroDeserializer avroDeserializer; + private final ObjectMapper objectMapper; + private final StringDeserializer stringDeserializer; private final Map topicFormatMap = new ConcurrentHashMap<>(); - public SchemaRegistryRecordDeserializer(KafkaCluster cluster) { + public SchemaRegistryRecordDeserializer(KafkaCluster cluster, ObjectMapper objectMapper) { this.cluster = cluster; + this.objectMapper = objectMapper; - List endpoints = Collections.singletonList(cluster.getSchemaRegistry()); - List providers = Collections.singletonList(new AvroSchemaProvider()); - this.schemaRegistryClient = new CachedSchemaRegistryClient(endpoints, CLIENT_IDENTITY_MAP_CAPACITY, providers, Collections.emptyMap()); + this.schemaRegistryClient = Optional.ofNullable(cluster.getSchemaRegistry()).map(e -> + new CachedSchemaRegistryClient( + Collections.singletonList(e), + CLIENT_IDENTITY_MAP_CAPACITY, + Collections.singletonList(new AvroSchemaProvider()), + Collections.emptyMap() + ) + ).orElse(null); - this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient); - this.objectMapper = new ObjectMapper(); + this.avroDeserializer = Optional.ofNullable(this.schemaRegistryClient) + .map(KafkaAvroDeserializer::new) + .orElse(null); this.stringDeserializer = new StringDeserializer(); } @@ -83,11 +84,13 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer { private MessageFormat detectFormat(ConsumerRecord record) { String avroSchema = String.format(cluster.getSchemaNameTemplate(), record.topic()); - try { - schemaRegistryClient.getAllVersions(avroSchema); - return MessageFormat.AVRO; - } catch (RestClientException | IOException e) { - log.info("Failed to get Avro schema for topic {}", record.topic()); + if (schemaRegistryClient != null) { + try { + schemaRegistryClient.getAllVersions(avroSchema); + return MessageFormat.AVRO; + } catch (RestClientException | IOException e) { + log.info("Failed to get Avro schema for topic {}", record.topic()); + } } try { @@ -102,7 +105,7 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer { private Object parseAvroRecord(ConsumerRecord record) throws IOException { String topic = record.topic(); - if (record.value()!=null) { + if (record.value()!=null && avroDeserializer !=null) { byte[] valueBytes = record.value().get(); GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes); byte[] bytes = AvroSchemaUtils.toJson(avroRecord);