From f6253b6bfaa249236ac1b4f0505f4b7af8f89116 Mon Sep 17 00:00:00 2001 From: azatsafin Date: Fri, 30 Oct 2020 17:42:38 +0300 Subject: [PATCH 1/4] Update Jenkinsfile --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index a52994d995..f9fcca5dab 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -66,7 +66,7 @@ spec: expression { return env.GIT_BRANCH == 'master'; } } steps { - sh 'git merge master' + sh 'git merge origin/master' } } stage('Remove SNAPSHOT from version') { From 2a743a5bf4b27a6cc9cb857bd178c2e724d98821 Mon Sep 17 00:00:00 2001 From: 66632 <> Date: Fri, 30 Oct 2020 14:52:14 +0000 Subject: [PATCH 2/4] Increased version in pom.xml --- kafka-ui-api/pom.xml | 2 +- kafka-ui-contract/pom.xml | 2 +- pom.xml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml index 5cd873cdad..f27ee0552d 100644 --- a/kafka-ui-api/pom.xml +++ b/kafka-ui-api/pom.xml @@ -5,7 +5,7 @@ kafka-ui com.provectus - 0.0.6-SNAPSHOT + 0.0.7-SNAPSHOT 4.0.0 diff --git a/kafka-ui-contract/pom.xml b/kafka-ui-contract/pom.xml index 3eb7fdc8fe..b73f7b8ef9 100644 --- a/kafka-ui-contract/pom.xml +++ b/kafka-ui-contract/pom.xml @@ -5,7 +5,7 @@ kafka-ui com.provectus - 0.0.6-SNAPSHOT + 0.0.7-SNAPSHOT 4.0.0 diff --git a/pom.xml b/pom.xml index 9eb609c7c6..cd49daaa75 100644 --- a/pom.xml +++ b/pom.xml @@ -49,7 +49,7 @@ com.provectus kafka-ui - 0.0.6-SNAPSHOT + 0.0.7-SNAPSHOT kafka-ui Kafka metrics for UI panel From e9a6b52f2a2e852d67071cc3c6cf2cc957bc5388 Mon Sep 17 00:00:00 2001 From: 66632 <> Date: Fri, 30 Oct 2020 15:00:38 +0000 Subject: [PATCH 3/4] Increased version in pom.xml --- kafka-ui-api/pom.xml | 2 +- kafka-ui-contract/pom.xml | 2 +- pom.xml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml index f27ee0552d..9235bcb983 100644 --- a/kafka-ui-api/pom.xml +++ b/kafka-ui-api/pom.xml @@ -5,7 +5,7 @@ kafka-ui com.provectus - 0.0.7-SNAPSHOT + 0.0.8-SNAPSHOT 4.0.0 diff --git a/kafka-ui-contract/pom.xml b/kafka-ui-contract/pom.xml index b73f7b8ef9..476e0c6d8e 100644 --- a/kafka-ui-contract/pom.xml +++ b/kafka-ui-contract/pom.xml @@ -5,7 +5,7 @@ kafka-ui com.provectus - 0.0.7-SNAPSHOT + 0.0.8-SNAPSHOT 4.0.0 diff --git a/pom.xml b/pom.xml index cd49daaa75..dec7fd3bdd 100644 --- a/pom.xml +++ b/pom.xml @@ -49,7 +49,7 @@ com.provectus kafka-ui - 0.0.7-SNAPSHOT + 0.0.8-SNAPSHOT kafka-ui Kafka metrics for UI panel From 88cc301bb6db74ce3279e80ddfc8b0d25f601cdc Mon Sep 17 00:00:00 2001 From: German Osin Date: Mon, 2 Nov 2020 15:21:07 +0300 Subject: [PATCH 4/4] Added common deserializer (#109) --- .../DeserializationService.java | 25 +++---- .../SchemaRegistryRecordDeserializer.java | 75 ++++++++++--------- 2 files changed, 49 insertions(+), 51 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java index 4811d96031..1fdeddca24 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java @@ -1,25 +1,24 @@ package com.provectus.kafka.ui.cluster.deserialization; -import lombok.RequiredArgsConstructor; - -import java.util.Map; -import java.util.stream.Collectors; - -import javax.annotation.PostConstruct; - -import org.apache.commons.lang3.StringUtils; -import org.springframework.stereotype.Component; - +import com.fasterxml.jackson.databind.ObjectMapper; import com.provectus.kafka.ui.cluster.model.ClustersStorage; import com.provectus.kafka.ui.cluster.model.KafkaCluster; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.Map; +import java.util.stream.Collectors; @Component @RequiredArgsConstructor public class DeserializationService { private final ClustersStorage clustersStorage; + private final ObjectMapper objectMapper; private Map clusterDeserializers; + @PostConstruct public void init() { this.clusterDeserializers = clustersStorage.getKafkaClusters().stream() @@ -30,11 +29,7 @@ public class DeserializationService { } private RecordDeserializer createRecordDeserializerForCluster(KafkaCluster cluster) { - if (StringUtils.isEmpty(cluster.getSchemaRegistry())) { - return new SimpleRecordDeserializer(); - } else { - return new SchemaRegistryRecordDeserializer(cluster); - } + return new SchemaRegistryRecordDeserializer(cluster, objectMapper); } public RecordDeserializer getRecordDeserializerForCluster(KafkaCluster cluster) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java index 06b2bf1047..5b3393a39d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java @@ -1,55 +1,56 @@ package com.provectus.kafka.ui.cluster.deserialization; -import lombok.RequiredArgsConstructor; -import lombok.extern.log4j.Log4j2; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.utils.Bytes; - -import io.confluent.kafka.schemaregistry.SchemaProvider; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.provectus.kafka.ui.cluster.model.KafkaCluster; import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import lombok.extern.log4j.Log4j2; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Bytes; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.provectus.kafka.ui.cluster.model.KafkaCluster; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; @Log4j2 -@RequiredArgsConstructor public class SchemaRegistryRecordDeserializer implements RecordDeserializer { private final static int CLIENT_IDENTITY_MAP_CAPACITY = 100; private final KafkaCluster cluster; - private final SchemaRegistryClient schemaRegistryClient; - private KafkaAvroDeserializer avroDeserializer; - private ObjectMapper objectMapper; - private StringDeserializer stringDeserializer; + private final SchemaRegistryClient schemaRegistryClient; + private final KafkaAvroDeserializer avroDeserializer; + private final ObjectMapper objectMapper; + private final StringDeserializer stringDeserializer; private final Map topicFormatMap = new ConcurrentHashMap<>(); - public SchemaRegistryRecordDeserializer(KafkaCluster cluster) { + public SchemaRegistryRecordDeserializer(KafkaCluster cluster, ObjectMapper objectMapper) { this.cluster = cluster; + this.objectMapper = objectMapper; - List endpoints = Collections.singletonList(cluster.getSchemaRegistry()); - List providers = Collections.singletonList(new AvroSchemaProvider()); - this.schemaRegistryClient = new CachedSchemaRegistryClient(endpoints, CLIENT_IDENTITY_MAP_CAPACITY, providers, Collections.emptyMap()); + this.schemaRegistryClient = Optional.ofNullable(cluster.getSchemaRegistry()).map(e -> + new CachedSchemaRegistryClient( + Collections.singletonList(e), + CLIENT_IDENTITY_MAP_CAPACITY, + Collections.singletonList(new AvroSchemaProvider()), + Collections.emptyMap() + ) + ).orElse(null); - this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient); - this.objectMapper = new ObjectMapper(); + this.avroDeserializer = Optional.ofNullable(this.schemaRegistryClient) + .map(KafkaAvroDeserializer::new) + .orElse(null); this.stringDeserializer = new StringDeserializer(); } @@ -83,11 +84,13 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer { private MessageFormat detectFormat(ConsumerRecord record) { String avroSchema = String.format(cluster.getSchemaNameTemplate(), record.topic()); - try { - schemaRegistryClient.getAllVersions(avroSchema); - return MessageFormat.AVRO; - } catch (RestClientException | IOException e) { - log.info("Failed to get Avro schema for topic {}", record.topic()); + if (schemaRegistryClient != null) { + try { + schemaRegistryClient.getAllVersions(avroSchema); + return MessageFormat.AVRO; + } catch (RestClientException | IOException e) { + log.info("Failed to get Avro schema for topic {}", record.topic()); + } } try { @@ -102,7 +105,7 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer { private Object parseAvroRecord(ConsumerRecord record) throws IOException { String topic = record.topic(); - if (record.value()!=null) { + if (record.value()!=null && avroDeserializer !=null) { byte[] valueBytes = record.value().get(); GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes); byte[] bytes = AvroSchemaUtils.toJson(avroRecord);