@@ -1,55 +1,56 @@
 package com.provectus.kafka.ui.cluster.deserialization;
 
-import lombok.RequiredArgsConstructor;
-import lombok.extern.log4j.Log4j2;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.Bytes;
-
-import io.confluent.kafka.schemaregistry.SchemaProvider;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import lombok.extern.log4j.Log4j2;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Log4j2
-@RequiredArgsConstructor
 public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
 
     private final static int CLIENT_IDENTITY_MAP_CAPACITY = 100;
 
     private final KafkaCluster cluster;
-    private final SchemaRegistryClient schemaRegistryClient;
-    private KafkaAvroDeserializer avroDeserializer;
-    private ObjectMapper objectMapper;
-    private StringDeserializer stringDeserializer;
+    private final SchemaRegistryClient schemaRegistryClient;
+    private final KafkaAvroDeserializer avroDeserializer;
+    private final ObjectMapper objectMapper;
+    private final StringDeserializer stringDeserializer;
 
     private final Map<String, MessageFormat> topicFormatMap = new ConcurrentHashMap<>();
 
-    public SchemaRegistryRecordDeserializer(KafkaCluster cluster) {
+    public SchemaRegistryRecordDeserializer(KafkaCluster cluster, ObjectMapper objectMapper) {
         this.cluster = cluster;
-
-        List<String> endpoints = Collections.singletonList(cluster.getSchemaRegistry());
-        List<SchemaProvider> providers = Collections.singletonList(new AvroSchemaProvider());
-        this.schemaRegistryClient = new CachedSchemaRegistryClient(endpoints, CLIENT_IDENTITY_MAP_CAPACITY, providers, Collections.emptyMap());
-
-        this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
-        this.objectMapper = new ObjectMapper();
+        this.objectMapper = objectMapper;
+
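+        // Schema Registry is optional for a cluster: only build a client when a registry URL is configured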
+        this.schemaRegistryClient = Optional.ofNullable(cluster.getSchemaRegistry()).map(e ->
+                new CachedSchemaRegistryClient(
+                        Collections.singletonList(e),
+                        CLIENT_IDENTITY_MAP_CAPACITY,
+                        Collections.singletonList(new AvroSchemaProvider()),
+                        Collections.emptyMap()
+                )
+        ).orElse(null);
+
+        this.avroDeserializer = Optional.ofNullable(this.schemaRegistryClient)
+                .map(KafkaAvroDeserializer::new)
+                .orElse(null);
         this.stringDeserializer = new StringDeserializer();
     }
 
@@ -83,11 +84,13 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
 
     private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
         String avroSchema = String.format(cluster.getSchemaNameTemplate(), record.topic());
-        try {
-            schemaRegistryClient.getAllVersions(avroSchema);
-            return MessageFormat.AVRO;
-        } catch (RestClientException | IOException e) {
-            log.info("Failed to get Avro schema for topic {}", record.topic());
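+        // AVRO can only be detected when a Schema Registry client was configured for the cluster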
+        if (schemaRegistryClient != null) {
+            try {
+                schemaRegistryClient.getAllVersions(avroSchema);
+                return MessageFormat.AVRO;
+            } catch (RestClientException | IOException e) {
+                log.info("Failed to get Avro schema for topic {}", record.topic());
+            }
         }
 
         try {
@@ -102,7 +105,7 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
 
     private Object parseAvroRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
         String topic = record.topic();
-        if (record.value()!=null) {
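+            // Avro deserialization requires both a non-null value and a configured Avro deserializer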
+        if (record.value() != null && avroDeserializer != null) {
             byte[] valueBytes = record.value().get();
             GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes);
             byte[] bytes = AvroSchemaUtils.toJson(avroRecord);