|
@@ -24,25 +24,28 @@ import io.confluent.kafka.serializers.KafkaAvroDeserializer;
|
|
|
|
|
|
import com.fasterxml.jackson.core.type.TypeReference;
|
|
import com.fasterxml.jackson.core.type.TypeReference;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
|
|
|
|
|
|
@Log4j2
|
|
@Log4j2
|
|
@RequiredArgsConstructor
|
|
@RequiredArgsConstructor
|
|
public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
|
|
public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
|
|
|
|
|
|
private final static int CLIENT_IDENTITY_MAP_CAPACITY = 100;
|
|
private final static int CLIENT_IDENTITY_MAP_CAPACITY = 100;
|
|
- private final static String AVRO_SCHEMA_TEMPLATE = "%s-value";
|
|
|
|
|
|
|
|
- private SchemaRegistryClient schemaRegistryClient;
|
|
|
|
|
|
+ private final KafkaCluster cluster;
|
|
|
|
+ private final SchemaRegistryClient schemaRegistryClient;
|
|
private KafkaAvroDeserializer avroDeserializer;
|
|
private KafkaAvroDeserializer avroDeserializer;
|
|
private ObjectMapper objectMapper;
|
|
private ObjectMapper objectMapper;
|
|
private StringDeserializer stringDeserializer;
|
|
private StringDeserializer stringDeserializer;
|
|
|
|
|
|
private final Map<String, MessageFormat> topicFormatMap = new ConcurrentHashMap<>();
|
|
private final Map<String, MessageFormat> topicFormatMap = new ConcurrentHashMap<>();
|
|
|
|
|
|
- public SchemaRegistryRecordDeserializer(String schemaRegistryUrl) {
|
|
|
|
- List<String> endpoints = Collections.singletonList(schemaRegistryUrl);
|
|
|
|
|
|
+ public SchemaRegistryRecordDeserializer(KafkaCluster cluster) {
|
|
|
|
+ this.cluster = cluster;
|
|
|
|
+
|
|
|
|
+ List<String> endpoints = Collections.singletonList(cluster.getSchemaRegistry());
|
|
List<SchemaProvider> providers = Collections.singletonList(new AvroSchemaProvider());
|
|
List<SchemaProvider> providers = Collections.singletonList(new AvroSchemaProvider());
|
|
- this. schemaRegistryClient = new CachedSchemaRegistryClient(endpoints, CLIENT_IDENTITY_MAP_CAPACITY, providers, Collections.emptyMap());
|
|
|
|
|
|
+ this.schemaRegistryClient = new CachedSchemaRegistryClient(endpoints, CLIENT_IDENTITY_MAP_CAPACITY, providers, Collections.emptyMap());
|
|
|
|
|
|
this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
|
|
this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
|
|
this.objectMapper = new ObjectMapper();
|
|
this.objectMapper = new ObjectMapper();
|
|
@@ -78,7 +81,7 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
|
|
}
|
|
}
|
|
|
|
|
|
private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
|
|
private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
|
|
- String avroSchema = String.format(AVRO_SCHEMA_TEMPLATE, record.topic());
|
|
|
|
|
|
+ String avroSchema = String.format(cluster.getSchemaNameTemplate(), record.topic());
|
|
try {
|
|
try {
|
|
schemaRegistryClient.getAllVersions(avroSchema);
|
|
schemaRegistryClient.getAllVersions(avroSchema);
|
|
return MessageFormat.AVRO;
|
|
return MessageFormat.AVRO;
|