|
@@ -8,7 +8,6 @@ import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
-import java.util.Optional;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
import org.apache.avro.generic.GenericRecord;
|
|
@@ -21,6 +20,7 @@ import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
|
|
|
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
|
|
|
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
|
|
|
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
|
|
|
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
|
|
|
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
|
|
|
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
|
|
|
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
|
|
@@ -32,6 +32,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
@RequiredArgsConstructor
|
|
|
public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
|
|
|
|
|
|
+ private final static String AVRO_SCHEMA_TEMPLATE = "%s-value";
|
|
|
+
|
|
|
+ private SchemaRegistryClient schemaRegistryClient;
|
|
|
private KafkaAvroDeserializer avroDeserializer;
|
|
|
private ObjectMapper objectMapper;
|
|
|
private StringDeserializer stringDeserializer;
|
|
@@ -41,7 +44,7 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
|
|
|
public SchemaRegistryRecordDeserializer(String schemaRegistryUrl) {
|
|
|
List<String> endpoints = Collections.singletonList(schemaRegistryUrl);
|
|
|
List<SchemaProvider> providers = Arrays.asList(new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider());
|
|
|
- SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(endpoints, 100, providers, Collections.emptyMap());
|
|
|
+ this. schemaRegistryClient = new CachedSchemaRegistryClient(endpoints, 100, providers, Collections.emptyMap());
|
|
|
|
|
|
this.avroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
|
|
|
this.objectMapper = new ObjectMapper();
|
|
@@ -51,22 +54,25 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
|
|
|
public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
|
|
|
MessageFormat format = getMessageFormat(record);
|
|
|
|
|
|
- Optional<Object> parsedValue;
|
|
|
- switch (format) {
|
|
|
- case AVRO:
|
|
|
- parsedValue = tryParseAvroRecord(record);
|
|
|
- break;
|
|
|
- case JSON:
|
|
|
- parsedValue = tryParseJsonRecord(record);
|
|
|
- break;
|
|
|
- case STRING:
|
|
|
- parsedValue = parseStringRecord(record);
|
|
|
- break;
|
|
|
- default:
|
|
|
- parsedValue = Optional.empty();
|
|
|
+ try {
|
|
|
+ Object parsedValue;
|
|
|
+ switch (format) {
|
|
|
+ case AVRO:
|
|
|
+ parsedValue = parseAvroRecord(record);
|
|
|
+ break;
|
|
|
+ case JSON:
|
|
|
+ parsedValue = parseJsonRecord(record);
|
|
|
+ break;
|
|
|
+ case STRING:
|
|
|
+ parsedValue = parseStringRecord(record);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ throw new IllegalArgumentException("Unknown message format " + format + " for topic " + record.topic());
|
|
|
+ }
|
|
|
+ return parsedValue;
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new RuntimeException("Failed to parse record from topic " + record.topic(), e);
|
|
|
}
|
|
|
-
|
|
|
- return parsedValue.orElseThrow(() -> new IllegalArgumentException("Unknown message format from topic " + record.topic()));
|
|
|
}
|
|
|
|
|
|
private MessageFormat getMessageFormat(ConsumerRecord<Bytes, Bytes> record) {
|
|
@@ -74,41 +80,35 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
|
|
|
}
|
|
|
|
|
|
private MessageFormat detectFormat(ConsumerRecord<Bytes, Bytes> record) {
|
|
|
- Optional<Object> parsedValue = tryParseAvroRecord(record);
|
|
|
- if (parsedValue.isPresent()) {
|
|
|
+ String avroSchema = String.format(AVRO_SCHEMA_TEMPLATE, record.topic());
|
|
|
+ try {
|
|
|
+ schemaRegistryClient.getAllVersions(avroSchema);
|
|
|
return MessageFormat.AVRO;
|
|
|
+ } catch (RestClientException | IOException e) {
|
|
|
+ log.info("Failed to get Avro schema for topic {}", record.topic());
|
|
|
}
|
|
|
|
|
|
- parsedValue = tryParseJsonRecord(record);
|
|
|
- if (parsedValue.isPresent()) {
|
|
|
+ try {
|
|
|
+ parseJsonRecord(record);
|
|
|
return MessageFormat.JSON;
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.info("Failed to parse json from topic {}", record.topic());
|
|
|
}
|
|
|
|
|
|
return MessageFormat.STRING;
|
|
|
}
|
|
|
|
|
|
- private Optional<Object> tryParseAvroRecord(ConsumerRecord<Bytes, Bytes> record) {
|
|
|
+ private Object parseAvroRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
|
|
|
String topic = record.topic();
|
|
|
byte[] valueBytes = record.value().get();
|
|
|
- try {
|
|
|
- GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes);
|
|
|
- byte[] bytes = AvroSchemaUtils.toJson(avroRecord);
|
|
|
- return Optional.of(parseJson(bytes));
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("Record from topic {} isn't Avro record", topic);
|
|
|
- return Optional.empty();
|
|
|
- }
|
|
|
+ GenericRecord avroRecord = (GenericRecord) avroDeserializer.deserialize(topic, valueBytes);
|
|
|
+ byte[] bytes = AvroSchemaUtils.toJson(avroRecord);
|
|
|
+ return parseJson(bytes);
|
|
|
}
|
|
|
|
|
|
- private Optional<Object> tryParseJsonRecord(ConsumerRecord<Bytes, Bytes> record) {
|
|
|
- String topic = record.topic();
|
|
|
+ private Object parseJsonRecord(ConsumerRecord<Bytes, Bytes> record) throws IOException {
|
|
|
byte[] valueBytes = record.value().get();
|
|
|
- try {
|
|
|
- return Optional.of(parseJson(valueBytes));
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("Record from topic {} isn't json record", topic);
|
|
|
- return Optional.empty();
|
|
|
- }
|
|
|
+ return parseJson(valueBytes);
|
|
|
}
|
|
|
|
|
|
private Object parseJson(byte[] bytes) throws IOException {
|
|
@@ -116,10 +116,10 @@ public class SchemaRegistryRecordDeserializer implements RecordDeserializer {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- private Optional<Object> parseStringRecord(ConsumerRecord<Bytes, Bytes> record) {
|
|
|
+ private Object parseStringRecord(ConsumerRecord<Bytes, Bytes> record) {
|
|
|
String topic = record.topic();
|
|
|
byte[] valueBytes = record.value().get();
|
|
|
- return Optional.of(stringDeserializer.deserialize(topic, valueBytes));
|
|
|
+ return stringDeserializer.deserialize(topic, valueBytes);
|
|
|
}
|
|
|
|
|
|
public enum MessageFormat {
|