Protobuf support (#116)

* Added schema registry protobuf support

* Fixed detection

* Added support for local protobuf file

* Fixed minor issues
This commit is contained in:
German Osin 2020-11-10 09:43:28 +03:00 committed by GitHub
parent 3c196587a3
commit 238631ac0c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 130 additions and 30 deletions

View file

@ -72,6 +72,8 @@ services:
schemaregistry0: schemaregistry0:
image: confluentinc/cp-schema-registry:5.1.0 image: confluentinc/cp-schema-registry:5.1.0
ports:
- 8085:8085
depends_on: depends_on:
- zookeeper0 - zookeeper0
- kafka0 - kafka0

View file

@ -87,6 +87,12 @@
<artifactId>kafka-avro-serializer</artifactId> <artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version> <version>${confluent.version}</version>
</dependency> </dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.avro</groupId> <groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId> <artifactId>avro</artifactId>

View file

@ -21,6 +21,8 @@ public class ClustersProperties {
String zookeeper; String zookeeper;
String schemaRegistry; String schemaRegistry;
String schemaNameTemplate = "%s-value"; String schemaNameTemplate = "%s-value";
String protobufFile;
String protobufMessageName;
int jmxPort; int jmxPort;
} }
} }

View file

@ -29,7 +29,15 @@ public class DeserializationService {
} }
private RecordDeserializer createRecordDeserializerForCluster(KafkaCluster cluster) { private RecordDeserializer createRecordDeserializerForCluster(KafkaCluster cluster) {
return new SchemaRegistryRecordDeserializer(cluster, objectMapper); try {
if (cluster.getProtobufFile()!=null) {
return new ProtobufFileRecordDeserializer(cluster.getProtobufFile(), cluster.getProtobufMessageName(), objectMapper);
} else {
return new SchemaRegistryRecordDeserializer(cluster, objectMapper);
}
} catch (Throwable e) {
throw new RuntimeException("Can't init deserializer", e);
}
} }
public RecordDeserializer getRecordDeserializerForCluster(KafkaCluster cluster) { public RecordDeserializer getRecordDeserializerForCluster(KafkaCluster cluster) {

View file

@ -0,0 +1,45 @@
package com.provectus.kafka.ui.cluster.deserialization;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.DynamicMessage;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.stream.Collectors;
public class ProtobufFileRecordDeserializer implements RecordDeserializer {
private final ProtobufSchema protobufSchema;
private final ObjectMapper objectMapper;
public ProtobufFileRecordDeserializer(Path protobufSchemaPath, String messageName, ObjectMapper objectMapper) throws IOException {
this.objectMapper = objectMapper;
final String schemaString = Files.lines(protobufSchemaPath).collect(Collectors.joining());
this.protobufSchema = new ProtobufSchema(schemaString).copy(messageName);
}
@Override
public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
try {
final DynamicMessage message = DynamicMessage.parseFrom(
protobufSchema.toDescriptor(),
new ByteArrayInputStream(record.value().get())
);
byte[] bytes = ProtobufSchemaUtils.toJson(message);
return parseJson(bytes);
} catch (Throwable e) {
throw new RuntimeException("Failed to parse record from topic " + record.topic(), e);
}
}
private Object parseJson(byte[] bytes) throws IOException {
return objectMapper.readValue(bytes, new TypeReference<Map<String, Object>>() {});
}
}

View file

@ -2,13 +2,17 @@ package com.provectus.kafka.ui.cluster.deserialization;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Message;
import com.provectus.kafka.ui.cluster.model.KafkaCluster; import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils; import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -16,10 +20,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Bytes;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@Log4j2 @Log4j2
@ -30,6 +31,7 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
private final KafkaCluster cluster; private final KafkaCluster cluster;
private final SchemaRegistryClient schemaRegistryClient; private final SchemaRegistryClient schemaRegistryClient;
private final KafkaAvroDeserializer avroDeserializer; private final KafkaAvroDeserializer avroDeserializer;
private final KafkaProtobufDeserializer<?> protobufDeserializer;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final StringDeserializer stringDeserializer; private final StringDeserializer stringDeserializer;
@ -51,6 +53,9 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
this.avroDeserializer = Optional.ofNullable(this.schemaRegistryClient) this.avroDeserializer = Optional.ofNullable(this.schemaRegistryClient)
.map(KafkaAvroDeserializer::new) .map(KafkaAvroDeserializer::new)
.orElse(null); .orElse(null);
this.protobufDeserializer = Optional.ofNullable(this.schemaRegistryClient)
.map(KafkaProtobufDeserializer::new)
.orElse(null);
this.stringDeserializer = new StringDeserializer(); this.stringDeserializer = new StringDeserializer();
} }
@ -63,6 +68,9 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
case AVRO: case AVRO:
parsedValue = parseAvroRecord(record); parsedValue = parseAvroRecord(record);
break; break;
case PROTOBUF:
parsedValue = parseProtobufRecord(record);
break;
case JSON: case JSON:
parsedValue = parseJsonRecord(record); parsedValue = parseJsonRecord(record);
break; break;
@ -83,13 +91,38 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
} }
private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) { private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
String avroSchema = String.format(cluster.getSchemaNameTemplate(), record.topic()); String schemaName = String.format(cluster.getSchemaNameTemplate(), record.topic());
if (schemaRegistryClient != null) { if (schemaRegistryClient != null) {
try { try {
schemaRegistryClient.getAllVersions(avroSchema); final List<Integer> versions = schemaRegistryClient.getAllVersions(schemaName);
return MessageFormat.AVRO; if (!versions.isEmpty()) {
final Integer version = versions.iterator().next();
final Schema schema = schemaRegistryClient.getByVersion(record.topic(), version, false);
if (schema.getSchemaType().equals(MessageFormat.PROTOBUF.name())) {
try {
protobufDeserializer.deserialize(record.topic(), record.value().get());
return MessageFormat.PROTOBUF;
} catch (Throwable e) {
log.info("Failed to get Protobuf schema for topic {}", record.topic(), e);
}
} else if (schema.getSchemaType().equals(MessageFormat.AVRO.name())) {
try {
avroDeserializer.deserialize(record.topic(), record.value().get());
return MessageFormat.AVRO;
} catch (Throwable e) {
log.info("Failed to get Avro schema for topic {}", record.topic(), e);
}
} else if (schema.getSchemaType().equals(MessageFormat.JSON.name())) {
try {
parseJsonRecord(record);
return MessageFormat.JSON;
} catch (IOException e) {
log.info("Failed to parse json from topic {}", record.topic());
}
}
}
} catch (RestClientException | IOException e) { } catch (RestClientException | IOException e) {
log.info("Failed to get Avro schema for topic {}", record.topic()); log.warn("Failed to get Schema for topic {}", record.topic(), e);
} }
} }
@ -115,6 +148,18 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
} }
} }
private Object parseProtobufRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
String topic = record.topic();
if (record.value()!=null && protobufDeserializer !=null) {
byte[] valueBytes = record.value().get();
final Message message = protobufDeserializer.deserialize(topic, valueBytes);
byte[] bytes = ProtobufSchemaUtils.toJson(message);
return parseJson(bytes);
} else {
return new HashMap<String,Object>();
}
}
private Object parseJsonRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException { private Object parseJsonRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
byte[] valueBytes = record.value().get(); byte[] valueBytes = record.value().get();
return parseJson(valueBytes); return parseJson(valueBytes);
@ -134,6 +179,7 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
public enum MessageFormat { public enum MessageFormat {
AVRO, AVRO,
JSON, JSON,
STRING STRING,
PROTOBUF
} }
} }

