|
@@ -46,6 +46,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|
private List<String> schemaRegistryUrls;
|
|
private List<String> schemaRegistryUrls;
|
|
private String valueSchemaNameTemplate;
|
|
private String valueSchemaNameTemplate;
|
|
private String keySchemaNameTemplate;
|
|
private String keySchemaNameTemplate;
|
|
|
|
+ private boolean checkSchemaExistenceForDeserialize;
|
|
|
|
|
|
private Map<SchemaType, MessageFormatter> schemaRegistryFormatters;
|
|
private Map<SchemaType, MessageFormatter> schemaRegistryFormatters;
|
|
|
|
|
|
@@ -75,7 +76,9 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|
kafkaClusterProperties.getProperty("schemaRegistrySSL.truststorePassword", String.class).orElse(null)
|
|
kafkaClusterProperties.getProperty("schemaRegistrySSL.truststorePassword", String.class).orElse(null)
|
|
),
|
|
),
|
|
kafkaClusterProperties.getProperty("schemaRegistryKeySchemaNameTemplate", String.class).orElse("%s-key"),
|
|
kafkaClusterProperties.getProperty("schemaRegistryKeySchemaNameTemplate", String.class).orElse("%s-key"),
|
|
- kafkaClusterProperties.getProperty("schemaRegistrySchemaNameTemplate", String.class).orElse("%s-value")
|
|
|
|
|
|
+ kafkaClusterProperties.getProperty("schemaRegistrySchemaNameTemplate", String.class).orElse("%s-value"),
|
|
|
|
+ kafkaClusterProperties.getProperty("schemaRegistryCheckSchemaExistenceForDeserialize", Boolean.class)
|
|
|
|
+ .orElse(false)
|
|
);
|
|
);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -99,7 +102,9 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|
serdeProperties.getProperty("truststorePassword", String.class).orElse(null)
|
|
serdeProperties.getProperty("truststorePassword", String.class).orElse(null)
|
|
),
|
|
),
|
|
serdeProperties.getProperty("keySchemaNameTemplate", String.class).orElse("%s-key"),
|
|
serdeProperties.getProperty("keySchemaNameTemplate", String.class).orElse("%s-key"),
|
|
- serdeProperties.getProperty("schemaNameTemplate", String.class).orElse("%s-value")
|
|
|
|
|
|
+ serdeProperties.getProperty("schemaNameTemplate", String.class).orElse("%s-value"),
|
|
|
|
+ kafkaClusterProperties.getProperty("checkSchemaExistenceForDeserialize", Boolean.class)
|
|
|
|
+ .orElse(false)
|
|
);
|
|
);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -108,12 +113,14 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|
List<String> schemaRegistryUrls,
|
|
List<String> schemaRegistryUrls,
|
|
SchemaRegistryClient schemaRegistryClient,
|
|
SchemaRegistryClient schemaRegistryClient,
|
|
String keySchemaNameTemplate,
|
|
String keySchemaNameTemplate,
|
|
- String valueSchemaNameTemplate) {
|
|
|
|
|
|
+ String valueSchemaNameTemplate,
|
|
|
|
+ boolean checkTopicSchemaExistenceForDeserialize) {
|
|
this.schemaRegistryUrls = schemaRegistryUrls;
|
|
this.schemaRegistryUrls = schemaRegistryUrls;
|
|
this.schemaRegistryClient = schemaRegistryClient;
|
|
this.schemaRegistryClient = schemaRegistryClient;
|
|
this.keySchemaNameTemplate = keySchemaNameTemplate;
|
|
this.keySchemaNameTemplate = keySchemaNameTemplate;
|
|
this.valueSchemaNameTemplate = valueSchemaNameTemplate;
|
|
this.valueSchemaNameTemplate = valueSchemaNameTemplate;
|
|
this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient);
|
|
this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient);
|
|
|
|
+ this.checkSchemaExistenceForDeserialize = checkTopicSchemaExistenceForDeserialize;
|
|
}
|
|
}
|
|
|
|
|
|
private static SchemaRegistryClient createSchemaRegistryClient(List<String> urls,
|
|
private static SchemaRegistryClient createSchemaRegistryClient(List<String> urls,
|
|
@@ -122,8 +129,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|
@Nullable String keyStoreLocation,
|
|
@Nullable String keyStoreLocation,
|
|
@Nullable String keyStorePassword,
|
|
@Nullable String keyStorePassword,
|
|
@Nullable String trustStoreLocation,
|
|
@Nullable String trustStoreLocation,
|
|
- @Nullable String trustStorePassword
|
|
|
|
- ) {
|
|
|
|
|
|
+ @Nullable String trustStorePassword) {
|
|
Map<String, String> configs = new HashMap<>();
|
|
Map<String, String> configs = new HashMap<>();
|
|
if (username != null && password != null) {
|
|
if (username != null && password != null) {
|
|
configs.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
|
|
configs.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
|
|
@@ -169,7 +175,8 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|
@Override
|
|
@Override
|
|
public boolean canDeserialize(String topic, Target type) {
|
|
public boolean canDeserialize(String topic, Target type) {
|
|
String subject = schemaSubject(topic, type);
|
|
String subject = schemaSubject(topic, type);
|
|
- return getSchemaBySubject(subject).isPresent();
|
|
|
|
|
|
+ return !checkSchemaExistenceForDeserialize
|
|
|
|
+ || getSchemaBySubject(subject).isPresent();
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|