diff --git a/Jenkinsfile b/Jenkinsfile
index a52994d995..f9fcca5dab 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -66,7 +66,7 @@ spec:
expression { return env.GIT_BRANCH == 'master'; }
}
steps {
- sh 'git merge master'
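+                // merge the remote-tracking ref: a local 'master' branch may be absent or stale on the build agent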
+ sh 'git merge origin/master'
}
}
stage('Remove SNAPSHOT from version') {
diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index 5cd873cdad..9235bcb983 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -5,7 +5,7 @@
    <parent>
        <artifactId>kafka-ui</artifactId>
        <groupId>com.provectus</groupId>
-       <version>0.0.6-SNAPSHOT</version>
+       <version>0.0.8-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java
index 4811d96031..1fdeddca24 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java
@@ -1,25 +1,24 @@
package com.provectus.kafka.ui.cluster.deserialization;

-import lombok.RequiredArgsConstructor;
-
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import javax.annotation.PostConstruct;
-
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.stereotype.Component;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import lombok.RequiredArgsConstructor;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.Map;
+import java.util.stream.Collectors;

@Component
@RequiredArgsConstructor
public class DeserializationService {
private final ClustersStorage clustersStorage;
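+    // Spring-managed mapper, injected once and shared with every deserializer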
+ private final ObjectMapper objectMapper;
    private Map<String, RecordDeserializer> clusterDeserializers;
+
@PostConstruct
public void init() {
this.clusterDeserializers = clustersStorage.getKafkaClusters().stream()
@@ -30,11 +29,7 @@ public class DeserializationService {
}
private RecordDeserializer createRecordDeserializerForCluster(KafkaCluster cluster) {
- if (StringUtils.isEmpty(cluster.getSchemaRegistry())) {
- return new SimpleRecordDeserializer();
- } else {
- return new SchemaRegistryRecordDeserializer(cluster);
- }
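+        // the deserializer now handles a cluster without a schema registry itself,
+        // so the SimpleRecordDeserializer branch is no longer needed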
+ return new SchemaRegistryRecordDeserializer(cluster, objectMapper);
}
public RecordDeserializer getRecordDeserializerForCluster(KafkaCluster cluster) {
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java
index 06b2bf1047..5b3393a39d 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java
@@ -1,55 +1,56 @@
package com.provectus.kafka.ui.cluster.deserialization;

-import lombok.RequiredArgsConstructor;
-import lombok.extern.log4j.Log4j2;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.Bytes;
-
-import io.confluent.kafka.schemaregistry.SchemaProvider;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import lombok.extern.log4j.Log4j2;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;

@Log4j2
-@RequiredArgsConstructor
public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
private final static int CLIENT_IDENTITY_MAP_CAPACITY = 100;
private final KafkaCluster cluster;
- private final SchemaRegistryClient schemaRegistryClient;
- private KafkaAvroDeserializer avroDeserializer;
- private ObjectMapper objectMapper;
- private StringDeserializer stringDeserializer;
+ private final SchemaRegistryClient schemaRegistryClient;
+ private final KafkaAvroDeserializer avroDeserializer;
+ private final ObjectMapper objectMapper;
+ private final StringDeserializer stringDeserializer;
    private final Map<String, MessageFormat> topicFormatMap = new ConcurrentHashMap<>();
- public SchemaRegistryRecordDeserializer(KafkaCluster cluster) {
+ public SchemaRegistryRecordDeserializer(KafkaCluster cluster, ObjectMapper objectMapper) {
this.cluster = cluster;
+ this.objectMapper = objectMapper;
-        List<String> endpoints = Collections.singletonList(cluster.getSchemaRegistry());
-        List<SchemaProvider> providers = Collections.singletonList(new AvroSchemaProvider());
- this.schemaRegistryClient = new CachedSchemaRegistryClient(endpoints, CLIENT_IDENTITY_MAP_CAPACITY, providers, Collections.emptyMap());
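+        // a cluster may have no schema registry configured; leave the client null in that case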
+ this.schemaRegistryClient = Optional.ofNullable(cluster.getSchemaRegistry()).map(e ->
+ new CachedSchemaRegistryClient(
+ Collections.singletonList(e),
+ CLIENT_IDENTITY_MAP_CAPACITY,
+ Collections.singletonList(new AvroSchemaProvider()),
+ Collections.emptyMap()
+ )
+ ).orElse(null);
- this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
- this.objectMapper = new ObjectMapper();
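+        // without a registry client there is nothing to back Avro decoding, so it stays null as well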
+ this.avroDeserializer = Optional.ofNullable(this.schemaRegistryClient)
+ .map(KafkaAvroDeserializer::new)
+ .orElse(null);
this.stringDeserializer = new StringDeserializer();
}
@@ -83,11 +84,13 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
    private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
String avroSchema = String.format(cluster.getSchemaNameTemplate(), record.topic());
- try {
- schemaRegistryClient.getAllVersions(avroSchema);
- return MessageFormat.AVRO;
- } catch (RestClientException | IOException e) {
- log.info("Failed to get Avro schema for topic {}", record.topic());
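+        // only query the registry when one is configured; otherwise fall through to the remaining format checks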
+ if (schemaRegistryClient != null) {
+ try {
+ schemaRegistryClient.getAllVersions(avroSchema);
+ return MessageFormat.AVRO;
+ } catch (RestClientException | IOException e) {
+ log.info("Failed to get Avro schema for topic {}", record.topic());
+ }
}
try {
@@ -102,7 +105,7 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
    private Object parseAvroRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
String topic = record.topic();
- if (record.value()!=null) {
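+        // guard against tombstone records (null value) and clusters without Avro support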
+        if (record.value() != null && avroDeserializer != null) {
byte[] valueBytes = record.value().get();
GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes);
byte[] bytes = AvroSchemaUtils.toJson(avroRecord);
diff --git a/kafka-ui-contract/pom.xml b/kafka-ui-contract/pom.xml
index 3eb7fdc8fe..476e0c6d8e 100644
--- a/kafka-ui-contract/pom.xml
+++ b/kafka-ui-contract/pom.xml
@@ -5,7 +5,7 @@
    <parent>
        <artifactId>kafka-ui</artifactId>
        <groupId>com.provectus</groupId>
-       <version>0.0.6-SNAPSHOT</version>
+       <version>0.0.8-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
diff --git a/pom.xml b/pom.xml
index 9eb609c7c6..dec7fd3bdd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,7 @@
    <groupId>com.provectus</groupId>
    <artifactId>kafka-ui</artifactId>
-   <version>0.0.6-SNAPSHOT</version>
+   <version>0.0.8-SNAPSHOT</version>
    <name>kafka-ui</name>
    <description>Kafka metrics for UI panel</description>