View file

@ -7,6 +7,7 @@ import org.mapstruct.Mapper;
import org.mapstruct.Mapping; import org.mapstruct.Mapping;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.nio.file.Path;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -20,7 +21,7 @@ public interface ClusterMapper {
@Mapping(target = "bytesInPerSec", source = "metrics.bytesInPerSec", qualifiedByName = "sumMetrics") @Mapping(target = "bytesInPerSec", source = "metrics.bytesInPerSec", qualifiedByName = "sumMetrics")
@Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec", qualifiedByName = "sumMetrics") @Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec", qualifiedByName = "sumMetrics")
Cluster toCluster(KafkaCluster cluster); Cluster toCluster(KafkaCluster cluster);
@Mapping(target = "protobufFile", source = "protobufFile", qualifiedByName="resolvePath")
KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties); KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
@Mapping(target = "diskUsage", source = "internalBrokerDiskUsage", qualifiedByName="mapDiskUsage") @Mapping(target = "diskUsage", source = "internalBrokerDiskUsage", qualifiedByName="mapDiskUsage")
ClusterStats toClusterStats(InternalClusterMetrics metrics); ClusterStats toClusterStats(InternalClusterMetrics metrics);
@ -64,4 +65,8 @@ public interface ClusterMapper {
return metrics.values().stream().reduce(BigDecimal.ZERO, BigDecimal::add); return metrics.values().stream().reduce(BigDecimal.ZERO, BigDecimal::add);
} }
default Path resolvePath(String path) {
return Path.of(path);
}
} }

