diff --git a/docker/kafka-ui.yaml b/docker/kafka-ui.yaml
index b62dfd37da..d8e5fc7837 100644
--- a/docker/kafka-ui.yaml
+++ b/docker/kafka-ui.yaml
@@ -72,6 +72,8 @@ services:
 
   schemaregistry0:
     image: confluentinc/cp-schema-registry:5.1.0
+    ports:
+      - 8085:8085
    depends_on:
      - zookeeper0
      - kafka0
diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index 9235bcb983..ad43299fcf 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -87,6 +87,12 @@
             <artifactId>kafka-avro-serializer</artifactId>
             <version>${confluent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-protobuf-serializer</artifactId>
+            <version>${confluent.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java
index 851714c4b0..b140de24d8 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java
@@ -21,6 +21,8 @@ public class ClustersProperties {
         String zookeeper;
         String schemaRegistry;
         String schemaNameTemplate = "%s-value";
+        String protobufFile;
+        String protobufMessageName;
         int jmxPort;
     }
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java
index 1fdeddca24..4ebf5dca39 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/DeserializationService.java
@@ -29,7 +29,15 @@ public class DeserializationService {
     }
 
     private RecordDeserializer createRecordDeserializerForCluster(KafkaCluster cluster) {
-        return new SchemaRegistryRecordDeserializer(cluster, objectMapper);
+        try {
+            if (cluster.getProtobufFile() != null) {
+                return new ProtobufFileRecordDeserializer(cluster.getProtobufFile(), cluster.getProtobufMessageName(), objectMapper);
+            } else {
+                return new SchemaRegistryRecordDeserializer(cluster, objectMapper);
+            }
+        } catch (Throwable e) {
+            throw new RuntimeException("Can't init deserializer", e);
+        }
     }
 
     public RecordDeserializer getRecordDeserializerForCluster(KafkaCluster cluster) {
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/ProtobufFileRecordDeserializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/ProtobufFileRecordDeserializer.java
new file mode 100644
index 0000000000..60a2f17788
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/ProtobufFileRecordDeserializer.java
@@ -0,0 +1,45 @@
+package com.provectus.kafka.ui.cluster.deserialization;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.DynamicMessage;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ProtobufFileRecordDeserializer implements RecordDeserializer {
+    private final ProtobufSchema protobufSchema;
+    private final ObjectMapper objectMapper;
+
+    public ProtobufFileRecordDeserializer(Path protobufSchemaPath, String messageName, ObjectMapper objectMapper) throws IOException {
+        this.objectMapper = objectMapper;
+        final String schemaString = Files.lines(protobufSchemaPath).collect(Collectors.joining());
+        this.protobufSchema = new ProtobufSchema(schemaString).copy(messageName);
+    }
+
+    @Override
+    public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
+        try {
+            final DynamicMessage message = DynamicMessage.parseFrom(
+                    protobufSchema.toDescriptor(),
+                    new ByteArrayInputStream(record.value().get())
+            );
+            byte[] bytes = ProtobufSchemaUtils.toJson(message);
+            return parseJson(bytes);
+        } catch (Throwable e) {
+            throw new RuntimeException("Failed to parse record from topic " + record.topic(), e);
+        }
+    }
+
+    private Object parseJson(byte[] bytes) throws IOException {
+        return objectMapper.readValue(bytes, new TypeReference<Map<String, Object>>() {
+        });
+    }
+}
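The new deserializer loads a `.proto` file from disk, pins one message type via `ProtobufSchema.copy(messageName)`, and renders every record value as a JSON map. A minimal sketch of how it can be exercised; the `person.proto` path, `Person` message, and `persons` topic are illustrative, not part of this change:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.provectus.kafka.ui.cluster.deserialization.ProtobufFileRecordDeserializer;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;

import java.nio.file.Files;
import java.nio.file.Path;

public class ProtobufFileRecordDeserializerSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical one-line schema file:
        // syntax = "proto3"; message Person { string name = 1; }
        Path schemaPath = Path.of("person.proto");
        ProtobufFileRecordDeserializer deserializer =
                new ProtobufFileRecordDeserializer(schemaPath, "Person", new ObjectMapper());

        // Encode a value with the same descriptor the deserializer will decode with
        Descriptors.Descriptor descriptor =
                new ProtobufSchema(Files.readString(schemaPath)).copy("Person").toDescriptor();
        DynamicMessage value = DynamicMessage.newBuilder(descriptor)
                .setField(descriptor.findFieldByName("name"), "alice")
                .build();

        ConsumerRecord<Bytes, Bytes> record = new ConsumerRecord<>(
                "persons", 0, 0L, null, Bytes.wrap(value.toByteArray()));
        System.out.println(deserializer.deserialize(record)); // {name=alice}
    }
}
```

One caveat in the constructor above: `Files.lines(...).collect(Collectors.joining())` concatenates the schema file without newline separators, so a multi-line `.proto` containing `//` comments would be corrupted; `Collectors.joining("\n")` would be more robust.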
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java
index 5b3393a39d..e8024b3017 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java
@@ -2,13 +2,17 @@ package com.provectus.kafka.ui.cluster.deserialization;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.Message;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
 import lombok.extern.log4j.Log4j2;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -16,10 +20,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Bytes;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 @Log4j2
@@ -30,6 +31,7 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
 
     private final KafkaCluster cluster;
     private final SchemaRegistryClient schemaRegistryClient;
     private final KafkaAvroDeserializer avroDeserializer;
+    private final KafkaProtobufDeserializer<Message> protobufDeserializer;
     private final ObjectMapper objectMapper;
     private final StringDeserializer stringDeserializer;
@@ -51,6 +53,9 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
         this.avroDeserializer = Optional.ofNullable(this.schemaRegistryClient)
                 .map(KafkaAvroDeserializer::new)
                 .orElse(null);
+        this.protobufDeserializer = Optional.ofNullable(this.schemaRegistryClient)
+                .map(KafkaProtobufDeserializer::new)
+                .orElse(null);
         this.stringDeserializer = new StringDeserializer();
     }
 
@@ -63,6 +68,9 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
             case AVRO:
                 parsedValue = parseAvroRecord(record);
                 break;
+            case PROTOBUF:
+                parsedValue = parseProtobufRecord(record);
+                break;
             case JSON:
                 parsedValue = parseJsonRecord(record);
                 break;
@@ -83,13 +91,38 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
     }
 
     private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
-        String avroSchema = String.format(cluster.getSchemaNameTemplate(), record.topic());
+        String schemaName = String.format(cluster.getSchemaNameTemplate(), record.topic());
         if (schemaRegistryClient != null) {
             try {
-                schemaRegistryClient.getAllVersions(avroSchema);
-                return MessageFormat.AVRO;
+                final List<Integer> versions = schemaRegistryClient.getAllVersions(schemaName);
+                if (!versions.isEmpty()) {
+                    final Integer version = versions.iterator().next();
+                    final Schema schema = schemaRegistryClient.getByVersion(record.topic(), version, false);
+                    if (schema.getSchemaType().equals(MessageFormat.PROTOBUF.name())) {
+                        try {
+                            protobufDeserializer.deserialize(record.topic(), record.value().get());
+                            return MessageFormat.PROTOBUF;
+                        } catch (Throwable e) {
+                            log.info("Failed to get Protobuf schema for topic {}", record.topic(), e);
+                        }
+                    } else if (schema.getSchemaType().equals(MessageFormat.AVRO.name())) {
+                        try {
+                            avroDeserializer.deserialize(record.topic(), record.value().get());
+                            return MessageFormat.AVRO;
+                        } catch (Throwable e) {
+                            log.info("Failed to get Avro schema for topic {}", record.topic(), e);
+                        }
+                    } else if (schema.getSchemaType().equals(MessageFormat.JSON.name())) {
+                        try {
+                            parseJsonRecord(record);
+                            return MessageFormat.JSON;
+                        } catch (IOException e) {
+                            log.info("Failed to parse json from topic {}", record.topic());
+                        }
+                    }
+                }
             } catch (RestClientException | IOException e) {
-                log.info("Failed to get Avro schema for topic {}", record.topic());
+                log.warn("Failed to get Schema for topic {}", record.topic(), e);
             }
         }
 
@@ -115,6 +148,18 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
         }
     }
 
+    private Object parseProtobufRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
+        String topic = record.topic();
+        if (record.value() != null && protobufDeserializer != null) {
+            byte[] valueBytes = record.value().get();
+            final Message message = protobufDeserializer.deserialize(topic, valueBytes);
+            byte[] bytes = ProtobufSchemaUtils.toJson(message);
+            return parseJson(bytes);
+        } else {
+            return new HashMap<String, Object>();
+        }
+    }
+
     private Object parseJsonRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
         byte[] valueBytes = record.value().get();
         return parseJson(valueBytes);
@@ -134,6 +179,7 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
     public enum MessageFormat {
         AVRO,
         JSON,
-        STRING
+        STRING,
+        PROTOBUF
     }
 }
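`detectFormat` now asks the registry for the subject's schema type and then trial-deserializes before committing to a format. All three registry-backed paths rely on Confluent's wire framing: a zero magic byte, a 4-byte big-endian schema id, then the payload. A hypothetical helper, not part of this change, that makes the framing explicit:

```java
import java.nio.ByteBuffer;

public final class ConfluentWireFormat {
    private ConfluentWireFormat() {
    }

    /** Extracts the schema id from a Confluent-framed payload, or throws if the frame is absent. */
    public static int schemaId(byte[] value) {
        ByteBuffer buffer = ByteBuffer.wrap(value);
        byte magic = buffer.get();          // 0x00 for registry-framed messages
        if (magic != 0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buffer.getInt();             // 4-byte big-endian schema id
    }
}
```

A cheap check like this could short-circuit the trial deserialization for plainly unframed records, which in the code above fall through to JSON/STRING only after a thrown and logged exception.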
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java
index e85690ac95..2f3115e506 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java
@@ -7,6 +7,7 @@ import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
 
 import java.math.BigDecimal;
+import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -20,7 +21,7 @@ public interface ClusterMapper {
     @Mapping(target = "bytesInPerSec", source = "metrics.bytesInPerSec", qualifiedByName = "sumMetrics")
     @Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec", qualifiedByName = "sumMetrics")
     Cluster toCluster(KafkaCluster cluster);
-
+    @Mapping(target = "protobufFile", source = "protobufFile", qualifiedByName = "resolvePath")
     KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
 
     @Mapping(target = "diskUsage", source = "internalBrokerDiskUsage", qualifiedByName = "mapDiskUsage")
     ClusterStats toClusterStats(InternalClusterMetrics metrics);
@@ -64,4 +65,8 @@ public interface ClusterMapper {
         return metrics.values().stream().reduce(BigDecimal.ZERO, BigDecimal::add);
     }
 
+    default Path resolvePath(String path) {
+        return Path.of(path);
+    }
+
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java
index a4d8ac5c5c..03a514fe84 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java
@@ -4,12 +4,12 @@ import com.provectus.kafka.ui.model.ServerStatus;
 import lombok.Builder;
 import lombok.Data;
 
+import java.nio.file.Path;
 import java.util.Map;
 
 @Data
 @Builder(toBuilder = true)
 public class KafkaCluster {
-
     private final String name;
     private final Integer jmxPort;
     private final String bootstrapServers;
@@ -22,4 +22,6 @@ public class KafkaCluster {
     private final Map<String, InternalTopic> topics;
     private final Throwable lastKafkaException;
     private final Throwable lastZookeeperException;
+    private final Path protobufFile;
+    private final String protobufMessageName;
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/MetricDto.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/MetricDto.java
deleted file mode 100644
index 801ae2d47b..0000000000
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/MetricDto.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package com.provectus.kafka.ui.cluster.model;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-
-import java.math.BigDecimal;
-import java.util.Map;
-
-@Getter
-@AllArgsConstructor
-public class MetricDto {
-    private String canonicalName;
-    private String metricName;
-    private Map<String, String> params;
-    private BigDecimal value;
-}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaConstants.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaConstants.java
index 0454ed57dc..ed22641407 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaConstants.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaConstants.java
@@ -10,7 +10,7 @@ public final class KafkaConstants {
     private KafkaConstants() {
     }
 
-    public static Map<String, String> TOPIC_DEFAULT_CONFIGS = Map.ofEntries(
+    public static final Map<String, String> TOPIC_DEFAULT_CONFIGS = Map.ofEntries(
             new AbstractMap.SimpleEntry<>(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE),
             new AbstractMap.SimpleEntry<>(COMPRESSION_TYPE_CONFIG, "producer"),
             new AbstractMap.SimpleEntry<>(DELETE_RETENTION_MS_CONFIG, "86400000"),
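The MapStruct qualifier turns the configured `protobufFile` string into a `java.nio.file.Path`. A hand-written equivalent of the mapping for the new fields, as a sketch rather than the actual generated mapper (other fields omitted, Lombok-generated getters assumed), with an explicit null guard since `Path.of(null)` throws a `NullPointerException` for clusters that do not set `protobufFile`:

```java
import com.provectus.kafka.ui.cluster.config.ClustersProperties;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;

import java.nio.file.Path;

class ManualClusterMapper {
    // Roughly what the generated implementation will do for the new fields (illustrative only)
    KafkaCluster toKafkaCluster(ClustersProperties.Cluster props) {
        return KafkaCluster.builder()
                .name(props.getName())
                .protobufFile(props.getProtobufFile() == null
                        ? null
                        : Path.of(props.getProtobufFile()))   // what resolvePath does, plus a null guard
                .protobufMessageName(props.getProtobufMessageName())
                .build();
    }
}
```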
diff --git a/pom.xml b/pom.xml
index dec7fd3bdd..adba6bb5cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
         <springdoc-openapi-webflux-ui.version>1.2.32</springdoc-openapi-webflux-ui.version>
         <kafka.version>2.4.1</kafka.version>
         <avro.version>1.9.2</avro.version>
-        <confluent.version>5.5.0</confluent.version>
+        <confluent.version>5.5.1</confluent.version>
         <apache.commons.version>2.2</apache.commons.version>
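The parent-pom bump to Confluent 5.5.1 lines up with the new kafka-protobuf-serializer dependency, and the newly published 8085 port makes the compose registry reachable from the host. A sketch of producing a registry-framed protobuf record so that `detectFormat` resolves `PROTOBUF`; the broker address, topic name, and message are illustrative:

```java
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProtobufProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Registers the schema and writes the Confluent wire framing around the payload
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
        props.put("schema.registry.url", "http://localhost:8085"); // port published in docker/kafka-ui.yaml

        Descriptors.Descriptor descriptor =
                new ProtobufSchema("syntax = \"proto3\"; message Person { string name = 1; }")
                        .toDescriptor();
        Message value = DynamicMessage.newBuilder(descriptor)
                .setField(descriptor.findFieldByName("name"), "alice")
                .build();

        try (KafkaProducer<String, Message> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("persons", value));
        }
    }
}
```

With the default subject naming, the schema is registered under `persons-value`, which matches the `"%s-value"` default of `schemaNameTemplate` that `detectFormat` uses to look the subject up.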