فهرست منبع

[KSQL] Implement mTLS configuration (#2984)

* feat(ksqldb): Add mTLS support

* doc(ksqldb): Add SSL configuration

* chore(compose): Add ksqldb0 to key material

* feat(ksqldb): Add SSL-enabled demo ksqlDB instance

Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Boris Djurdjevic 2 سال پیش
والد
کامیت
b87665a12c

+ 4 - 0
README.md

@@ -181,6 +181,10 @@ For example, if you want to use an environment variable to set the `name` parame
 |`KAFKA_CLUSTERS_0_KSQLDBSERVER` 	| KSQL DB server address
 |`KAFKA_CLUSTERS_0_KSQLDBSERVERAUTH_USERNAME` 	| KSQL DB server's basic authentication username
 |`KAFKA_CLUSTERS_0_KSQLDBSERVERAUTH_PASSWORD` 	| KSQL DB server's basic authentication password
+|`KAFKA_CLUSTERS_0_KSQLDBSERVERSSL_KEYSTORELOCATION`   	|Path to the JKS keystore to communicate to KSQL DB
+|`KAFKA_CLUSTERS_0_KSQLDBSERVERSSL_KEYSTOREPASSWORD`   	|Password of the JKS keystore for KSQL DB
+|`KAFKA_CLUSTERS_0_KSQLDBSERVERSSL_TRUSTSTORELOCATION`   	|Path to the JKS truststore to communicate to KSQL DB
+|`KAFKA_CLUSTERS_0_KSQLDBSERVERSSL_TRUSTSTOREPASSWORD`   	|Password of the JKS truststore for KSQL DB
 |`KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL` 	|Security protocol to connect to the brokers. For SSL connection use "SSL", for plaintext connection don't set this environment variable
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRY`   	|SchemaRegistry's address
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME`   	|SchemaRegistry's basic authentication username

+ 36 - 0
documentation/compose/kafka-ssl-components.yaml

@@ -10,6 +10,7 @@ services:
       - kafka0
       - schemaregistry0
       - kafka-connect0
+      - ksqldb0
     environment:
       KAFKA_CLUSTERS_0_NAME: local
       KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SSL
@@ -24,6 +25,11 @@ services:
       KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_KEYSTOREPASSWORD: "secret"
       KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_TRUSTSTORELOCATION: /kafka.truststore.jks
       KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_TRUSTSTOREPASSWORD: "secret"
+      KAFKA_CLUSTERS_0_KSQLDBSERVER: https://ksqldb0:8088
+      KAFKA_CLUSTERS_0_KSQLDBSERVERSSL_KEYSTORELOCATION: /kafka.keystore.jks
+      KAFKA_CLUSTERS_0_KSQLDBSERVERSSL_KEYSTOREPASSWORD: "secret"
+      KAFKA_CLUSTERS_0_KSQLDBSERVERSSL_TRUSTSTORELOCATION: /kafka.truststore.jks
+      KAFKA_CLUSTERS_0_KSQLDBSERVERSSL_TRUSTSTOREPASSWORD: "secret"
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: local
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: https://kafka-connect0:8083
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_KEYSTORELOCATION: /kafka.keystore.jks
@@ -142,3 +148,33 @@ services:
     volumes:
       - ./ssl/kafka.truststore.jks:/kafka.truststore.jks
       - ./ssl/kafka.keystore.jks:/kafka.keystore.jks
+
+  ksqldb0:
+    image: confluentinc/ksqldb-server:0.18.0
+    depends_on:
+      - kafka0
+      - kafka-connect0
+      - schemaregistry0
+    ports:
+      - 8088:8088
+    environment:
+      KSQL_CUB_KAFKA_TIMEOUT: 120
+      KSQL_LISTENERS: https://0.0.0.0:8088
+      KSQL_BOOTSTRAP_SERVERS: SSL://kafka0:29092
+      KSQL_SECURITY_PROTOCOL: SSL
+      KSQL_SSL_TRUSTSTORE_LOCATION: /kafka.truststore.jks
+      KSQL_SSL_TRUSTSTORE_PASSWORD: secret
+      KSQL_SSL_KEYSTORE_LOCATION: /kafka.keystore.jks
+      KSQL_SSL_KEYSTORE_PASSWORD: secret
+      KSQL_SSL_KEY_PASSWORD: secret
+      KSQL_SSL_CLIENT_AUTHENTICATION: REQUIRED
+      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
+      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
+      KSQL_KSQL_CONNECT_URL: https://kafka-connect0:8083
+      KSQL_KSQL_SCHEMA_REGISTRY_URL: https://schemaregistry0:8085
+      KSQL_KSQL_SERVICE_ID: my_ksql_1
+      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'
+      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
+    volumes:
+      - ./ssl/kafka.truststore.jks:/kafka.truststore.jks
+      - ./ssl/kafka.keystore.jks:/kafka.keystore.jks

BIN
documentation/compose/ssl/kafka.keystore.jks


BIN
documentation/compose/ssl/kafka.truststore.jks


+ 1 - 1
documentation/compose/ssl/san.cnf

@@ -1,2 +1,2 @@
 [kafka]
-subjectAltName = DNS:kafka0,DNS:schemaregistry0,DNS:kafka-connect0
+subjectAltName = DNS:kafka0,DNS:schemaregistry0,DNS:kafka-connect0,DNS:ksqldb0

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -30,6 +30,7 @@ public class ClustersProperties {
     WebClientSsl schemaRegistrySsl;
     String ksqldbServer;
     KsqldbServerAuth ksqldbServerAuth;
+    WebClientSsl ksqldbServerSsl;
     List<ConnectCluster> kafkaConnect;
     MetricsConfigData metrics;
     Properties properties;

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -146,6 +146,13 @@ public interface ClusterMapper {
       internalKsqlServerBuilder.password(clusterProperties.getKsqldbServerAuth().getPassword());
     }
 
+    if (clusterProperties.getKsqldbServerSsl() != null) {
+      internalKsqlServerBuilder.keystoreLocation(clusterProperties.getKsqldbServerSsl().getKeystoreLocation());
+      internalKsqlServerBuilder.keystorePassword(clusterProperties.getKsqldbServerSsl().getKeystorePassword());
+      internalKsqlServerBuilder.truststoreLocation(clusterProperties.getKsqldbServerSsl().getTruststoreLocation());
+      internalKsqlServerBuilder.truststorePassword(clusterProperties.getKsqldbServerSsl().getTruststorePassword());
+    }
+
     return internalKsqlServerBuilder.build();
   }
 

+ 6 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalKsqlServer.java

@@ -5,10 +5,15 @@ import lombok.Data;
 import lombok.ToString;
 
 @Data
-@ToString(exclude = "password")
+@ToString(exclude = {"password", "keystorePassword", "truststorePassword"})
 @Builder(toBuilder = true)
 public class InternalKsqlServer {
   private final String url;
   private final String username;
   private final String password;
+
+  private final String keystoreLocation;
+  private final String truststoreLocation;
+  private final String keystorePassword;
+  private final String truststorePassword;
 }

+ 19 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java

@@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.node.TextNode;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.ksql.response.ResponseParser;
+import com.provectus.kafka.ui.util.SecuredWebClient;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -79,11 +80,24 @@ public class KsqlApiClient {
                       MimeTypeUtils.APPLICATION_OCTET_STREAM));
         })
         .build();
-    return WebClient.builder()
-        .codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes()))
-        .defaultHeaders(httpHeaders -> setBasicAuthIfEnabled(httpHeaders, cluster))
-        .exchangeStrategies(exchangeStrategies)
-        .build();
+
+    try {
+      WebClient.Builder securedWebClient = SecuredWebClient.configure(
+          cluster.getKsqldbServer().getKeystoreLocation(),
+          cluster.getKsqldbServer().getKeystorePassword(),
+          cluster.getKsqldbServer().getTruststoreLocation(),
+          cluster.getKsqldbServer().getTruststorePassword()
+      );
+
+      return securedWebClient
+          .codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes()))
+          .defaultHeaders(httpHeaders -> setBasicAuthIfEnabled(httpHeaders, cluster))
+          .exchangeStrategies(exchangeStrategies)
+          .build();
+    } catch (Exception e) {
+      throw new IllegalStateException(
+          "cannot create TLS configuration for ksqlDB in cluster " + cluster.getName(), e);
+    }
   }
 
   public static void setBasicAuthIfEnabled(HttpHeaders headers, KafkaCluster cluster) {