View file

@ -4,12 +4,12 @@ import com.provectus.kafka.ui.model.ServerStatus;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
import java.nio.file.Path;
import java.util.Map; import java.util.Map;
@Data @Data
@Builder(toBuilder = true) @Builder(toBuilder = true)
public class KafkaCluster { public class KafkaCluster {
private final String name; private final String name;
private final Integer jmxPort; private final Integer jmxPort;
private final String bootstrapServers; private final String bootstrapServers;
@ -22,4 +22,6 @@ public class KafkaCluster {
private final Map<String, InternalTopic> topics; private final Map<String, InternalTopic> topics;
private final Throwable lastKafkaException; private final Throwable lastKafkaException;
private final Throwable lastZookeeperException; private final Throwable lastZookeeperException;
private final Path protobufFile;
private final String protobufMessageName;
} }

View file

@ -1,16 +0,0 @@
package com.provectus.kafka.ui.cluster.model;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.math.BigDecimal;
import java.util.Map;
@Getter
@AllArgsConstructor
public class MetricDto {
private String canonicalName;
private String metricName;
private Map<String,String> params;
private BigDecimal value;
}

View file

@ -10,7 +10,7 @@ public final class KafkaConstants {
private KafkaConstants() { private KafkaConstants() {
} }
public static Map<String, String> TOPIC_DEFAULT_CONFIGS = Map.ofEntries( public static final Map<String, String> TOPIC_DEFAULT_CONFIGS = Map.ofEntries(
new AbstractMap.SimpleEntry<>(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE), new AbstractMap.SimpleEntry<>(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE),
new AbstractMap.SimpleEntry<>(COMPRESSION_TYPE_CONFIG, "producer"), new AbstractMap.SimpleEntry<>(COMPRESSION_TYPE_CONFIG, "producer"),
new AbstractMap.SimpleEntry<>(DELETE_RETENTION_MS_CONFIG, "86400000"), new AbstractMap.SimpleEntry<>(DELETE_RETENTION_MS_CONFIG, "86400000"),

View file

@ -29,7 +29,7 @@
<springdoc-openapi-webflux-ui.version>1.2.32</springdoc-openapi-webflux-ui.version> <springdoc-openapi-webflux-ui.version>1.2.32</springdoc-openapi-webflux-ui.version>
<kafka.version>2.4.1</kafka.version> <kafka.version>2.4.1</kafka.version>
<avro.version>1.9.2</avro.version> <avro.version>1.9.2</avro.version>
<confluent.version>5.5.0</confluent.version> <confluent.version>5.5.1</confluent.version>
<apache.commons.version>2.2</apache.commons.version> <apache.commons.version>2.2</apache.commons.version>
</properties> </properties>