|
@@ -1,6 +1,8 @@
|
|
|
package com.provectus.kafka.ui.serdes;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
|
+import com.google.common.base.Strings;
|
|
|
+import com.google.common.collect.ImmutableMap;
|
|
|
import com.provectus.kafka.ui.config.ClustersProperties;
|
|
|
import com.provectus.kafka.ui.exception.ValidationException;
|
|
|
import com.provectus.kafka.ui.serde.api.PropertyResolver;
|
|
@@ -14,6 +16,7 @@ import com.provectus.kafka.ui.serdes.builtin.UInt32Serde;
|
|
|
import com.provectus.kafka.ui.serdes.builtin.UInt64Serde;
|
|
|
import com.provectus.kafka.ui.serdes.builtin.UuidBinarySerde;
|
|
|
import com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerde;
|
|
|
+import java.io.Closeable;
|
|
|
import java.util.LinkedHashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.Optional;
|
|
@@ -26,22 +29,22 @@ import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.core.env.Environment;
|
|
|
|
|
|
@Slf4j
|
|
|
-public class ClusterSerdes {
|
|
|
+public class ClusterSerdes implements Closeable {
|
|
|
|
|
|
private static final CustomSerdeLoader CUSTOM_SERDE_LOADER = new CustomSerdeLoader();
|
|
|
|
|
|
private static final Map<String, Class<? extends BuiltInSerde>> BUILT_IN_SERDES =
|
|
|
- Map.of(
|
|
|
- StringSerde.name(), StringSerde.class,
|
|
|
- Int32Serde.name(), Int32Serde.class,
|
|
|
- Int64Serde.name(), Int64Serde.class,
|
|
|
- UInt32Serde.name(), UInt32Serde.class,
|
|
|
- UInt64Serde.name(), UInt64Serde.class,
|
|
|
- UuidBinarySerde.name(), UuidBinarySerde.class,
|
|
|
- Base64Serde.name(), Base64Serde.class,
|
|
|
- SchemaRegistrySerde.name(), SchemaRegistrySerde.class,
|
|
|
- ProtobufFileSerde.name(), ProtobufFileSerde.class
|
|
|
- );
|
|
|
+ ImmutableMap.<String, Class<? extends BuiltInSerde>>builder()
|
|
|
+ .put(StringSerde.name(), StringSerde.class)
|
|
|
+ .put(Int32Serde.name(), Int32Serde.class)
|
|
|
+ .put(Int64Serde.name(), Int64Serde.class)
|
|
|
+ .put(UInt32Serde.name(), UInt32Serde.class)
|
|
|
+ .put(UInt64Serde.name(), UInt64Serde.class)
|
|
|
+ .put(Base64Serde.name(), Base64Serde.class)
|
|
|
+ .put(SchemaRegistrySerde.name(), SchemaRegistrySerde.class)
|
|
|
+ .put(ProtobufFileSerde.name(), ProtobufFileSerde.class)
|
|
|
+ .put(UuidBinarySerde.name(), UuidBinarySerde.class)
|
|
|
+ .build();
|
|
|
|
|
|
// using linked map to keep order of serdes added to it
|
|
|
private final Map<String, SerdeInstance> serdes = new LinkedHashMap<>();
|
|
@@ -64,6 +67,9 @@ public class ClusterSerdes {
|
|
|
ClustersProperties.Cluster clusterProp = clustersProperties.getClusters().get(clusterIndex);
|
|
|
for (int i = 0; i < clusterProp.getSerde().size(); i++) {
|
|
|
var sendeConf = clusterProp.getSerde().get(i);
|
|
|
+ if (Strings.isNullOrEmpty(sendeConf.getName())) {
|
|
|
+ throw new ValidationException("'name' property not set for serde: " + sendeConf);
|
|
|
+ }
|
|
|
if (serdes.containsKey(sendeConf.getName())) {
|
|
|
throw new ValidationException("Multiple serdes with same name: " + sendeConf.getName());
|
|
|
}
|
|
@@ -154,6 +160,14 @@ public class ClusterSerdes {
|
|
|
PropertyResolver serdeProps,
|
|
|
PropertyResolver clusterProps,
|
|
|
PropertyResolver globalProps) {
|
|
|
+ if (Strings.isNullOrEmpty(serdeConfig.getClassName())) {
|
|
|
+ throw new ValidationException(
|
|
|
+ "'className' property not set for custom serde " + serdeConfig.getName());
|
|
|
+ }
|
|
|
+ if (Strings.isNullOrEmpty(serdeConfig.getFilePath())) {
|
|
|
+ throw new ValidationException(
|
|
|
+ "'filePath' property not set for custom serde " + serdeConfig.getName());
|
|
|
+ }
|
|
|
var loaded = CUSTOM_SERDE_LOADER.loadAndConfigure(
|
|
|
serdeConfig.getClassName(), serdeConfig.getFilePath(), serdeProps, clusterProps, globalProps);
|
|
|
return new SerdeInstance(
|
|
@@ -215,4 +229,8 @@ public class ClusterSerdes {
|
|
|
.orElse(serdes.get(StringSerde.name()));
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public void close() {
|
|
|
+ serdes.values().forEach(SerdeInstance::close);
|
|
|
+ }
|
|
|
}
|