浏览代码

Support SSL for schema registry (#2832)

* feat(schema-registry): Implement schema-registry mTLS configuration

* chore(linter): Make checkstyle happy

* feat(schema-registry): Implement schema-registry mTLS configuration

* chore(linter): Make checkstyle happy

* chore(schema-registry): Refactor WebClient creation for PR #2832

Co-authored-by: Ilya Kuramshin <iliax@proton.me>
Boris Djurdjevic 2 年之前
父节点
当前提交
99e50f8814

+ 4 - 0
README.md

@@ -183,6 +183,10 @@ For example, if you want to use an environment variable to set the `name` parame
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRY`   	|SchemaRegistry's address
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME`   	|SchemaRegistry's basic authentication username
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_PASSWORD`   	|SchemaRegistry's basic authentication password
+|`KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_KEYSTORELOCATION`   	|Path to the JKS keystore to communicate to SchemaRegistry
+|`KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_KEYSTOREPASSWORD`   	|Password of the JKS keystore for SchemaRegistry
+|`KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_TRUSTSTORELOCATION`   	|Path to the JKS truststore to communicate to SchemaRegistry
+|`KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_TRUSTSTOREPASSWORD`   	|Password of the JKS truststore for SchemaRegistry
 |`KAFKA_CLUSTERS_0_SCHEMANAMETEMPLATE` |How keys are saved to schemaRegistry
 |`KAFKA_CLUSTERS_0_METRICS_PORT`        	 |Open metrics port of a broker
 |`KAFKA_CLUSTERS_0_METRICS_TYPE`        	 |Type of metrics retriever to use. Valid values are JMX (default) or PROMETHEUS. If Prometheus, then metrics are read from prometheus-jmx-exporter instead of jmx

+ 9 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -27,6 +27,7 @@ public class ClustersProperties {
     String bootstrapServers;
     String schemaRegistry;
     SchemaRegistryAuth schemaRegistryAuth;
+    WebClientSsl schemaRegistrySsl;
     String ksqldbServer;
     KsqldbServerAuth ksqldbServerAuth;
     List<ConnectCluster> kafkaConnect;
@@ -62,6 +63,14 @@ public class ClustersProperties {
     String password;
   }
 
+  @Data
+  public static class WebClientSsl {
+    String keystoreLocation;
+    String keystorePassword;
+    String truststoreLocation;
+    String truststorePassword;
+  }
+
   @Data
   public static class SerdeConfig {
     String name;

+ 7 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -35,7 +35,6 @@ import com.provectus.kafka.ui.model.TopicDetailsDTO;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
 import com.provectus.kafka.ui.service.metrics.RawMetric;
-import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -121,6 +120,13 @@ public interface ClusterMapper {
       internalSchemaRegistry.password(clusterProperties.getSchemaRegistryAuth().getPassword());
     }
 
+    if (clusterProperties.getSchemaRegistrySsl() != null) {
+      internalSchemaRegistry.keystoreLocation(clusterProperties.getSchemaRegistrySsl().getKeystoreLocation());
+      internalSchemaRegistry.keystorePassword(clusterProperties.getSchemaRegistrySsl().getKeystorePassword());
+      internalSchemaRegistry.truststoreLocation(clusterProperties.getSchemaRegistrySsl().getTruststoreLocation());
+      internalSchemaRegistry.truststorePassword(clusterProperties.getSchemaRegistrySsl().getTruststorePassword());
+    }
+
     return internalSchemaRegistry.build();
   }
 

+ 5 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java

@@ -10,6 +10,11 @@ public class InternalSchemaRegistry {
   private final String password;
   private final FailoverUrlList url;
 
+  private final String keystoreLocation;
+  private final String truststoreLocation;
+  private final String keystorePassword;
+  private final String truststorePassword;
+
   public String getPrimaryNodeUri() {
     return url.get(0);
   }

+ 40 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java

@@ -18,6 +18,7 @@ import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
@@ -32,6 +33,7 @@ import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
+import org.apache.kafka.common.config.SslConfigs;
 
 
 public class SchemaRegistrySerde implements BuiltInSerde {
@@ -72,7 +74,20 @@ public class SchemaRegistrySerde implements BuiltInSerde {
                 .orElse(null),
             serdeProperties.getProperty("password", String.class)
                 .or(() -> kafkaClusterProperties.getProperty("schemaRegistryAuth.password", String.class))
-                .orElse(null)
+                .orElse(null),
+
+            serdeProperties.getProperty("keystoreLocation", String.class)
+                    .or(() -> kafkaClusterProperties.getProperty("schemaRegistrySSL.keystoreLocation", String.class))
+                    .orElse(null),
+            serdeProperties.getProperty("keystorePassword", String.class)
+                    .or(() -> kafkaClusterProperties.getProperty("schemaRegistrySSL.keystorePassword", String.class))
+                    .orElse(null),
+            serdeProperties.getProperty("truststoreLocation", String.class)
+                    .or(() -> kafkaClusterProperties.getProperty("schemaRegistrySSL.truststoreLocation", String.class))
+                    .orElse(null),
+            serdeProperties.getProperty("truststorePassword", String.class)
+                    .or(() -> kafkaClusterProperties.getProperty("schemaRegistrySSL.truststorePassword", String.class))
+                    .orElse(null)
         ),
         serdeProperties.getProperty("keySchemaNameTemplate", String.class)
             .or(() -> kafkaClusterProperties.getProperty("keySchemaNameTemplate", String.class))
@@ -98,7 +113,12 @@ public class SchemaRegistrySerde implements BuiltInSerde {
 
   private static SchemaRegistryClient createSchemaRegistryClient(List<String> urls,
                                                                  @Nullable String username,
-                                                                 @Nullable String password) {
+                                                                 @Nullable String password,
+                                                                 @Nullable String keyStoreLocation,
+                                                                 @Nullable String keyStorePassword,
+                                                                 @Nullable String trustStoreLocation,
+                                                                 @Nullable String trustStorePassword
+                                                                 ) {
     Map<String, String> configs = new HashMap<>();
     if (username != null && password != null) {
       configs.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
@@ -110,6 +130,24 @@ public class SchemaRegistrySerde implements BuiltInSerde {
       throw new ValidationException(
           "You specified password but do not specified username");
     }
+
+    // We require at least a truststore. The logic is done similar to SchemaRegistryService.securedWebClientOnTLS
+    if (trustStoreLocation != null && trustStorePassword != null) {
+      configs.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
+          trustStoreLocation);
+      configs.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
+          trustStorePassword);
+
+      if (keyStoreLocation != null) {
+        configs.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
+            keyStoreLocation);
+        configs.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
+            keyStorePassword);
+        configs.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEY_PASSWORD_CONFIG,
+            keyStorePassword);
+      }
+    }
+
     return new CachedSchemaRegistryClient(
         urls,
         1_000,

+ 18 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java

@@ -21,6 +21,7 @@ import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
 import com.provectus.kafka.ui.model.schemaregistry.InternalNewSchema;
 import com.provectus.kafka.ui.model.schemaregistry.SubjectIdResponse;
+import com.provectus.kafka.ui.util.SecuredWebClient;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collections;
@@ -68,8 +69,6 @@ public class SchemaRegistryService {
   private static final String INCOMPATIBLE_WITH_AN_EARLIER_SCHEMA = "incompatible with an earlier schema";
   private static final String INVALID_SCHEMA = "Invalid Schema";
 
-  private final WebClient webClient;
-
   public Mono<List<SchemaSubjectDTO>> getAllLatestVersionSchemas(KafkaCluster cluster,
                                                                  List<String> subjects) {
     return Flux.fromIterable(subjects)
@@ -372,10 +371,23 @@ public class SchemaRegistryService {
                                                         List<String> uriVariables,
                                                         MultiValueMap<String, String> queryParams) {
     final var schemaRegistry = cluster.getSchemaRegistry();
-    return webClient
-        .method(method)
-        .uri(buildUri(schemaRegistry, path, uriVariables, queryParams))
-        .headers(headers -> setBasicAuthIfEnabled(schemaRegistry, headers));
+
+    try {
+      WebClient.Builder schemaRegistryWebClient = SecuredWebClient.configure(
+          schemaRegistry.getKeystoreLocation(),
+          schemaRegistry.getKeystorePassword(),
+          schemaRegistry.getTruststoreLocation(),
+          schemaRegistry.getTruststorePassword()
+      );
+
+      return schemaRegistryWebClient.build()
+          .method(method)
+          .uri(buildUri(schemaRegistry, path, uriVariables, queryParams))
+          .headers(headers -> setBasicAuthIfEnabled(schemaRegistry, headers));
+    } catch (Exception e) {
+      throw new IllegalStateException(
+          "cannot create TLS configuration for schema-registry in cluster " + cluster.getName(), e);
+    }
   }
 
   private URI buildUri(InternalSchemaRegistry schemaRegistry, String path, List<String> uriVariables,

+ 66 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/SecuredWebClient.java

@@ -0,0 +1,66 @@
+package com.provectus.kafka.ui.util;
+
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+import org.springframework.util.ResourceUtils;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.netty.http.client.HttpClient;
+
+public class SecuredWebClient {
+  public static WebClient.Builder configure(
+      String keystoreLocation,
+      String keystorePassword,
+      String truststoreLocation,
+      String truststorePassword)
+      throws NoSuchAlgorithmException, IOException, KeyStoreException, CertificateException, UnrecoverableKeyException {
+    // If we want to customize our TLS configuration, we need at least a truststore
+    if (truststoreLocation == null || truststorePassword == null) {
+      return WebClient.builder();
+    }
+
+    SslContextBuilder contextBuilder = SslContextBuilder.forClient();
+
+    // Prepare truststore
+    KeyStore trustStore = KeyStore.getInstance("JKS");
+    trustStore.load(
+        new FileInputStream((ResourceUtils.getFile(truststoreLocation))),
+        truststorePassword.toCharArray()
+    );
+
+    TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(
+        TrustManagerFactory.getDefaultAlgorithm()
+    );
+    trustManagerFactory.init(trustStore);
+    contextBuilder.trustManager(trustManagerFactory);
+
+    // Prepare keystore only if we got a keystore
+    if (keystoreLocation != null && keystorePassword != null) {
+      KeyStore keyStore = KeyStore.getInstance("JKS");
+      keyStore.load(
+          new FileInputStream(ResourceUtils.getFile(keystoreLocation)),
+          keystorePassword.toCharArray()
+      );
+
+      KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+      keyManagerFactory.init(keyStore, keystorePassword.toCharArray());
+      contextBuilder.keyManager(keyManagerFactory);
+    }
+
+    // Create webclient
+    SslContext context = contextBuilder.build();
+
+    return WebClient.builder()
+        .clientConnector(new ReactorClientHttpConnector(HttpClient.create().secure(t -> t.sslContext(context))));
+  }
+}