浏览代码

ISSUE-166 Pass ProtobufSchemaProvider to CachedSchemaRegistryClient to deserialize protobuf records (#178)

Ildar Almakaev 4 年之前
父节点
当前提交
ba4e1748ee

+ 13 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java

@@ -4,12 +4,14 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.Message;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import io.confluent.kafka.schemaregistry.SchemaProvider;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
@@ -41,14 +43,17 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
 		this.cluster = cluster;
 		this.objectMapper = objectMapper;
 
-		this.schemaRegistryClient = Optional.ofNullable(cluster.getSchemaRegistry()).map(e ->
-						new CachedSchemaRegistryClient(
-								Collections.singletonList(e),
-								CLIENT_IDENTITY_MAP_CAPACITY,
-								Collections.singletonList(new AvroSchemaProvider()),
-								Collections.emptyMap()
-						)
-		).orElse(null);
+		this.schemaRegistryClient = Optional.ofNullable(cluster.getSchemaRegistry())
+                .map(schemaRegistryUrl -> {
+                            List<SchemaProvider> schemaProviders = List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider());
+                            return new CachedSchemaRegistryClient(
+                                    Collections.singletonList(schemaRegistryUrl),
+                                    CLIENT_IDENTITY_MAP_CAPACITY,
+                                    schemaProviders,
+                                    Collections.emptyMap()
+                            );
+                        }
+                ).orElse(null);
 
 		this.avroDeserializer = Optional.ofNullable(this.schemaRegistryClient)
 				.map(KafkaAvroDeserializer::new)