|
@@ -50,7 +50,6 @@ import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
|
|
import java.io.ByteArrayInputStream;
|
|
import java.io.ByteArrayInputStream;
|
|
import java.nio.file.Files;
|
|
import java.nio.file.Files;
|
|
import java.nio.file.Path;
|
|
import java.nio.file.Path;
|
|
-import java.nio.file.Paths;
|
|
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
@@ -204,17 +203,13 @@ public class ProtobufFileSerde implements BuiltInSerde {
|
|
Map<String, Descriptor> keyMessageDescriptorMap) {
|
|
Map<String, Descriptor> keyMessageDescriptorMap) {
|
|
|
|
|
|
static boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties) {
|
|
static boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties) {
|
|
- Optional<String> protobufFile = kafkaClusterProperties.getProperty("protobufFile", String.class);
|
|
|
|
Optional<List<String>> protobufFiles = kafkaClusterProperties.getListProperty("protobufFiles", String.class);
|
|
Optional<List<String>> protobufFiles = kafkaClusterProperties.getListProperty("protobufFiles", String.class);
|
|
Optional<String> protobufFilesDir = kafkaClusterProperties.getProperty("protobufFilesDir", String.class);
|
|
Optional<String> protobufFilesDir = kafkaClusterProperties.getProperty("protobufFilesDir", String.class);
|
|
- return protobufFilesDir.isPresent()
|
|
|
|
- || protobufFile.isPresent()
|
|
|
|
- || protobufFiles.filter(files -> !files.isEmpty()).isPresent();
|
|
|
|
|
|
+ return protobufFilesDir.isPresent() || protobufFiles.filter(files -> !files.isEmpty()).isPresent();
|
|
}
|
|
}
|
|
|
|
|
|
static Configuration create(PropertyResolver properties) {
|
|
static Configuration create(PropertyResolver properties) {
|
|
var protobufSchemas = loadSchemas(
|
|
var protobufSchemas = loadSchemas(
|
|
- properties.getProperty("protobufFile", String.class),
|
|
|
|
properties.getListProperty("protobufFiles", String.class),
|
|
properties.getListProperty("protobufFiles", String.class),
|
|
properties.getProperty("protobufFilesDir", String.class)
|
|
properties.getProperty("protobufFilesDir", String.class)
|
|
);
|
|
);
|
|
@@ -272,12 +267,11 @@ public class ProtobufFileSerde implements BuiltInSerde {
|
|
}
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
- static Map<Path, ProtobufSchema> loadSchemas(Optional<String> protobufFile,
|
|
|
|
- Optional<List<String>> protobufFiles,
|
|
|
|
|
|
+ static Map<Path, ProtobufSchema> loadSchemas(Optional<List<String>> protobufFiles,
|
|
Optional<String> protobufFilesDir) {
|
|
Optional<String> protobufFilesDir) {
|
|
if (protobufFilesDir.isPresent()) {
|
|
if (protobufFilesDir.isPresent()) {
|
|
- if (protobufFile.isPresent() || protobufFiles.isPresent()) {
|
|
|
|
- log.warn("protobufFile and protobufFiles properties will be ignored, since protobufFilesDir provided");
|
|
|
|
|
|
+ if (protobufFiles.isPresent()) {
|
|
|
|
+ log.warn("protobufFiles properties will be ignored, since protobufFilesDir provided");
|
|
}
|
|
}
|
|
List<ProtoFile> loadedFiles = new ProtoSchemaLoader(protobufFilesDir.get()).load();
|
|
List<ProtoFile> loadedFiles = new ProtoSchemaLoader(protobufFilesDir.get()).load();
|
|
Map<String, ProtoFileElement> allPaths = loadedFiles.stream()
|
|
Map<String, ProtoFileElement> allPaths = loadedFiles.stream()
|
|
@@ -288,10 +282,8 @@ public class ProtobufFileSerde implements BuiltInSerde {
|
|
f -> new ProtobufSchema(f.toElement(), List.of(), allPaths)));
|
|
f -> new ProtobufSchema(f.toElement(), List.of(), allPaths)));
|
|
}
|
|
}
|
|
//Supporting for backward-compatibility. Normally, protobufFilesDir setting should be used
|
|
//Supporting for backward-compatibility. Normally, protobufFilesDir setting should be used
|
|
- return Stream.concat(
|
|
|
|
- protobufFile.stream(),
|
|
|
|
- protobufFiles.stream().flatMap(Collection::stream)
|
|
|
|
- )
|
|
|
|
|
|
+ return protobufFiles.stream()
|
|
|
|
+ .flatMap(Collection::stream)
|
|
.distinct()
|
|
.distinct()
|
|
.map(Path::of)
|
|
.map(Path::of)
|
|
.collect(Collectors.toMap(path -> path, path -> new ProtobufSchema(readFileAsString(path))));
|
|
.collect(Collectors.toMap(path -> path, path -> new ProtobufSchema(readFileAsString(path))));
|