diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java index a64aded30a..00544a109d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java @@ -4,12 +4,14 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.Message; import com.provectus.kafka.ui.cluster.model.KafkaCluster; +import io.confluent.kafka.schemaregistry.SchemaProvider; import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer; @@ -41,14 +43,17 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer { this.cluster = cluster; this.objectMapper = objectMapper; - this.schemaRegistryClient = Optional.ofNullable(cluster.getSchemaRegistry()).map(e -> - new CachedSchemaRegistryClient( - Collections.singletonList(e), - CLIENT_IDENTITY_MAP_CAPACITY, - Collections.singletonList(new AvroSchemaProvider()), - Collections.emptyMap() - ) - ).orElse(null); + this.schemaRegistryClient = Optional.ofNullable(cluster.getSchemaRegistry()) + .map(schemaRegistryUrl -> { + List schemaProviders = List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider()); + return new CachedSchemaRegistryClient( + Collections.singletonList(schemaRegistryUrl), + CLIENT_IDENTITY_MAP_CAPACITY, + schemaProviders, + Collections.emptyMap() + ); + } + ).orElse(null); this.avroDeserializer = Optional.ofNullable(this.schemaRegistryClient) .map(KafkaAvroDeserializer::new)