|
@@ -15,6 +15,7 @@ import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
|
|
|
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
|
|
|
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
|
|
|
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
|
|
|
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
|
|
|
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
|
|
|
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
|
|
|
import java.net.URI;
|
|
@@ -22,70 +23,76 @@ import java.nio.ByteBuffer;
|
|
|
import java.util.Collections;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
import java.util.Optional;
|
|
|
+import java.util.concurrent.Callable;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import javax.annotation.Nullable;
|
|
|
import lombok.SneakyThrows;
|
|
|
import lombok.extern.log4j.Log4j2;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
|
|
import org.apache.kafka.common.utils.Bytes;
|
|
|
-import reactor.util.function.Tuple2;
|
|
|
-import reactor.util.function.Tuples;
|
|
|
|
|
|
@Log4j2
|
|
|
-public class SchemaRegistryRecordSerDe implements RecordSerDe {
|
|
|
+public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
|
|
|
|
|
|
private static final int CLIENT_IDENTITY_MAP_CAPACITY = 100;
|
|
|
|
|
|
private final KafkaCluster cluster;
|
|
|
- private final SchemaRegistryClient schemaRegistryClient;
|
|
|
private final Map<String, MessageFormatter> valueFormatMap = new ConcurrentHashMap<>();
|
|
|
private final Map<String, MessageFormatter> keyFormatMap = new ConcurrentHashMap<>();
|
|
|
|
|
|
- private AvroMessageFormatter avroFormatter;
|
|
|
- private ProtobufMessageFormatter protobufFormatter;
|
|
|
- private final JsonMessageFormatter jsonFormatter;
|
|
|
+ @Nullable
|
|
|
+ private final SchemaRegistryClient schemaRegistryClient;
|
|
|
+
|
|
|
+ @Nullable
|
|
|
+ private final AvroMessageFormatter avroFormatter;
|
|
|
+
|
|
|
+ @Nullable
|
|
|
+ private final ProtobufMessageFormatter protobufFormatter;
|
|
|
+
|
|
|
private final StringMessageFormatter stringFormatter = new StringMessageFormatter();
|
|
|
private final ProtobufSchemaConverter protoSchemaConverter = new ProtobufSchemaConverter();
|
|
|
private final AvroJsonSchemaConverter avroSchemaConverter = new AvroJsonSchemaConverter();
|
|
|
private final ObjectMapper objectMapper = new ObjectMapper();
|
|
|
|
|
|
- public SchemaRegistryRecordSerDe(KafkaCluster cluster, ObjectMapper objectMapper) {
|
|
|
- this.cluster = cluster;
|
|
|
-
|
|
|
- this.schemaRegistryClient = Optional.ofNullable(cluster.getSchemaRegistry())
|
|
|
- .map(schemaRegistryUrl -> {
|
|
|
- List<SchemaProvider> schemaProviders =
|
|
|
- List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider());
|
|
|
- return new CachedSchemaRegistryClient(
|
|
|
- Collections.singletonList(schemaRegistryUrl),
|
|
|
- CLIENT_IDENTITY_MAP_CAPACITY,
|
|
|
- schemaProviders,
|
|
|
- Collections.emptyMap()
|
|
|
- );
|
|
|
- }
|
|
|
- ).orElse(null);
|
|
|
-
|
|
|
- this.jsonFormatter = new JsonMessageFormatter(objectMapper);
|
|
|
+ private static SchemaRegistryClient createSchemaRegistryClient(KafkaCluster cluster) {
|
|
|
+ Objects.requireNonNull(cluster.getSchemaRegistry());
|
|
|
+ List<SchemaProvider> schemaProviders =
|
|
|
+ List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider());
|
|
|
+ //TODO add auth
|
|
|
+ return new CachedSchemaRegistryClient(
|
|
|
+ Collections.singletonList(cluster.getSchemaRegistry()),
|
|
|
+ CLIENT_IDENTITY_MAP_CAPACITY,
|
|
|
+ schemaProviders,
|
|
|
+ Collections.emptyMap()
|
|
|
+ );
|
|
|
+ }
|
|
|
|
|
|
+ public SchemaRegistryAwareRecordSerDe(KafkaCluster cluster) {
|
|
|
+ this.cluster = cluster;
|
|
|
+ this.schemaRegistryClient = cluster.getSchemaRegistry() != null
|
|
|
+ ? createSchemaRegistryClient(cluster)
|
|
|
+ : null;
|
|
|
if (schemaRegistryClient != null) {
|
|
|
- this.avroFormatter = new AvroMessageFormatter(schemaRegistryClient, objectMapper);
|
|
|
+ this.avroFormatter = new AvroMessageFormatter(schemaRegistryClient);
|
|
|
this.protobufFormatter = new ProtobufMessageFormatter(schemaRegistryClient);
|
|
|
+ } else {
|
|
|
+ this.avroFormatter = null;
|
|
|
+ this.protobufFormatter = null;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public Tuple2<String, Object> deserialize(ConsumerRecord<Bytes, Bytes> msg) {
|
|
|
- MessageFormatter valueFormatter = getMessageFormatter(msg, false);
|
|
|
- MessageFormatter keyFormatter = getMessageFormatter(msg, true);
|
|
|
+ public DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> msg) {
|
|
|
try {
|
|
|
- return Tuples.of(
|
|
|
+ return new DeserializedKeyValue(
|
|
|
msg.key() != null
|
|
|
- ? keyFormatter.format(msg.topic(), msg.key().get()).toString()
|
|
|
- : "",
|
|
|
- valueFormatter.format(
|
|
|
- msg.topic(),
|
|
|
- msg.value() != null ? msg.value().get() : null
|
|
|
- )
|
|
|
+ ? getMessageFormatter(msg, true).format(msg.topic(), msg.key().get())
|
|
|
+ : null,
|
|
|
+ msg.value() != null
|
|
|
+ ? getMessageFormatter(msg, false).format(msg.topic(), msg.value().get())
|
|
|
+ : null
|
|
|
);
|
|
|
} catch (Throwable e) {
|
|
|
throw new RuntimeException("Failed to parse record from topic " + msg.topic(), e);
|
|
@@ -93,47 +100,43 @@ public class SchemaRegistryRecordSerDe implements RecordSerDe {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- @SneakyThrows
|
|
|
- public ProducerRecord<byte[], byte[]> serialize(String topic, byte[] key, byte[] data,
|
|
|
- Optional<Integer> partition) {
|
|
|
+ public ProducerRecord<byte[], byte[]> serialize(String topic,
|
|
|
+ @Nullable String key,
|
|
|
+ @Nullable String data,
|
|
|
+ @Nullable Integer partition) {
|
|
|
final Optional<SchemaMetadata> maybeValueSchema = getSchemaBySubject(topic, false);
|
|
|
final Optional<SchemaMetadata> maybeKeySchema = getSchemaBySubject(topic, true);
|
|
|
|
|
|
- final Optional<byte[]> serializedValue = serialize(maybeValueSchema, topic, data);
|
|
|
- final Optional<byte[]> serializedKey = serialize(maybeKeySchema, topic, key);
|
|
|
+ final byte[] serializedValue = data != null
|
|
|
+ ? serialize(maybeValueSchema, topic, data, false)
|
|
|
+ : null;
|
|
|
+ final byte[] serializedKey = key != null
|
|
|
+ ? serialize(maybeKeySchema, topic, key, true)
|
|
|
+ : null;
|
|
|
|
|
|
- if (serializedValue.isPresent()) {
|
|
|
- return partition
|
|
|
- .map(p ->
|
|
|
- new ProducerRecord<>(topic, p, serializedKey.orElse(key), serializedValue.get())
|
|
|
- ).orElseGet(() ->
|
|
|
- new ProducerRecord<>(topic, serializedKey.orElse(key), serializedValue.get())
|
|
|
- );
|
|
|
- } else {
|
|
|
- throw new RuntimeException("Subject was not found for topic " + topic);
|
|
|
- }
|
|
|
+ return new ProducerRecord<>(topic, partition, serializedKey, serializedValue);
|
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
|
- private Optional<byte[]> serialize(
|
|
|
- Optional<SchemaMetadata> maybeSchema, String topic, byte[] value) {
|
|
|
+ private byte[] serialize(
|
|
|
+ Optional<SchemaMetadata> maybeSchema, String topic, String value, boolean isKey) {
|
|
|
if (maybeSchema.isPresent()) {
|
|
|
final SchemaMetadata schema = maybeSchema.get();
|
|
|
|
|
|
MessageReader<?> reader;
|
|
|
if (schema.getSchemaType().equals(MessageFormat.PROTOBUF.name())) {
|
|
|
- reader = new ProtobufMessageReader(topic, false, schemaRegistryClient, schema);
|
|
|
+ reader = new ProtobufMessageReader(topic, isKey, schemaRegistryClient, schema);
|
|
|
} else if (schema.getSchemaType().equals(MessageFormat.AVRO.name())) {
|
|
|
- reader = new AvroMessageReader(topic, false, schemaRegistryClient, schema);
|
|
|
+ reader = new AvroMessageReader(topic, isKey, schemaRegistryClient, schema);
|
|
|
} else {
|
|
|
- reader = new JsonMessageReader(topic, false, schemaRegistryClient, schema);
|
|
|
+ throw new IllegalStateException("Unsupported schema type: " + schema.getSchemaType());
|
|
|
}
|
|
|
|
|
|
- return Optional.of(reader.read(value));
|
|
|
+ return reader.read(value);
|
|
|
} else {
|
|
|
- return Optional.empty();
|
|
|
+ // if no schema provided serialize input as raw string
|
|
|
+ return value.getBytes();
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -171,7 +174,8 @@ public class SchemaRegistryRecordSerDe implements RecordSerDe {
|
|
|
|
|
|
String jsonSchema;
|
|
|
URI basePath = new URI(cluster.getSchemaRegistry()).resolve(Integer.toString(schema.getId()));
|
|
|
- final ParsedSchema schemaById = schemaRegistryClient.getSchemaById(schema.getId());
|
|
|
+ final ParsedSchema schemaById = Objects.requireNonNull(schemaRegistryClient)
|
|
|
+ .getSchemaById(schema.getId());
|
|
|
|
|
|
if (schema.getSchemaType().equals(MessageFormat.PROTOBUF.name())) {
|
|
|
final ProtobufSchema protobufSchema = (ProtobufSchema) schemaById;
|
|
@@ -207,38 +211,31 @@ public class SchemaRegistryRecordSerDe implements RecordSerDe {
|
|
|
.or(() -> getSchemaBySubject(msg.topic(), isKey).map(SchemaMetadata::getSchemaType));
|
|
|
if (type.isPresent()) {
|
|
|
if (type.get().equals(MessageFormat.PROTOBUF.name())) {
|
|
|
- if (tryFormatter(protobufFormatter, msg).isPresent()) {
|
|
|
+ if (tryFormatter(protobufFormatter, msg, isKey).isPresent()) {
|
|
|
return protobufFormatter;
|
|
|
}
|
|
|
} else if (type.get().equals(MessageFormat.AVRO.name())) {
|
|
|
- if (tryFormatter(avroFormatter, msg).isPresent()) {
|
|
|
+ if (tryFormatter(avroFormatter, msg, isKey).isPresent()) {
|
|
|
return avroFormatter;
|
|
|
}
|
|
|
- } else if (type.get().equals(MessageFormat.JSON.name())) {
|
|
|
- if (tryFormatter(jsonFormatter, msg).isPresent()) {
|
|
|
- return jsonFormatter;
|
|
|
- }
|
|
|
+ } else {
|
|
|
+ throw new IllegalStateException("Unsupported schema type: " + type.get());
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
log.warn("Failed to get Schema for topic {}", msg.topic(), e);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- if (tryFormatter(jsonFormatter, msg).isPresent()) {
|
|
|
- return jsonFormatter;
|
|
|
- }
|
|
|
-
|
|
|
return stringFormatter;
|
|
|
}
|
|
|
|
|
|
private Optional<MessageFormatter> tryFormatter(
|
|
|
- MessageFormatter formatter, ConsumerRecord<Bytes, Bytes> msg) {
|
|
|
+ MessageFormatter formatter, ConsumerRecord<Bytes, Bytes> msg, boolean isKey) {
|
|
|
try {
|
|
|
- formatter.format(msg.topic(), msg.value().get());
|
|
|
+ formatter.format(msg.topic(), isKey ? msg.key().get() : msg.value().get());
|
|
|
return Optional.of(formatter);
|
|
|
} catch (Throwable e) {
|
|
|
- log.info("Failed to parse by {} from topic {}", formatter.getClass(), msg.topic());
|
|
|
+ log.warn("Failed to parse by {} from topic {}", formatter.getClass(), msg.topic(), e);
|
|
|
}
|
|
|
|
|
|
return Optional.empty();
|
|
@@ -252,9 +249,10 @@ public class SchemaRegistryRecordSerDe implements RecordSerDe {
|
|
|
ByteBuffer buffer = ByteBuffer.wrap(value.get());
|
|
|
if (buffer.get() == 0) {
|
|
|
int id = buffer.getInt();
|
|
|
- result = Optional.ofNullable(
|
|
|
- schemaRegistryClient.getSchemaById(id)
|
|
|
- ).map(ParsedSchema::schemaType);
|
|
|
+ result =
|
|
|
+ Optional.ofNullable(schemaRegistryClient)
|
|
|
+ .flatMap(client -> wrapClientCall(() -> client.getSchemaById(id)))
|
|
|
+ .map(ParsedSchema::schemaType);
|
|
|
}
|
|
|
}
|
|
|
return result;
|
|
@@ -262,11 +260,23 @@ public class SchemaRegistryRecordSerDe implements RecordSerDe {
|
|
|
|
|
|
@SneakyThrows
|
|
|
private Optional<SchemaMetadata> getSchemaBySubject(String topic, boolean isKey) {
|
|
|
- return Optional.ofNullable(
|
|
|
- schemaRegistryClient.getLatestSchemaMetadata(
|
|
|
- schemaSubject(topic, isKey)
|
|
|
- )
|
|
|
- );
|
|
|
+ return Optional.ofNullable(schemaRegistryClient)
|
|
|
+ .flatMap(client ->
|
|
|
+ wrapClientCall(() ->
|
|
|
+ client.getLatestSchemaMetadata(schemaSubject(topic, isKey))));
|
|
|
+ }
|
|
|
+
|
|
|
+ @SneakyThrows
|
|
|
+ private <T> Optional<T> wrapClientCall(Callable<T> call) {
|
|
|
+ try {
|
|
|
+ return Optional.ofNullable(call.call());
|
|
|
+ } catch (RestClientException restClientException) {
|
|
|
+ if (restClientException.getStatus() == 404) {
|
|
|
+ return Optional.empty();
|
|
|
+ } else {
|
|
|
+ throw new RuntimeException("Error calling SchemaRegistryClient", restClientException);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private String schemaSubject(String topic, boolean isKey) {
|