diff --git a/README.md b/README.md index 1440cb7086..ab82932896 100644 --- a/README.md +++ b/README.md @@ -183,6 +183,10 @@ For example, if you want to use an environment variable to set the `name` parame |`KAFKA_CLUSTERS_0_SCHEMAREGISTRY` |SchemaRegistry's address |`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME` |SchemaRegistry's basic authentication username |`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_PASSWORD` |SchemaRegistry's basic authentication password +|`KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_KEYSTORELOCATION` |Path to the JKS keystore to communicate to SchemaRegistry +|`KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_KEYSTOREPASSWORD` |Password of the JKS keystore for SchemaRegistry +|`KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_TRUSTSTORELOCATION` |Path to the JKS truststore to communicate to SchemaRegistry +|`KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_TRUSTSTOREPASSWORD` |Password of the JKS truststore for SchemaRegistry |`KAFKA_CLUSTERS_0_SCHEMANAMETEMPLATE` |How keys are saved to schemaRegistry |`KAFKA_CLUSTERS_0_METRICS_PORT` |Open metrics port of a broker |`KAFKA_CLUSTERS_0_METRICS_TYPE` |Type of metrics retriever to use. Valid values are JMX (default) or PROMETHEUS. If Prometheus, then metrics are read from prometheus-jmx-exporter instead of jmx diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index 5b59800354..4313c3248e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -27,6 +27,7 @@ public class ClustersProperties { String bootstrapServers; String schemaRegistry; SchemaRegistryAuth schemaRegistryAuth; + WebClientSsl schemaRegistrySsl; String ksqldbServer; KsqldbServerAuth ksqldbServerAuth; List kafkaConnect; @@ -62,6 +63,14 @@ public class ClustersProperties { String password; } + @Data + public static class WebClientSsl { + String keystoreLocation; + String keystorePassword; + String truststoreLocation; + String truststorePassword; + } + @Data public static class SerdeConfig { String name; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java index bf7cb33636..9dc3103fca 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java @@ -35,7 +35,6 @@ import com.provectus.kafka.ui.model.TopicDetailsDTO; import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck; import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel; import com.provectus.kafka.ui.service.metrics.RawMetric; -import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -121,6 +120,13 @@ public interface ClusterMapper { internalSchemaRegistry.password(clusterProperties.getSchemaRegistryAuth().getPassword()); } + if (clusterProperties.getSchemaRegistrySsl() != null) { + internalSchemaRegistry.keystoreLocation(clusterProperties.getSchemaRegistrySsl().getKeystoreLocation()); + internalSchemaRegistry.keystorePassword(clusterProperties.getSchemaRegistrySsl().getKeystorePassword()); + internalSchemaRegistry.truststoreLocation(clusterProperties.getSchemaRegistrySsl().getTruststoreLocation()); + internalSchemaRegistry.truststorePassword(clusterProperties.getSchemaRegistrySsl().getTruststorePassword()); + } + return internalSchemaRegistry.build(); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java index 1115bac105..b96952d569 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java @@ -10,6 +10,11 @@ public class InternalSchemaRegistry { private final String password; private final FailoverUrlList url; + private final String keystoreLocation; + private final String truststoreLocation; + private final String keystorePassword; + private final String truststorePassword; + public String getPrimaryNodeUri() { return url.get(0); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java index 787bec2808..9c43c7a63f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/sr/SchemaRegistrySerde.java @@ -18,6 +18,7 @@ import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; @@ -32,6 +33,7 @@ import java.util.concurrent.Callable; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; +import org.apache.kafka.common.config.SslConfigs; public class SchemaRegistrySerde implements BuiltInSerde { @@ -72,7 +74,20 @@ public class SchemaRegistrySerde implements BuiltInSerde { .orElse(null), serdeProperties.getProperty("password", String.class) .or(() -> kafkaClusterProperties.getProperty("schemaRegistryAuth.password", String.class)) - .orElse(null) + .orElse(null), + + serdeProperties.getProperty("keystoreLocation", String.class) + .or(() -> kafkaClusterProperties.getProperty("schemaRegistrySSL.keystoreLocation", String.class)) + .orElse(null), + serdeProperties.getProperty("keystorePassword", String.class) + .or(() -> kafkaClusterProperties.getProperty("schemaRegistrySSL.keystorePassword", String.class)) + .orElse(null), + serdeProperties.getProperty("truststoreLocation", String.class) + .or(() -> kafkaClusterProperties.getProperty("schemaRegistrySSL.truststoreLocation", String.class)) + .orElse(null), + serdeProperties.getProperty("truststorePassword", String.class) + .or(() -> kafkaClusterProperties.getProperty("schemaRegistrySSL.truststorePassword", String.class)) + .orElse(null) ), serdeProperties.getProperty("keySchemaNameTemplate", String.class) .or(() -> kafkaClusterProperties.getProperty("keySchemaNameTemplate", String.class)) @@ -98,7 +113,12 @@ public class SchemaRegistrySerde implements BuiltInSerde { private static SchemaRegistryClient createSchemaRegistryClient(List urls, @Nullable String username, - @Nullable String password) { + @Nullable String password, + @Nullable String keyStoreLocation, + @Nullable String keyStorePassword, + @Nullable String trustStoreLocation, + @Nullable String trustStorePassword + ) { Map configs = new HashMap<>(); if (username != null && password != null) { configs.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); @@ -110,6 +130,24 @@ public class SchemaRegistrySerde implements BuiltInSerde { throw new ValidationException( "You specified password but do not specified username"); } + + // We require at least a truststore. The logic is done similar to SchemaRegistryService.securedWebClientOnTLS + if (trustStoreLocation != null && trustStorePassword != null) { + configs.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, + trustStoreLocation); + configs.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, + trustStorePassword); + + if (keyStoreLocation != null) { + configs.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, + keyStoreLocation); + configs.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, + keyStorePassword); + configs.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEY_PASSWORD_CONFIG, + keyStorePassword); + } + } + return new CachedSchemaRegistryClient( urls, 1_000, diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java index 2a3039b3fe..1cf3cb6ea5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java @@ -21,6 +21,7 @@ import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck; import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel; import com.provectus.kafka.ui.model.schemaregistry.InternalNewSchema; import com.provectus.kafka.ui.model.schemaregistry.SubjectIdResponse; +import com.provectus.kafka.ui.util.SecuredWebClient; import java.io.IOException; import java.net.URI; import java.util.Collections; @@ -68,8 +69,6 @@ public class SchemaRegistryService { private static final String INCOMPATIBLE_WITH_AN_EARLIER_SCHEMA = "incompatible with an earlier schema"; private static final String INVALID_SCHEMA = "Invalid Schema"; - private final WebClient webClient; - public Mono> getAllLatestVersionSchemas(KafkaCluster cluster, List subjects) { return Flux.fromIterable(subjects) @@ -372,10 +371,23 @@ public class SchemaRegistryService { List uriVariables, MultiValueMap queryParams) { final var schemaRegistry = cluster.getSchemaRegistry(); - return webClient - .method(method) - .uri(buildUri(schemaRegistry, path, uriVariables, queryParams)) - .headers(headers -> setBasicAuthIfEnabled(schemaRegistry, headers)); + + try { + WebClient.Builder schemaRegistryWebClient = SecuredWebClient.configure( + schemaRegistry.getKeystoreLocation(), + schemaRegistry.getKeystorePassword(), + schemaRegistry.getTruststoreLocation(), + schemaRegistry.getTruststorePassword() + ); + + return schemaRegistryWebClient.build() + .method(method) + .uri(buildUri(schemaRegistry, path, uriVariables, queryParams)) + .headers(headers -> setBasicAuthIfEnabled(schemaRegistry, headers)); + } catch (Exception e) { + throw new IllegalStateException( + "cannot create TLS configuration for schema-registry in cluster " + cluster.getName(), e); + } } private URI buildUri(InternalSchemaRegistry schemaRegistry, String path, List uriVariables, diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/SecuredWebClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/SecuredWebClient.java new file mode 100644 index 0000000000..3e826c855a --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/SecuredWebClient.java @@ -0,0 +1,66 @@ +package com.provectus.kafka.ui.util; + +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.TrustManagerFactory; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.util.ResourceUtils; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.netty.http.client.HttpClient; + +public class SecuredWebClient { + public static WebClient.Builder configure( + String keystoreLocation, + String keystorePassword, + String truststoreLocation, + String truststorePassword) + throws NoSuchAlgorithmException, IOException, KeyStoreException, CertificateException, UnrecoverableKeyException { + // If we want to customize our TLS configuration, we need at least a truststore + if (truststoreLocation == null || truststorePassword == null) { + return WebClient.builder(); + } + + SslContextBuilder contextBuilder = SslContextBuilder.forClient(); + + // Prepare truststore + KeyStore trustStore = KeyStore.getInstance("JKS"); + trustStore.load( + new FileInputStream((ResourceUtils.getFile(truststoreLocation))), + truststorePassword.toCharArray() + ); + + TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance( + TrustManagerFactory.getDefaultAlgorithm() + ); + trustManagerFactory.init(trustStore); + contextBuilder.trustManager(trustManagerFactory); + + // Prepare keystore only if we got a keystore + if (keystoreLocation != null && keystorePassword != null) { + KeyStore keyStore = KeyStore.getInstance("JKS"); + keyStore.load( + new FileInputStream(ResourceUtils.getFile(keystoreLocation)), + keystorePassword.toCharArray() + ); + + KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + keyManagerFactory.init(keyStore, keystorePassword.toCharArray()); + contextBuilder.keyManager(keyManagerFactory); + } + + // Create webclient + SslContext context = contextBuilder.build(); + + return WebClient.builder() + .clientConnector(new ReactorClientHttpConnector(HttpClient.create().secure(t -> t.sslContext(context)))); + } +} \ No newline at end of file