Przeglądaj źródła

Serdes improvements and fixes (#2989)

Serdes improvements and fixes:
1. Built-in serdel's auto-configuration separated from explicit configuration (when properties set in serde configuration)
2. Serdes initialization logic extracted into SerdesInitializer.java
3. docker-compose example with serdes configuration added
4. Serialization.md document fixed
Ilya Kuramshin 2 lat temu
rodzic
commit
216b4acd3d
20 zmienionych plików z 708 dodań i 281 usunięć
  1. 111 0
      documentation/compose/kafka-ui-serdes.yaml
  2. 11 0
      documentation/compose/proto/key-types.proto
  3. 12 0
      documentation/compose/proto/values.proto
  4. 18 4
      documentation/guides/Protobuf.md
  5. 14 14
      documentation/guides/Serialization.md
  6. 17 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/BuiltInSerde.java
  7. 8 165
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/ClusterSerdes.java
  8. 3 3
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdeInstance.java
  9. 251 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java
  10. 0 6
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Int32Serde.java
  11. 0 7
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Int64Serde.java
  12. 28 15
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufFileSerde.java
  13. 4 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/StringSerde.java
  14. 0 7
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UInt32Serde.java
  15. 0 9
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UInt64Serde.java
  16. 3 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UuidBinarySerde.java
  17. 32 27
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java
  18. 3 3
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/DeserializationService.java
  19. 183 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/SerdesInitializerTest.java
  20. 10 10
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufFileSerdeTest.java

+ 111 - 0
documentation/compose/kafka-ui-serdes.yaml

@@ -0,0 +1,111 @@
+---
+version: '2'
+services:
+
+    kafka-ui:
+        container_name: kafka-ui
+        image: provectuslabs/kafka-ui:latest
+        ports:
+            - 8080:8080
+        depends_on:
+            - kafka0
+            - schemaregistry0
+        environment:
+            kafka.clusters.0.name: SerdeExampleCluster
+            kafka.clusters.0.bootstrapServers: kafka0:29092
+            kafka.clusters.0.schemaRegistry: http://schemaregistry0:8085
+            # optional auth and ssl properties for SR
+            #kafka.clusters.0.schemaRegistryAuth.username: "use"
+            #kafka.clusters.0.schemaRegistryAuth.password: "pswrd"
+            #kafka.clusters.0.schemaRegistrySSL.keystoreLocation: /kafka.keystore.jks
+            #kafka.clusters.0.schemaRegistrySSL.keystorePassword: "secret"
+            #kafka.clusters.0.schemaRegistrySSL.truststoreLocation: /kafka.truststore.jks
+            #kafka.clusters.0.schemaRegistrySSL.truststorePassword: "secret"
+
+            kafka.clusters.0.defaultKeySerde: Int32  #optional
+            kafka.clusters.0.defaultValueSerde: String #optional
+
+            kafka.clusters.0.serde.0.name: ProtobufFile
+            kafka.clusters.0.serde.0.topicKeysPattern: "topic1"
+            kafka.clusters.0.serde.0.topicValuesPattern: "topic1"
+            kafka.clusters.0.serde.0.properties.protobufFiles.0: /protofiles/key-types.proto
+            kafka.clusters.0.serde.0.properties.protobufFiles.1: /protofiles/values.proto
+            kafka.clusters.0.serde.0.properties.protobufMessageNameForKey: test.MyKey # default type for keys
+            kafka.clusters.0.serde.0.properties.protobufMessageName: test.MyValue # default type for values
+            kafka.clusters.0.serde.0.properties.protobufMessageNameForKeyByTopic.topic1: test.MySpecificTopicKey # keys type for topic "topic1"
+            kafka.clusters.0.serde.0.properties.protobufMessageNameByTopic.topic1: test.MySpecificTopicValue # values type for topic "topic1"
+
+            kafka.clusters.0.serde.1.name: String
+            #kafka.clusters.0.serde.1.properties.encoding: "UTF-16" #optional, default is UTF-8
+            kafka.clusters.0.serde.1.topicValuesPattern: "json-events|text-events"
+
+            kafka.clusters.0.serde.2.name: AsciiString
+            kafka.clusters.0.serde.2.className: com.provectus.kafka.ui.serdes.builtin.StringSerde
+            kafka.clusters.0.serde.2.properties.encoding: "ASCII"
+
+            kafka.clusters.0.serde.3.name: SchemaRegistry # will be configured automatically using cluster SR
+            kafka.clusters.0.serde.3.topicValuesPattern: "sr-topic.*"
+
+            kafka.clusters.0.serde.4.name: AnotherSchemaRegistry
+            kafka.clusters.0.serde.4.className: com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerde
+            kafka.clusters.0.serde.4.properties.url: http://schemaregistry0:8085
+            kafka.clusters.0.serde.4.properties.keySchemaNameTemplate: "%s-key"
+            kafka.clusters.0.serde.4.properties.schemaNameTemplate: "%s-value"
+            #kafka.clusters.0.serde.4.topicValuesPattern: "sr2-topic.*"
+            # optional auth and ssl properties for SR:
+            #kafka.clusters.0.serde.4.properties.username: "user"
+            #kafka.clusters.0.serde.4.properties.password: "passw"
+            #kafka.clusters.0.serde.4.properties.keystoreLocation:  /kafka.keystore.jks
+            #kafka.clusters.0.serde.4.properties.keystorePassword: "secret"
+            #kafka.clusters.0.serde.4.properties.truststoreLocation: /kafka.truststore.jks
+            #kafka.clusters.0.serde.4.properties.truststorePassword: "secret"
+
+            kafka.clusters.0.serde.5.name: UInt64
+            kafka.clusters.0.serde.5.topicKeysPattern: "topic-with-uint64keys"
+        volumes:
+            - ./proto:/protofiles
+
+    kafka0:
+        image: confluentinc/cp-kafka:7.2.1
+        hostname: kafka0
+        container_name: kafka0
+        ports:
+            - "9092:9092"
+            - "9997:9997"
+        environment:
+            KAFKA_BROKER_ID: 1
+            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
+            KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092'
+            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+            KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+            KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+            KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+            KAFKA_JMX_PORT: 9997
+            KAFKA_JMX_HOSTNAME: localhost
+            KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
+            KAFKA_PROCESS_ROLES: 'broker,controller'
+            KAFKA_NODE_ID: 1
+            KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093'
+            KAFKA_LISTENERS: 'PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092'
+            KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
+            KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
+            KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
+        volumes:
+            - ./scripts/update_run.sh:/tmp/update_run.sh
+        command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"
+
+    schemaregistry0:
+        image: confluentinc/cp-schema-registry:7.2.1
+        ports:
+            - 8085:8085
+        depends_on:
+            - kafka0
+        environment:
+            SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
+            SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
+            SCHEMA_REGISTRY_HOST_NAME: schemaregistry0
+            SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085
+
+            SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
+            SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+            SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

+ 11 - 0
documentation/compose/proto/key-types.proto

@@ -0,0 +1,11 @@
+syntax = "proto3";
+package test;
+
+message MyKey {
+    string myKeyF1 = 1;
+}
+
+message MySpecificTopicKey {
+    string special_field1 = 1;
+    string special_field2 = 2;
+}

+ 12 - 0
documentation/compose/proto/values.proto

@@ -0,0 +1,12 @@
+syntax = "proto3";
+package test;
+
+message MySpecificTopicValue {
+    string f1 = 1;
+    string f2 = 2;
+}
+
+message MyValue {
+  int32 version = 1;
+  string payload = 2;
+}

+ 18 - 4
documentation/guides/Protobuf.md

@@ -1,5 +1,7 @@
 # Kafkaui Protobuf Support
 
+### This document is deprecated, please see examples in [Serialization document](Serialization.md).
+
 Kafkaui supports deserializing protobuf messages in two ways:
 1. Using Confluent Schema Registry's [protobuf support](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-protobuf.html).
 2. Supplying a protobuf file as well as a configuration that maps topic names to protobuf types.
@@ -15,11 +17,11 @@ kafka:
       protobufFile: path/to/my.proto
       # protobufFiles is the path to one or more protobuf schemas.
       protobufFiles: 
-          - path/to/my.proto
-          - path/to/another.proto
+        - /path/to/my.proto
+        - /path/to/another.proto
       # protobufMessageName is the default protobuf type that is used to deserilize
       # the message's value if the topic is not found in protobufMessageNameByTopic.
-      protobufMessageName: my.Type1
+      protobufMessageName: my.DefaultValType
       # protobufMessageNameByTopic is a mapping of topic names to protobuf types.
       # This mapping is required and is used to deserialize the Kafka message's value.
       protobufMessageNameByTopic:
@@ -27,11 +29,23 @@ kafka:
         topic2: my.Type2
       # protobufMessageNameForKey is the default protobuf type that is used to deserilize
       # the message's key if the topic is not found in protobufMessageNameForKeyByTopic.
-      protobufMessageNameForKey: my.Type1
+      protobufMessageNameForKey: my.DefaultKeyType
       # protobufMessageNameForKeyByTopic is a mapping of topic names to protobuf types.
       # This mapping is optional and is used to deserialize the Kafka message's key.
       # If a protobuf type is not found for a topic's key, the key is deserialized as a string,
       # unless protobufMessageNameForKey is specified.
       protobufMessageNameForKeyByTopic:
         topic1: my.KeyType1
+```
+
+Same config with flattened config (for docker-compose):
+
+```text
+kafka.clusters.0.protobufFiles.0: /path/to/my.proto
+kafka.clusters.0.protobufFiles.1: /path/to/another.proto
+kafka.clusters.0.protobufMessageName: my.DefaultValType
+kafka.clusters.0.protobufMessageNameByTopic.topic1: my.Type1
+kafka.clusters.0.protobufMessageNameByTopic.topic2: my.Type2
+kafka.clusters.0.protobufMessageNameForKey: my.DefaultKeyType
+kafka.clusters.0.protobufMessageNameForKeyByTopic.topic1: my.KeyType1
 ```

+ 14 - 14
documentation/guides/Serialization.md

@@ -34,7 +34,6 @@ kafka:
 ```
 
 ### Protobuf
-[Deprecated configuration is here](Protobuf.md)
 
 Class name: `com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde`
 
@@ -47,8 +46,10 @@ kafka:
       serdes:
         - name: ProtobufFile
           properties:
-            # path to the protobuf schema file
-            protobufFile: path/to/my.proto
+            # path to the protobuf schema files
+            protobufFiles:
+              - path/to/my.proto
+              - path/to/another.proto
             # default protobuf type that is used for KEY serialization/deserialization
             # optional
             protobufMessageNameForKey: my.Type1
@@ -64,12 +65,15 @@ kafka:
             # optional
             protobufMessageNameByTopic:
               topic1: my.Type1
-              topic2: my.Type2
+              "topic.2": my.Type2
 ```
+Docker-compose sample for Protobuf serialization is [here](../compose/kafka-ui-serdes.yaml).
+
+Legacy configuration for protobuf is [here](Protobuf.md).
 
 ### SchemaRegistry
 SchemaRegistry serde is automatically configured if schema registry properties set on cluster level.
-But you can override them or add new SchemaRegistry serde that will connect to another schema-registry instance. 
+But you can add new SchemaRegistry-typed serdes that will connect to another schema-registry instance. 
 
 Class name: `com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerde`
 
@@ -78,24 +82,22 @@ Sample configuration:
 kafka:
   clusters:
     - name: Cluster1
-      # this schema-registry will be used by SchemaRegistrySerde by default
+      # this url will be used by "SchemaRegistry" by default
       schemaRegistry: http://main-schema-registry:8081
       serdes:
-        - name: SchemaRegistry
+        - name: AnotherSchemaRegistry
+          className: com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerde
           properties:
-            # but you can override cluster-level properties
             url:  http://another-schema-registry:8081
             # auth properties, optional
             username: nameForAuth
             password: P@ssW0RdForAuth
         
           # and also add another SchemaRegistry serde
-        - name: AnotherOneSchemaRegistry
+        - name: ThirdSchemaRegistry
           className: com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerde
           properties:
             url:  http://another-yet-schema-registry:8081
-            username: nameForAuth_2
-            password: P@ssW0RdForAuth_2
 ```
 
 ## Setting serdes for specific topics
@@ -115,8 +117,6 @@ kafka:
           topicKeysPattern: ".*-events"
         
         - name: SchemaRegistry
-          properties:
-            url:  http://schema-registry:8081
           topicValuesPattern: click-events|imp-events
 ```
 
@@ -129,7 +129,7 @@ Sample configuration:
 kafka:
   clusters:
     - name: Cluster1
-      defaultKeySerde: Int34
+      defaultKeySerde: Int32
       defaultValueSerde: String
       serdes:
         - name: Int32

+ 17 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/BuiltInSerde.java

@@ -5,8 +5,23 @@ import com.provectus.kafka.ui.serde.api.Serde;
 
 public interface BuiltInSerde extends Serde {
 
-  default boolean initOnStartup(PropertyResolver kafkaClusterProperties,
-                                PropertyResolver globalProperties) {
+  // returns true is serde has enough properties set on cluster&global levels to
+  // be configured without explicit config provide
+  default boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties,
+                                      PropertyResolver globalProperties) {
     return true;
   }
+
+  // will be called for build-in serdes that were not explicitly registered
+  // and that returned true on canBeAutoConfigured(..) call.
+  // NOTE: Serde.configure() method won't be called if serde is auto-configured!
+  default void autoConfigure(PropertyResolver kafkaClusterProperties,
+                             PropertyResolver globalProperties) {
+  }
+
+  @Override
+  default void configure(PropertyResolver serdeProperties,
+                         PropertyResolver kafkaClusterProperties,
+                         PropertyResolver globalProperties) {
+  }
 }

+ 8 - 165
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/ClusterSerdes.java

@@ -1,188 +1,31 @@
 package com.provectus.kafka.ui.serdes;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import com.provectus.kafka.ui.config.ClustersProperties;
-import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.serde.api.PropertyResolver;
 import com.provectus.kafka.ui.serde.api.Serde;
-import com.provectus.kafka.ui.serdes.builtin.Base64Serde;
-import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
-import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
-import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
-import com.provectus.kafka.ui.serdes.builtin.UInt32Serde;
-import com.provectus.kafka.ui.serdes.builtin.UInt64Serde;
-import com.provectus.kafka.ui.serdes.builtin.UuidBinarySerde;
-import com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerde;
 import java.io.Closeable;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Predicate;
-import java.util.regex.Pattern;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
-import lombok.SneakyThrows;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.core.env.Environment;
 
 @Slf4j
+@RequiredArgsConstructor
 public class ClusterSerdes implements Closeable {
 
-  private static final CustomSerdeLoader CUSTOM_SERDE_LOADER = new CustomSerdeLoader();
-
-  private static final Map<String, Class<? extends BuiltInSerde>> BUILT_IN_SERDES =
-      ImmutableMap.<String, Class<? extends BuiltInSerde>>builder()
-          .put(StringSerde.name(), StringSerde.class)
-          .put(Int32Serde.name(), Int32Serde.class)
-          .put(Int64Serde.name(), Int64Serde.class)
-          .put(UInt32Serde.name(), UInt32Serde.class)
-          .put(UInt64Serde.name(), UInt64Serde.class)
-          .put(Base64Serde.name(), Base64Serde.class)
-          .put(SchemaRegistrySerde.name(), SchemaRegistrySerde.class)
-          .put(ProtobufFileSerde.name(), ProtobufFileSerde.class)
-          .put(UuidBinarySerde.name(), UuidBinarySerde.class)
-          .build();
-
-  // using linked map to keep order of serdes added to it
-  private final Map<String, SerdeInstance> serdes = new LinkedHashMap<>();
+  final Map<String, SerdeInstance> serdes;
 
   @Nullable
-  private final SerdeInstance defaultKeySerde;
+  final SerdeInstance defaultKeySerde;
 
   @Nullable
-  private final SerdeInstance defaultValueSerde;
-
-  private final SerdeInstance fallbackSerde;
-
-  public ClusterSerdes(Environment env,
-                       ClustersProperties clustersProperties,
-                       int clusterIndex) {
-    var globalPropertiesResolver = new PropertyResolverImpl(env);
-    var clusterPropertiesResolver = new PropertyResolverImpl(env, "kafka.clusters." + clusterIndex);
-
-    // initializing serdes from config
-    ClustersProperties.Cluster clusterProp = clustersProperties.getClusters().get(clusterIndex);
-    for (int i = 0; i < clusterProp.getSerde().size(); i++) {
-      var sendeConf = clusterProp.getSerde().get(i);
-      if (Strings.isNullOrEmpty(sendeConf.getName())) {
-        throw new ValidationException("'name' property not set for serde: " + sendeConf);
-      }
-      if (serdes.containsKey(sendeConf.getName())) {
-        throw new ValidationException("Multiple serdes with same name: " + sendeConf.getName());
-      }
-      var instance = initSerdeFromConfig(
-          sendeConf,
-          new PropertyResolverImpl(env, "kafka.clusters." + clusterIndex + ".serde." + i + ".properties"),
-          clusterPropertiesResolver,
-          globalPropertiesResolver
-      );
-      serdes.put(sendeConf.getName(), instance);
-    }
-
-    // initializing built-in serdes if they haven't been already initialized
-    BUILT_IN_SERDES.forEach((name, clazz) -> {
-      if (!serdes.containsKey(name)) { // serde can be already initialized with custom config
-        var serde = createSerdeInstance(clazz);
-        if (serde.initOnStartup(clusterPropertiesResolver, globalPropertiesResolver)) {
-          serde.configure(
-              PropertyResolverImpl.empty(),
-              clusterPropertiesResolver,
-              globalPropertiesResolver
-          );
-          serdes.put(name, new SerdeInstance(name, serde, null, null, null));
-        }
-      }
-    });
-
-    defaultKeySerde = Optional.ofNullable(clusterProp.getDefaultKeySerde())
-        .map(name -> Preconditions.checkNotNull(serdes.get(name), "Default key serde not found"))
-        .or(() -> Optional.ofNullable(serdes.get(SchemaRegistrySerde.name())))
-        .or(() -> Optional.ofNullable(serdes.get(ProtobufFileSerde.name())))
-        .orElse(null);
-
-    defaultValueSerde = Optional.ofNullable(clusterProp.getDefaultValueSerde())
-        .map(name -> Preconditions.checkNotNull(serdes.get(name), "Default value serde not found"))
-        .or(() -> Optional.ofNullable(serdes.get(SchemaRegistrySerde.name())))
-        .or(() -> Optional.ofNullable(serdes.get(ProtobufFileSerde.name())))
-        .orElse(null);
-
-    fallbackSerde = createFallbackSerde();
-  }
-
-  private SerdeInstance createFallbackSerde() {
-    StringSerde serde = new StringSerde();
-    serde.configure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty(), PropertyResolverImpl.empty());
-    return new SerdeInstance("Fallback", serde, null, null, null);
-  }
+  final SerdeInstance defaultValueSerde;
 
-  @SneakyThrows
-  private SerdeInstance initSerdeFromConfig(ClustersProperties.SerdeConfig serdeConfig,
-                                            PropertyResolver serdeProps,
-                                            PropertyResolver clusterProps,
-                                            PropertyResolver globalProps) {
-    String name = serdeConfig.getName();
-    // configuring one of prebuilt serdes with custom params
-    if (BUILT_IN_SERDES.containsKey(name)) {
-      if (serdeConfig.getClassName() != null) {
-        throw new ValidationException("className can't be set for built-in serde");
-      }
-      if (serdeConfig.getFilePath() != null) {
-        throw new ValidationException("filePath can't be set for built-in serde");
-      }
-      var clazz = BUILT_IN_SERDES.get(name);
-      Serde serde = createSerdeInstance(clazz);
-      serde.configure(serdeProps, clusterProps, globalProps);
-      return new SerdeInstance(
-          name,
-          serde,
-          nullablePattern(serdeConfig.getTopicKeysPattern()),
-          nullablePattern(serdeConfig.getTopicValuesPattern()),
-          null
-      );
-    }
-    log.info("Loading custom serde {}", serdeConfig.getName());
-    return loadCustom(serdeConfig, serdeProps, clusterProps, globalProps);
-  }
-
-  @SneakyThrows
-  private <T extends Serde> T createSerdeInstance(Class<T> clazz) {
-    return clazz.getDeclaredConstructor().newInstance();
-  }
-
-  public SerdeInstance getFallbackSerde() {
-    return fallbackSerde;
-  }
-
-  private SerdeInstance loadCustom(ClustersProperties.SerdeConfig serdeConfig,
-                                   PropertyResolver serdeProps,
-                                   PropertyResolver clusterProps,
-                                   PropertyResolver globalProps) {
-    if (Strings.isNullOrEmpty(serdeConfig.getClassName())) {
-      throw new ValidationException(
-          "'className' property not set for custom serde " + serdeConfig.getName());
-    }
-    if (Strings.isNullOrEmpty(serdeConfig.getFilePath())) {
-      throw new ValidationException(
-          "'filePath' property not set for custom serde " + serdeConfig.getName());
-    }
-    var loaded = CUSTOM_SERDE_LOADER.loadAndConfigure(
-        serdeConfig.getClassName(), serdeConfig.getFilePath(), serdeProps, clusterProps, globalProps);
-    return new SerdeInstance(
-        serdeConfig.getName(),
-        loaded.getSerde(),
-        nullablePattern(serdeConfig.getTopicKeysPattern()),
-        nullablePattern(serdeConfig.getTopicValuesPattern()),
-        loaded.getClassLoader()
-    );
-  }
-
-  @Nullable
-  private Pattern nullablePattern(@Nullable String pattern) {
-    return pattern == null ? null : Pattern.compile(pattern);
-  }
+  @Getter
+  final SerdeInstance fallbackSerde;
 
   private Optional<SerdeInstance> findSerdeByPatternsOrDefault(String topic,
                                                                Serde.Target type,

+ 3 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdeInstance.java

@@ -16,9 +16,9 @@ import lombok.extern.slf4j.Slf4j;
 public class SerdeInstance implements Closeable {
 
   @Getter
-  private final String name;
+  final String name;
 
-  private final Serde serde;
+  final Serde serde;
 
   @Nullable
   final Pattern topicKeyPattern;
@@ -27,7 +27,7 @@ public class SerdeInstance implements Closeable {
   final Pattern topicValuePattern;
 
   @Nullable // will be set for custom serdes
-  private final ClassLoader classLoader;
+  final ClassLoader classLoader;
 
   private <T> T wrapWithClassloader(Supplier<T> call) {
     if (classLoader == null) {

+ 251 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java

@@ -0,0 +1,251 @@
+package com.provectus.kafka.ui.serdes;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.config.ClustersProperties.SerdeConfig;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.serde.api.PropertyResolver;
+import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.serdes.builtin.Base64Serde;
+import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
+import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
+import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
+import com.provectus.kafka.ui.serdes.builtin.StringSerde;
+import com.provectus.kafka.ui.serdes.builtin.UInt32Serde;
+import com.provectus.kafka.ui.serdes.builtin.UInt64Serde;
+import com.provectus.kafka.ui.serdes.builtin.UuidBinarySerde;
+import com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerde;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.core.env.Environment;
+
+@Slf4j
+public class SerdesInitializer {
+
+  private final Map<String, Class<? extends BuiltInSerde>> builtInSerdeClasses;
+  private final CustomSerdeLoader customSerdeLoader;
+
+  public SerdesInitializer() {
+    this(
+        ImmutableMap.<String, Class<? extends BuiltInSerde>>builder()
+            .put(StringSerde.name(), StringSerde.class)
+            .put(SchemaRegistrySerde.name(), SchemaRegistrySerde.class)
+            .put(ProtobufFileSerde.name(), ProtobufFileSerde.class)
+            .put(Int32Serde.name(), Int32Serde.class)
+            .put(Int64Serde.name(), Int64Serde.class)
+            .put(UInt32Serde.name(), UInt32Serde.class)
+            .put(UInt64Serde.name(), UInt64Serde.class)
+            .put(Base64Serde.name(), Base64Serde.class)
+            .put(UuidBinarySerde.name(), UuidBinarySerde.class)
+            .build(),
+        new CustomSerdeLoader()
+    );
+  }
+
+  @VisibleForTesting
+  SerdesInitializer(Map<String, Class<? extends BuiltInSerde>> builtInSerdeClasses,
+                    CustomSerdeLoader customSerdeLoader) {
+    this.builtInSerdeClasses = builtInSerdeClasses;
+    this.customSerdeLoader = customSerdeLoader;
+  }
+
+  /**
+   * Initialization algorithm:
+   * First, we iterate over explicitly configured serdes from cluster config:
+   * > if serde has name = one of built-in serde's names:
+   * - if serde's properties are empty, we treat it as serde should be
+   * auto-configured - we try to do that
+   * - if serde's properties not empty, we treat it as an intention to
+   * override default configuration, so we configuring it with specific config (calling configure(..))
+   * <p/>
+   * > if serde has className = one of built-in serde's classes:
+   * - initializing it with specific config and with default classloader
+   * <p/>
+   * > if serde has custom className != one of built-in serde's classes:
+   * - initializing it with specific config and with custom classloader (see CustomSerdeLoader)
+   * <p/>
+   * Second, we iterate over remaining built-in serdes (that we NOT explicitly configured by config)
+   * trying to auto-configure them and  registering with empty patterns - they will be present
+   * in Serde selection in UI, but not assigned to any topic k/v.
+   */
+  public ClusterSerdes init(Environment env,
+                            ClustersProperties clustersProperties,
+                            int clusterIndex) {
+    ClustersProperties.Cluster clusterProperties = clustersProperties.getClusters().get(clusterIndex);
+    log.debug("Configuring serdes for cluster {}", clusterProperties.getName());
+
+    var globalPropertiesResolver = new PropertyResolverImpl(env);
+    var clusterPropertiesResolver = new PropertyResolverImpl(env, "kafka.clusters." + clusterIndex);
+
+    Map<String, SerdeInstance> registeredSerdes = new LinkedHashMap<>();
+    // initializing serdes from config
+    for (int i = 0; i < clusterProperties.getSerde().size(); i++) {
+      SerdeConfig serdeConfig = clusterProperties.getSerde().get(i);
+      if (Strings.isNullOrEmpty(serdeConfig.getName())) {
+        throw new ValidationException("'name' property not set for serde: " + serdeConfig);
+      }
+      if (registeredSerdes.containsKey(serdeConfig.getName())) {
+        throw new ValidationException("Multiple serdes with same name: " + serdeConfig.getName());
+      }
+      var instance = createSerdeFromConfig(
+          serdeConfig,
+          new PropertyResolverImpl(env, "kafka.clusters." + clusterIndex + ".serde." + i + ".properties"),
+          clusterPropertiesResolver,
+          globalPropertiesResolver
+      );
+      registeredSerdes.put(serdeConfig.getName(), instance);
+    }
+
+    // initializing remaining built-in serdes with empty selection patters
+    builtInSerdeClasses.forEach((name, clazz) -> {
+      if (!registeredSerdes.containsKey(name)) {
+        BuiltInSerde serde = createSerdeInstance(clazz);
+        if (autoConfigureSerde(serde, clusterPropertiesResolver, globalPropertiesResolver)) {
+          registeredSerdes.put(name, new SerdeInstance(name, serde, null, null, null));
+        }
+      }
+    });
+
+    return new ClusterSerdes(
+        registeredSerdes,
+        Optional.ofNullable(clusterProperties.getDefaultKeySerde())
+            .map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default key serde not found"))
+            .or(() -> Optional.ofNullable(registeredSerdes.get(SchemaRegistrySerde.name())))
+            .or(() -> Optional.ofNullable(registeredSerdes.get(ProtobufFileSerde.name())))
+            .orElse(null),
+        Optional.ofNullable(clusterProperties.getDefaultValueSerde())
+            .map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default value serde not found"))
+            .or(() -> Optional.ofNullable(registeredSerdes.get(SchemaRegistrySerde.name())))
+            .or(() -> Optional.ofNullable(registeredSerdes.get(ProtobufFileSerde.name())))
+            .orElse(null),
+        createFallbackSerde()
+    );
+  }
+
+  private SerdeInstance createFallbackSerde() {
+    StringSerde serde = new StringSerde();
+    serde.configure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty(), PropertyResolverImpl.empty());
+    return new SerdeInstance("Fallback", serde, null, null, null);
+  }
+
+  @SneakyThrows
+  private SerdeInstance createSerdeFromConfig(SerdeConfig serdeConfig,
+                                              PropertyResolver serdeProps,
+                                              PropertyResolver clusterProps,
+                                              PropertyResolver globalProps) {
+    if (builtInSerdeClasses.containsKey(serdeConfig.getName())) {
+      return createSerdeWithBuiltInSerdeName(serdeConfig, serdeProps, clusterProps, globalProps);
+    }
+    if (serdeConfig.getClassName() != null) {
+      var builtInSerdeClass = builtInSerdeClasses.values().stream()
+          .filter(c -> c.getName().equals(serdeConfig.getClassName()))
+          .findAny();
+      // built-in serde type with custom name
+      if (builtInSerdeClass.isPresent()) {
+        return createSerdeWithBuiltInClass(builtInSerdeClass.get(), serdeConfig, serdeProps, clusterProps, globalProps);
+      }
+    }
+    log.info("Loading custom serde {}", serdeConfig.getName());
+    return loadAndInitCustomSerde(serdeConfig, serdeProps, clusterProps, globalProps);
+  }
+
+  private SerdeInstance createSerdeWithBuiltInSerdeName(SerdeConfig serdeConfig,
+                                                        PropertyResolver serdeProps,
+                                                        PropertyResolver clusterProps,
+                                                        PropertyResolver globalProps) {
+    String name = serdeConfig.getName();
+    if (serdeConfig.getClassName() != null) {
+      throw new ValidationException("className can't be set for built-in serde");
+    }
+    if (serdeConfig.getFilePath() != null) {
+      throw new ValidationException("filePath can't be set for built-in serde types");
+    }
+    var clazz = builtInSerdeClasses.get(name);
+    BuiltInSerde serde = createSerdeInstance(clazz);
+    if (serdeConfig.getProperties().isEmpty()) {
+      if (!autoConfigureSerde(serde, serdeProps, globalProps)) {
+        // no properties provided and serde does not support auto-configuration
+        throw new ValidationException(name + " serde is not configured");
+      }
+    } else {
+      // configuring serde with explicitly set properties
+      serde.configure(serdeProps, clusterProps, globalProps);
+    }
+    return new SerdeInstance(
+        name,
+        serde,
+        nullablePattern(serdeConfig.getTopicKeysPattern()),
+        nullablePattern(serdeConfig.getTopicValuesPattern()),
+        null
+    );
+  }
+
+  private boolean autoConfigureSerde(BuiltInSerde serde, PropertyResolver clusterProps, PropertyResolver globalProps) {
+    if (serde.canBeAutoConfigured(clusterProps, globalProps)) {
+      serde.autoConfigure(clusterProps, globalProps);
+      return true;
+    }
+    return false;
+  }
+
+  @SneakyThrows
+  private SerdeInstance createSerdeWithBuiltInClass(Class<? extends BuiltInSerde> clazz,
+                                                    SerdeConfig serdeConfig,
+                                                    PropertyResolver serdeProps,
+                                                    PropertyResolver clusterProps,
+                                                    PropertyResolver globalProps) {
+    if (serdeConfig.getFilePath() != null) {
+      throw new ValidationException("filePath can't be set for built-in serde type");
+    }
+    BuiltInSerde serde = createSerdeInstance(clazz);
+    serde.configure(serdeProps, clusterProps, globalProps);
+    return new SerdeInstance(
+        serdeConfig.getName(),
+        serde,
+        nullablePattern(serdeConfig.getTopicKeysPattern()),
+        nullablePattern(serdeConfig.getTopicValuesPattern()),
+        null
+    );
+  }
+
+  @SneakyThrows
+  private <T extends Serde> T createSerdeInstance(Class<T> clazz) {
+    return clazz.getDeclaredConstructor().newInstance();
+  }
+
+  private SerdeInstance loadAndInitCustomSerde(SerdeConfig serdeConfig,
+                                               PropertyResolver serdeProps,
+                                               PropertyResolver clusterProps,
+                                               PropertyResolver globalProps) {
+    if (Strings.isNullOrEmpty(serdeConfig.getClassName())) {
+      throw new ValidationException(
+          "'className' property not set for custom serde " + serdeConfig.getName());
+    }
+    if (Strings.isNullOrEmpty(serdeConfig.getFilePath())) {
+      throw new ValidationException(
+          "'filePath' property not set for custom serde " + serdeConfig.getName());
+    }
+    var loaded = customSerdeLoader.loadAndConfigure(
+        serdeConfig.getClassName(), serdeConfig.getFilePath(), serdeProps, clusterProps, globalProps);
+    return new SerdeInstance(
+        serdeConfig.getName(),
+        loaded.getSerde(),
+        nullablePattern(serdeConfig.getTopicKeysPattern()),
+        nullablePattern(serdeConfig.getTopicValuesPattern()),
+        loaded.getClassLoader()
+    );
+  }
+
+  @Nullable
+  private Pattern nullablePattern(@Nullable String pattern) {
+    return pattern == null ? null : Pattern.compile(pattern);
+  }
+}

+ 0 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Int32Serde.java

@@ -14,12 +14,6 @@ public class Int32Serde implements BuiltInSerde {
     return "Int32";
   }
 
-  @Override
-  public void configure(PropertyResolver serdeProperties,
-                        PropertyResolver kafkaClusterProperties,
-                        PropertyResolver globalProperties) {
-  }
-
   @Override
   public Optional<String> getDescription() {
     return Optional.empty();

+ 0 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/Int64Serde.java

@@ -15,13 +15,6 @@ public class Int64Serde implements BuiltInSerde {
     return "Int64";
   }
 
-  @Override
-  public void configure(PropertyResolver serdeProperties,
-                        PropertyResolver kafkaClusterProperties,
-                        PropertyResolver globalProperties) {
-
-  }
-
   @Override
   public Optional<String> getDescription() {
     return Optional.empty();

+ 28 - 15
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufFileSerde.java

@@ -3,8 +3,8 @@ package com.provectus.kafka.ui.serdes.builtin;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.DynamicMessage;
-import com.google.protobuf.Empty;
 import com.google.protobuf.util.JsonFormat;
+import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
 import com.provectus.kafka.ui.serde.api.PropertyResolver;
 import com.provectus.kafka.ui.serde.api.RecordHeaders;
@@ -42,47 +42,55 @@ public class ProtobufFileSerde implements BuiltInSerde {
 
   private Map<Descriptor, Path> descriptorPaths = new HashMap<>();
 
+  @Nullable
   private Descriptor defaultMessageDescriptor;
 
   @Nullable
   private Descriptor defaultKeyMessageDescriptor;
 
   @Override
-  public boolean initOnStartup(PropertyResolver kafkaClusterProperties,
-                               PropertyResolver globalProperties) {
+  public boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties,
+                                     PropertyResolver globalProperties) {
     Optional<String> protobufFile = kafkaClusterProperties.getProperty("protobufFile", String.class);
     Optional<List<String>> protobufFiles = kafkaClusterProperties.getListProperty("protobufFiles", String.class);
+    return protobufFile.isPresent() || protobufFiles.filter(files -> !files.isEmpty()).isPresent();
+  }
 
-    return protobufFile.isPresent() || protobufFiles.map(files -> files.isEmpty() ? null : files).isPresent();
+  @Override
+  public void autoConfigure(PropertyResolver kafkaClusterProperties,
+                            PropertyResolver globalProperties) {
+    configure(kafkaClusterProperties);
   }
 
-  @SneakyThrows
   @Override
   public void configure(PropertyResolver serdeProperties,
                         PropertyResolver kafkaClusterProperties,
                         PropertyResolver globalProperties) {
-    Map<Path, ProtobufSchema> protobufSchemas = joinPathProperties(kafkaClusterProperties).stream()
+    configure(serdeProperties);
+  }
+
+  private void configure(PropertyResolver properties) {
+    Map<Path, ProtobufSchema> protobufSchemas = joinPathProperties(properties).stream()
         .map(path -> Map.entry(path, new ProtobufSchema(readFileAsString(path))))
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
-
     // Load all referenced message schemas and store their source proto file with the descriptors
     Map<Descriptor, Path> descriptorPaths = new HashMap<>();
-    Optional<String> protobufMessageName = kafkaClusterProperties.getProperty("protobufMessageName", String.class);
+    Optional<String> protobufMessageName = properties.getProperty("protobufMessageName", String.class);
     protobufMessageName.ifPresent(messageName -> addProtobufSchema(descriptorPaths, protobufSchemas, messageName));
 
     Optional<String> protobufMessageNameForKey =
-        kafkaClusterProperties.getProperty("protobufMessageNameForKey", String.class);
+        properties.getProperty("protobufMessageNameForKey", String.class);
     protobufMessageNameForKey
         .ifPresent(messageName -> addProtobufSchema(descriptorPaths, protobufSchemas, messageName));
 
     Optional<Map<String, String>> protobufMessageNameByTopic =
-        kafkaClusterProperties.getMapProperty("protobufMessageNameByTopic", String.class, String.class);
+        properties.getMapProperty("protobufMessageNameByTopic", String.class, String.class);
     protobufMessageNameByTopic
         .ifPresent(messageNamesByTopic -> addProtobufSchemas(descriptorPaths, protobufSchemas, messageNamesByTopic));
 
     Optional<Map<String, String>> protobufMessageNameForKeyByTopic =
-        kafkaClusterProperties.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class);
+        properties.getMapProperty("protobufMessageNameForKeyByTopic", String.class, String.class);
     protobufMessageNameForKeyByTopic
         .ifPresent(messageNamesByTopic -> addProtobufSchemas(descriptorPaths, protobufSchemas, messageNamesByTopic));
 
@@ -91,8 +99,7 @@ public class ProtobufFileSerde implements BuiltInSerde {
         .collect(Collectors.toMap(Descriptor::getFullName, Function.identity()));
 
     configure(
-        // this is strange logic, but we need it to support serde's backward-compatibility
-        protobufMessageName.map(descriptorMap::get).orElseGet(Empty::getDescriptor),
+        protobufMessageName.map(descriptorMap::get).orElse(null),
         protobufMessageNameForKey.map(descriptorMap::get).orElse(null),
         descriptorPaths,
         protobufMessageNameByTopic.map(map -> populateDescriptors(descriptorMap, map)).orElse(Map.of()),
@@ -102,11 +109,17 @@ public class ProtobufFileSerde implements BuiltInSerde {
 
   @VisibleForTesting
   void configure(
-      Descriptor defaultMessageDescriptor,
+      @Nullable Descriptor defaultMessageDescriptor,
       @Nullable Descriptor defaultKeyMessageDescriptor,
       Map<Descriptor, Path> descriptorPaths,
       Map<String, Descriptor> messageDescriptorMap,
       Map<String, Descriptor> keyMessageDescriptorMap) {
+    if (defaultMessageDescriptor == null
+        && defaultKeyMessageDescriptor == null
+        && messageDescriptorMap.isEmpty()
+        && keyMessageDescriptorMap.isEmpty()) {
+      throw new ValidationException("Neither default, not per-topic descriptors defined for " + name() + " serde");
+    }
     this.defaultMessageDescriptor = defaultMessageDescriptor;
     this.defaultKeyMessageDescriptor = defaultKeyMessageDescriptor;
     this.descriptorPaths = descriptorPaths;
@@ -142,7 +155,7 @@ public class ProtobufFileSerde implements BuiltInSerde {
   private static Map.Entry<Descriptor, Path> getDescriptorAndPath(Map<Path, ProtobufSchema> protobufSchemas,
                                                                   String msgName) {
     return protobufSchemas.entrySet().stream()
-            .filter(schema -> schema.getValue() != null && schema.getValue().toDescriptor(msgName) != null)
+            .filter(schema -> schema.getValue().toDescriptor(msgName) != null)
             .map(schema -> Map.entry(schema.getValue().toDescriptor(msgName), schema.getKey()))
             .findFirst()
             .orElseThrow(() -> new NullPointerException(

+ 4 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/StringSerde.java

@@ -15,16 +15,15 @@ public class StringSerde implements BuiltInSerde {
     return "String";
   }
 
-  private Charset encoding;
+  private Charset encoding = StandardCharsets.UTF_8;
 
   @Override
   public void configure(PropertyResolver serdeProperties,
                         PropertyResolver kafkaClusterProperties,
                         PropertyResolver globalProperties) {
-    encoding = Charset.forName(
-        serdeProperties.getProperty("encoding", String.class)
-            .orElse(StandardCharsets.UTF_8.name())
-    );
+    serdeProperties.getProperty("encoding", String.class)
+        .map(Charset::forName)
+        .ifPresent(e -> StringSerde.this.encoding = e);
   }
 
   @Override

+ 0 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UInt32Serde.java

@@ -3,7 +3,6 @@ package com.provectus.kafka.ui.serdes.builtin;
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.UnsignedInteger;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
-import com.provectus.kafka.ui.serde.api.PropertyResolver;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
 import java.util.Map;
@@ -15,12 +14,6 @@ public class UInt32Serde implements BuiltInSerde {
     return "UInt32";
   }
 
-  @Override
-  public void configure(PropertyResolver serdeProperties,
-                        PropertyResolver kafkaClusterProperties,
-                        PropertyResolver globalProperties) {
-  }
-
   @Override
   public Optional<String> getDescription() {
     return Optional.empty();

+ 0 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UInt64Serde.java

@@ -4,13 +4,11 @@ import com.google.common.primitives.Longs;
 import com.google.common.primitives.UnsignedInteger;
 import com.google.common.primitives.UnsignedLong;
 import com.provectus.kafka.ui.serde.api.DeserializeResult;
-import com.provectus.kafka.ui.serde.api.PropertyResolver;
 import com.provectus.kafka.ui.serde.api.RecordHeaders;
 import com.provectus.kafka.ui.serde.api.SchemaDescription;
 import com.provectus.kafka.ui.serdes.BuiltInSerde;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.kafka.common.header.Headers;
 
 
 public class UInt64Serde implements BuiltInSerde {
@@ -19,13 +17,6 @@ public class UInt64Serde implements BuiltInSerde {
     return "UInt64";
   }
 
-  @Override
-  public void configure(PropertyResolver serdeProperties,
-                        PropertyResolver kafkaClusterProperties,
-                        PropertyResolver globalProperties) {
-
-  }
-
   @Override
   public Optional<String> getDescription() {
     return Optional.empty();

+ 3 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/UuidBinarySerde.java

@@ -10,7 +10,6 @@ import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
-import org.apache.kafka.common.header.Headers;
 
 
 public class UuidBinarySerde implements BuiltInSerde {
@@ -19,14 +18,14 @@ public class UuidBinarySerde implements BuiltInSerde {
     return "UUIDBinary";
   }
 
-  private boolean mostSignificantBitsFirst;
+  private boolean mostSignificantBitsFirst = true;
 
   @Override
   public void configure(PropertyResolver serdeProperties,
                         PropertyResolver kafkaClusterProperties,
                         PropertyResolver globalProperties) {
-    mostSignificantBitsFirst = serdeProperties.getProperty("mostSignificantBitsFirst", Boolean.class)
-        .orElse(true);
+    serdeProperties.getProperty("mostSignificantBitsFirst", Boolean.class)
+        .ifPresent(msb -> UuidBinarySerde.this.mostSignificantBitsFirst = msb);
   }
 
   @Override

+ 32 - 27
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java

@@ -50,13 +50,35 @@ public class SchemaRegistrySerde implements BuiltInSerde {
   private Map<SchemaType, MessageFormatter> schemaRegistryFormatters;
 
   @Override
-  public boolean initOnStartup(PropertyResolver kafkaClusterProperties,
-                               PropertyResolver globalProperties) {
+  public boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties,
+                                     PropertyResolver globalProperties) {
     return kafkaClusterProperties.getListProperty("schemaRegistry", String.class)
         .filter(lst -> !lst.isEmpty())
         .isPresent();
   }
 
+  @Override
+  public void autoConfigure(PropertyResolver kafkaClusterProperties,
+                            PropertyResolver globalProperties) {
+    var urls = kafkaClusterProperties.getListProperty("schemaRegistry", String.class)
+        .filter(lst -> !lst.isEmpty())
+        .orElseThrow(() -> new ValidationException("No urls provided for schema registry"));
+    configure(
+        urls,
+        createSchemaRegistryClient(
+            urls,
+            kafkaClusterProperties.getProperty("schemaRegistryAuth.username", String.class).orElse(null),
+            kafkaClusterProperties.getProperty("schemaRegistryAuth.password", String.class).orElse(null),
+            kafkaClusterProperties.getProperty("schemaRegistrySSL.keystoreLocation", String.class).orElse(null),
+            kafkaClusterProperties.getProperty("schemaRegistrySSL.keystorePassword", String.class).orElse(null),
+            kafkaClusterProperties.getProperty("schemaRegistrySSL.truststoreLocation", String.class).orElse(null),
+            kafkaClusterProperties.getProperty("schemaRegistrySSL.truststorePassword", String.class).orElse(null)
+        ),
+        kafkaClusterProperties.getProperty("schemaRegistryKeySchemaNameTemplate", String.class).orElse("%s-key"),
+        kafkaClusterProperties.getProperty("schemaRegistrySchemaNameTemplate", String.class).orElse("%s-value")
+    );
+  }
+
   @Override
   public void configure(PropertyResolver serdeProperties,
                         PropertyResolver kafkaClusterProperties,
@@ -69,32 +91,15 @@ public class SchemaRegistrySerde implements BuiltInSerde {
         urls,
         createSchemaRegistryClient(
             urls,
-            serdeProperties.getProperty("username", String.class)
-                .or(() -> kafkaClusterProperties.getProperty("schemaRegistryAuth.username", String.class))
-                .orElse(null),
-            serdeProperties.getProperty("password", String.class)
-                .or(() -> kafkaClusterProperties.getProperty("schemaRegistryAuth.password", String.class))
-                .orElse(null),
-
-            serdeProperties.getProperty("keystoreLocation", String.class)
-                    .or(() -> kafkaClusterProperties.getProperty("schemaRegistrySSL.keystoreLocation", String.class))
-                    .orElse(null),
-            serdeProperties.getProperty("keystorePassword", String.class)
-                    .or(() -> kafkaClusterProperties.getProperty("schemaRegistrySSL.keystorePassword", String.class))
-                    .orElse(null),
-            serdeProperties.getProperty("truststoreLocation", String.class)
-                    .or(() -> kafkaClusterProperties.getProperty("schemaRegistrySSL.truststoreLocation", String.class))
-                    .orElse(null),
-            serdeProperties.getProperty("truststorePassword", String.class)
-                    .or(() -> kafkaClusterProperties.getProperty("schemaRegistrySSL.truststorePassword", String.class))
-                    .orElse(null)
+            serdeProperties.getProperty("username", String.class).orElse(null),
+            serdeProperties.getProperty("password", String.class).orElse(null),
+            serdeProperties.getProperty("keystoreLocation", String.class).orElse(null),
+            serdeProperties.getProperty("keystorePassword", String.class).orElse(null),
+            serdeProperties.getProperty("truststoreLocation", String.class).orElse(null),
+            serdeProperties.getProperty("truststorePassword", String.class).orElse(null)
         ),
-        serdeProperties.getProperty("keySchemaNameTemplate", String.class)
-            .or(() -> kafkaClusterProperties.getProperty("keySchemaNameTemplate", String.class))
-            .orElse("%s-key"),
-        serdeProperties.getProperty("schemaNameTemplate", String.class)
-            .or(() -> kafkaClusterProperties.getProperty("schemaNameTemplate", String.class))
-            .orElse("%s-value")
+        serdeProperties.getProperty("keySchemaNameTemplate", String.class).orElse("%s-key"),
+        serdeProperties.getProperty("schemaNameTemplate", String.class).orElse("%s-value")
     );
   }
 

+ 3 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/DeserializationService.java

@@ -9,6 +9,7 @@ import com.provectus.kafka.ui.serdes.ClusterSerdes;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
 import com.provectus.kafka.ui.serdes.SerdeInstance;
+import com.provectus.kafka.ui.serdes.SerdesInitializer;
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.List;
@@ -16,11 +17,9 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import javax.validation.ValidationException;
-import lombok.extern.slf4j.Slf4j;
 import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
 
-@Slf4j
 @Component
 public class DeserializationService implements Closeable {
 
@@ -29,10 +28,11 @@ public class DeserializationService implements Closeable {
   public DeserializationService(Environment env,
                                 ClustersStorage clustersStorage,
                                 ClustersProperties clustersProperties) {
+    var serdesInitializer = new SerdesInitializer();
     for (int i = 0; i < clustersProperties.getClusters().size(); i++) {
       var clusterProperties = clustersProperties.getClusters().get(i);
       var cluster = clustersStorage.getClusterByName(clusterProperties.getName()).get();
-      clusterSerdes.put(cluster.getName(), new ClusterSerdes(env, clustersProperties, i));
+      clusterSerdes.put(cluster.getName(), serdesInitializer.init(env, clustersProperties, i));
     }
   }
 

+ 183 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/SerdesInitializerTest.java

@@ -0,0 +1,183 @@
+package com.provectus.kafka.ui.serdes;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.serde.api.PropertyResolver;
+import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
+import com.provectus.kafka.ui.serdes.builtin.StringSerde;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+import org.springframework.core.env.Environment;
+import org.springframework.mock.env.MockEnvironment;
+
+class SerdesInitializerTest {
+
+  private final Environment env = new MockEnvironment();
+  private final CustomSerdeLoader customSerdeLoaderMock = mock(CustomSerdeLoader.class);
+
+  private final SerdesInitializer initializer = new SerdesInitializer(
+      Map.of(
+          "BuiltIn1", BuiltInSerdeWithAutoconfigure.class,
+          "BuiltIn2", BuiltInSerdeMock2NoAutoConfigure.class,
+          Int32Serde.name(), Int32Serde.class,
+          StringSerde.name(), StringSerde.class
+      ),
+      customSerdeLoaderMock
+  );
+
+  @Test
+  void pluggedSerdesInitializedByLoader() {
+    ClustersProperties.SerdeConfig customSerdeConfig = new ClustersProperties.SerdeConfig();
+    customSerdeConfig.setName("MyPluggedSerde");
+    customSerdeConfig.setFilePath("/custom.jar");
+    customSerdeConfig.setClassName("org.test.MyPluggedSerde");
+    customSerdeConfig.setTopicKeysPattern("keys");
+    customSerdeConfig.setTopicValuesPattern("values");
+
+    when(customSerdeLoaderMock.loadAndConfigure(anyString(), anyString(), any(), any(), any()))
+        .thenReturn(new CustomSerdeLoader.CustomSerde(new StringSerde(), new URLClassLoader(new URL[]{})));
+
+    var serdes = init(customSerdeConfig);
+
+    SerdeInstance customSerdeInstance = serdes.serdes.get("MyPluggedSerde");
+    verifyPatternsMatch(customSerdeConfig, customSerdeInstance);
+    assertThat(customSerdeInstance.classLoader).isNotNull();
+
+    verify(customSerdeLoaderMock).loadAndConfigure(
+        eq(customSerdeConfig.getClassName()),
+        eq(customSerdeConfig.getFilePath()),
+        any(), any(), any()
+    );
+  }
+
+  @Test
+  void serdeWithBuiltInNameAndNoPropertiesCantBeInitializedIfSerdeNotSupportAutoConfigure() {
+    ClustersProperties.SerdeConfig serdeConfig = new ClustersProperties.SerdeConfig();
+    serdeConfig.setName("BuiltIn2"); //auto-configuration not supported
+    serdeConfig.setTopicKeysPattern("keys");
+    serdeConfig.setTopicValuesPattern("vals");
+
+    assertThatCode(() -> initializer.init(env, createProperties(serdeConfig), 0))
+        .isInstanceOf(ValidationException.class);
+  }
+
+  @Test
+  void serdeWithBuiltInNameAndNoPropertiesIsAutoConfiguredIfPossible() {
+    ClustersProperties.SerdeConfig serdeConfig = new ClustersProperties.SerdeConfig();
+    serdeConfig.setName("BuiltIn1"); // supports auto-configuration
+    serdeConfig.setTopicKeysPattern("keys");
+    serdeConfig.setTopicValuesPattern("vals");
+
+    var serdes = init(serdeConfig);
+
+    SerdeInstance autoConfiguredSerde = serdes.serdes.get("BuiltIn1");
+    verifyAutoConfigured(autoConfiguredSerde);
+    verifyPatternsMatch(serdeConfig, autoConfiguredSerde);
+  }
+
+  @Test
+  void serdeWithBuiltInNameAndSetPropertiesAreExplicitlyConfigured() {
+    ClustersProperties.SerdeConfig serdeConfig = new ClustersProperties.SerdeConfig();
+    serdeConfig.setName("BuiltIn1");
+    serdeConfig.setProperties(Map.of("any", "property"));
+    serdeConfig.setTopicKeysPattern("keys");
+    serdeConfig.setTopicValuesPattern("vals");
+
+    var serdes = init(serdeConfig);
+
+    SerdeInstance explicitlyConfiguredSerde = serdes.serdes.get("BuiltIn1");
+    verifyExplicitlyConfigured(explicitlyConfiguredSerde);
+    verifyPatternsMatch(serdeConfig, explicitlyConfiguredSerde);
+  }
+
+  @Test
+  void serdeWithCustomNameAndBuiltInClassnameAreExplicitlyConfigured() {
+    ClustersProperties.SerdeConfig serdeConfig = new ClustersProperties.SerdeConfig();
+    serdeConfig.setName("SomeSerde");
+    serdeConfig.setClassName(BuiltInSerdeWithAutoconfigure.class.getName());
+    serdeConfig.setTopicKeysPattern("keys");
+    serdeConfig.setTopicValuesPattern("vals");
+
+    var serdes = init(serdeConfig);
+
+    SerdeInstance explicitlyConfiguredSerde = serdes.serdes.get("SomeSerde");
+    verifyExplicitlyConfigured(explicitlyConfiguredSerde);
+    verifyPatternsMatch(serdeConfig, explicitlyConfiguredSerde);
+  }
+
+  private ClusterSerdes init(ClustersProperties.SerdeConfig... serdeConfigs) {
+    return initializer.init(env, createProperties(serdeConfigs), 0);
+  }
+
+  private ClustersProperties createProperties(ClustersProperties.SerdeConfig... serdeConfigs) {
+    ClustersProperties.Cluster cluster = new ClustersProperties.Cluster();
+    cluster.setName("test");
+    cluster.setSerde(List.of(serdeConfigs));
+
+    ClustersProperties clustersProperties = new ClustersProperties();
+    clustersProperties.setClusters(List.of(cluster));
+    return clustersProperties;
+  }
+
+  private void verifyExplicitlyConfigured(SerdeInstance serde) {
+    assertThat(((BuiltInSerdeWithAutoconfigure) serde.serde).autoConfigureCheckCalled).isFalse();
+    assertThat(((BuiltInSerdeWithAutoconfigure) serde.serde).autoConfigured).isFalse();
+    assertThat(((BuiltInSerdeWithAutoconfigure) serde.serde).explicitlyConfigured).isTrue();
+  }
+
+  private void verifyAutoConfigured(SerdeInstance serde) {
+    assertThat(((BuiltInSerdeWithAutoconfigure) serde.serde).autoConfigureCheckCalled).isTrue();
+    assertThat(((BuiltInSerdeWithAutoconfigure) serde.serde).autoConfigured).isTrue();
+    assertThat(((BuiltInSerdeWithAutoconfigure) serde.serde).explicitlyConfigured).isFalse();
+  }
+
+  private void verifyPatternsMatch(ClustersProperties.SerdeConfig config, SerdeInstance serde) {
+    assertThat(serde.topicKeyPattern.pattern()).isEqualTo(config.getTopicKeysPattern());
+    assertThat(serde.topicValuePattern.pattern()).isEqualTo(config.getTopicValuesPattern());
+  }
+
+  static class BuiltInSerdeWithAutoconfigure extends StringSerde {
+
+    boolean explicitlyConfigured = false;
+    boolean autoConfigured = false;
+    boolean autoConfigureCheckCalled = false;
+
+    @Override
+    public boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties, PropertyResolver globalProperties) {
+      this.autoConfigureCheckCalled = true;
+      return true;
+    }
+
+    @Override
+    public void autoConfigure(PropertyResolver kafkaClusterProperties, PropertyResolver globalProperties) {
+      this.autoConfigured = true;
+    }
+
+    @Override
+    public void configure(PropertyResolver serdeProperties,
+                          PropertyResolver kafkaClusterProperties,
+                          PropertyResolver globalProperties) {
+      this.explicitlyConfigured = true;
+    }
+  }
+
+  static class BuiltInSerdeMock2NoAutoConfigure extends BuiltInSerdeWithAutoconfigure {
+    @Override
+    public boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties, PropertyResolver globalProperties) {
+      this.autoConfigureCheckCalled = true;
+      return false;
+    }
+  }
+}

+ 10 - 10
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufFileSerdeTest.java

@@ -236,52 +236,52 @@ class ProtobufFileSerdeTest {
   }
 
   @Test
-  void initOnStartupReturnsFalseIfNoProtoFilesHaveBeenProvided() {
+  void canBeAutoConfiguredReturnsFalseIfNoProtoFilesHaveBeenProvided() {
     PropertyResolver resolver = mock(PropertyResolver.class);
 
     var serde = new ProtobufFileSerde();
-    boolean startupSuccessful = serde.initOnStartup(resolver, resolver);
+    boolean startupSuccessful = serde.canBeAutoConfigured(resolver, resolver);
     assertThat(startupSuccessful).isFalse();
   }
 
   @Test
-  void initOnStartupReturnsFalseIfProtoFilesListIsEmpty() {
+  void canBeAutoConfiguredReturnsFalseIfProtoFilesListIsEmpty() {
     PropertyResolver resolver = mock(PropertyResolver.class);
     when(resolver.getListProperty("protobufFiles", String.class)).thenReturn(Optional.of(List.of()));
 
     var serde = new ProtobufFileSerde();
-    boolean startupSuccessful = serde.initOnStartup(resolver, resolver);
+    boolean startupSuccessful = serde.canBeAutoConfigured(resolver, resolver);
     assertThat(startupSuccessful).isFalse();
   }
 
   @Test
-  void initOnStartupReturnsTrueIfNoProtoFileHasBeenProvided() {
+  void canBeAutoConfiguredReturnsTrueIfNoProtoFileHasBeenProvided() {
     PropertyResolver resolver = mock(PropertyResolver.class);
     when(resolver.getProperty("protobufFile", String.class)).thenReturn(Optional.of("file.proto"));
 
     var serde = new ProtobufFileSerde();
-    boolean startupSuccessful = serde.initOnStartup(resolver, resolver);
+    boolean startupSuccessful = serde.canBeAutoConfigured(resolver, resolver);
     assertThat(startupSuccessful).isTrue();
   }
 
   @Test
-  void initOnStartupReturnsTrueIfProtoFilesHasBeenProvided() {
+  void canBeAutoConfiguredReturnsTrueIfProtoFilesHasBeenProvided() {
     PropertyResolver resolver = mock(PropertyResolver.class);
     when(resolver.getListProperty("protobufFiles", String.class)).thenReturn(Optional.of(List.of("file.proto")));
 
     var serde = new ProtobufFileSerde();
-    boolean startupSuccessful = serde.initOnStartup(resolver, resolver);
+    boolean startupSuccessful = serde.canBeAutoConfigured(resolver, resolver);
     assertThat(startupSuccessful).isTrue();
   }
 
   @Test
-  void initOnStartupReturnsTrueIfProtoFileAndProtoFilesHaveBeenProvided() {
+  void canBeAutoConfiguredReturnsTrueIfProtoFileAndProtoFilesHaveBeenProvided() {
     PropertyResolver resolver = mock(PropertyResolver.class);
     when(resolver.getProperty("protobufFile", String.class)).thenReturn(Optional.of("file1.proto"));
     when(resolver.getListProperty("protobufFiles", String.class)).thenReturn(Optional.of(List.of("file2.proto")));
 
     var serde = new ProtobufFileSerde();
-    boolean startupSuccessful = serde.initOnStartup(resolver, resolver);
+    boolean startupSuccessful = serde.canBeAutoConfigured(resolver, resolver);
     assertThat(startupSuccessful).isTrue();
   }