Merge branch 'master' of github.com:provectus/kafka-ui into feature/cluster-status-in-sidebar

Commit b09af5c185
Sofia Shnaidman, 2020-11-03 00:13:27 +03:00
6 changed files with 53 additions and 55 deletions

Jenkinsfile

@@ -66,7 +66,7 @@ spec:
         expression { return env.GIT_BRANCH == 'master'; }
       }
       steps {
-        sh 'git merge master'
+        sh 'git merge origin/master'
       }
     }
     stage('Remove SNAPSHOT from version') {
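
Note: 'git merge master' merges the local master branch, which in a Jenkins workspace is often missing or stale, since CI checkouts are usually detached HEADs carrying only remote-tracking refs; 'git merge origin/master' merges the state last fetched from the remote, which is presumably the motivation for this fix.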

pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>kafka-ui</artifactId>
         <groupId>com.provectus</groupId>
-        <version>0.0.6-SNAPSHOT</version>
+        <version>0.0.8-SNAPSHOT</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>
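
The same version bump recurs in two more pom files below; presumably master had already moved on to 0.0.8-SNAPSHOT, and the merge carries the new version into this feature branch.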

DeserializationService.java

@@ -1,25 +1,24 @@
 package com.provectus.kafka.ui.cluster.deserialization;
 
-import lombok.RequiredArgsConstructor;
-
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import javax.annotation.PostConstruct;
-
-import org.apache.commons.lang3.StringUtils;
-import org.springframework.stereotype.Component;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.provectus.kafka.ui.cluster.model.ClustersStorage;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import lombok.RequiredArgsConstructor;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+import java.util.Map;
+import java.util.stream.Collectors;
 
 @Component
 @RequiredArgsConstructor
 public class DeserializationService {
 
     private final ClustersStorage clustersStorage;
+    private final ObjectMapper objectMapper;
     private Map<String, RecordDeserializer> clusterDeserializers;
 
     @PostConstruct
     public void init() {
         this.clusterDeserializers = clustersStorage.getKafkaClusters().stream()
@@ -30,11 +29,7 @@ public class DeserializationService {
     }
 
     private RecordDeserializer createRecordDeserializerForCluster(KafkaCluster cluster) {
-        if (StringUtils.isEmpty(cluster.getSchemaRegistry())) {
-            return new SimpleRecordDeserializer();
-        } else {
-            return new SchemaRegistryRecordDeserializer(cluster);
-        }
+        return new SchemaRegistryRecordDeserializer(cluster, objectMapper);
     }
 
     public RecordDeserializer getRecordDeserializerForCluster(KafkaCluster cluster) {
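
Two things change in this service. First, a shared Jackson ObjectMapper is now injected (Lombok's @RequiredArgsConstructor generates the constructor for the final fields) and handed to each deserializer, instead of each deserializer building its own. Second, the StringUtils.isEmpty branch that returned a SimpleRecordDeserializer is gone: clusters without a Schema Registry also get a SchemaRegistryRecordDeserializer, which now handles the missing registry internally (see the next file). A minimal sketch of the injection pattern, assuming Spring and Lombok on the classpath; ExampleService and toJson are illustrative names, not part of the codebase:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class ExampleService {

    // Supplied once by the Spring container; sharing one configured mapper
    // is cheaper and more consistent than new ObjectMapper() per collaborator.
    private final ObjectMapper objectMapper;

    public String toJson(Object value) throws JsonProcessingException {
        return objectMapper.writeValueAsString(value);
    }
}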

SchemaRegistryRecordDeserializer.java

@@ -1,55 +1,56 @@
 package com.provectus.kafka.ui.cluster.deserialization;
 
-import lombok.RequiredArgsConstructor;
-import lombok.extern.log4j.Log4j2;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.Bytes;
-
-import io.confluent.kafka.schemaregistry.SchemaProvider;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import lombok.extern.log4j.Log4j2;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Log4j2
-@RequiredArgsConstructor
 public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
 
     private final static int CLIENT_IDENTITY_MAP_CAPACITY = 100;
 
     private final KafkaCluster cluster;
     private final SchemaRegistryClient schemaRegistryClient;
-    private KafkaAvroDeserializer avroDeserializer;
-    private ObjectMapper objectMapper;
-    private StringDeserializer stringDeserializer;
+    private final KafkaAvroDeserializer avroDeserializer;
+    private final ObjectMapper objectMapper;
+    private final StringDeserializer stringDeserializer;
 
     private final Map<String, MessageFormat> topicFormatMap = new ConcurrentHashMap<>();
 
-    public SchemaRegistryRecordDeserializer(KafkaCluster cluster) {
+    public SchemaRegistryRecordDeserializer(KafkaCluster cluster, ObjectMapper objectMapper) {
         this.cluster = cluster;
+        this.objectMapper = objectMapper;
 
-        List<String> endpoints = Collections.singletonList(cluster.getSchemaRegistry());
-        List<SchemaProvider> providers = Collections.singletonList(new AvroSchemaProvider());
-        this.schemaRegistryClient = new CachedSchemaRegistryClient(endpoints, CLIENT_IDENTITY_MAP_CAPACITY, providers, Collections.emptyMap());
+        this.schemaRegistryClient = Optional.ofNullable(cluster.getSchemaRegistry()).map(e ->
+                new CachedSchemaRegistryClient(
+                        Collections.singletonList(e),
+                        CLIENT_IDENTITY_MAP_CAPACITY,
+                        Collections.singletonList(new AvroSchemaProvider()),
+                        Collections.emptyMap()
+                )
+        ).orElse(null);
 
-        this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
-        this.objectMapper = new ObjectMapper();
+        this.avroDeserializer = Optional.ofNullable(this.schemaRegistryClient)
+                .map(KafkaAvroDeserializer::new)
+                .orElse(null);
         this.stringDeserializer = new StringDeserializer();
     }
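
The constructor now treats the Schema Registry as an optional dependency: when cluster.getSchemaRegistry() returns null, both schemaRegistryClient and avroDeserializer are left null instead of a client being built against a missing endpoint. A self-contained sketch of the same null-safe construction pattern; RegistryClient and OptionalDependencyExample are hypothetical names invented for illustration:

import java.util.Optional;

public class OptionalDependencyExample {

    // Stand-in for an external client such as CachedSchemaRegistryClient.
    interface RegistryClient {
        String lookup(String subject);
    }

    private final RegistryClient client; // null when no URL is configured

    public OptionalDependencyExample(String registryUrl) {
        // Build the client only when a URL is present; otherwise keep null
        // and let every use site fall back to a simpler path.
        this.client = Optional.ofNullable(registryUrl)
                .map(url -> (RegistryClient) (subject) ->
                        "schema for " + subject + " via " + url)
                .orElse(null);
    }

    public String describe(String subject) {
        // Mirrors the schemaRegistryClient != null guards in the diff above.
        if (client != null) {
            return client.lookup(subject);
        }
        return "no registry configured; treating " + subject + " as a plain string";
    }

    public static void main(String[] args) {
        System.out.println(new OptionalDependencyExample("http://localhost:8081").describe("orders-value"));
        System.out.println(new OptionalDependencyExample(null).describe("orders-value"));
    }
}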
@@ -83,11 +84,13 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
 
     private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
         String avroSchema = String.format(cluster.getSchemaNameTemplate(), record.topic());
-        try {
-            schemaRegistryClient.getAllVersions(avroSchema);
-            return MessageFormat.AVRO;
-        } catch (RestClientException | IOException e) {
-            log.info("Failed to get Avro schema for topic {}", record.topic());
+        if (schemaRegistryClient != null) {
+            try {
+                schemaRegistryClient.getAllVersions(avroSchema);
+                return MessageFormat.AVRO;
+            } catch (RestClientException | IOException e) {
+                log.info("Failed to get Avro schema for topic {}", record.topic());
+            }
         }
 
         try {
@@ -102,7 +105,7 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
 
     private Object parseAvroRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
         String topic = record.topic();
-        if (record.value()!=null) {
+        if (record.value()!=null && avroDeserializer !=null) {
             byte[] valueBytes = record.value().get();
             GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes);
             byte[] bytes = AvroSchemaUtils.toJson(avroRecord);
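
Taken together, the three hunks let the deserializer degrade gracefully when no Schema Registry is configured: schemaRegistryClient and avroDeserializer stay null, detectFormat skips the Avro probe, and the added null check in parseAvroRecord avoids a NullPointerException, so such records fall through to the non-Avro parsing paths.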

pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>kafka-ui</artifactId>
         <groupId>com.provectus</groupId>
-        <version>0.0.6-SNAPSHOT</version>
+        <version>0.0.8-SNAPSHOT</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

pom.xml

@@ -49,7 +49,7 @@
     <groupId>com.provectus</groupId>
     <artifactId>kafka-ui</artifactId>
-    <version>0.0.6-SNAPSHOT</version>
+    <version>0.0.8-SNAPSHOT</version>
     <name>kafka-ui</name>
     <description>Kafka metrics for UI panel</description>
 
 </project>