Forráskód Böngészése

Implement mTLS for KC (#2833)

* feat(kafka-connect): Implement kafka-connect mTLS configuration

* chore(kafka-connect): Use SecuredWebClient

* feat(ssl-demo): Configure SANs and recreate certificates

* feat(ssl-demo): Add docker-compose demo with TLS enabled components
Boris Djurdjevic 2 éve
szülő
commit
e26f9787d8

+ 4 - 0
README.md

@@ -196,6 +196,10 @@ For example, if you want to use an environment variable to set the `name` parame
 |`KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS` |Address of the Kafka Connect service endpoint
 |`KAFKA_CLUSTERS_0_KAFKACONNECT_0_USERNAME`| Kafka Connect cluster's basic authentication username
 |`KAFKA_CLUSTERS_0_KAFKACONNECT_0_PASSWORD`| Kafka Connect cluster's basic authentication password
+|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_KEYSTORELOCATION`| Path to the JKS keystore to communicate to Kafka Connect
+|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_KEYSTOREPASSWORD`| Password of the JKS keystore for Kafka Connect
+|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_TRUSTSTORELOCATION`| Path to the JKS truststore to communicate to Kafka Connect
+|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_TRUSTSTOREPASSWORD`| Password of the JKS truststore for Kafka Connect
 |`KAFKA_CLUSTERS_0_METRICS_SSL`          |Enable SSL for Metrics? `true` or `false`. For advanced setup, see `kafka-ui-jmx-secured.yml`
 |`KAFKA_CLUSTERS_0_METRICS_USERNAME` |Username for Metrics authentication
 |`KAFKA_CLUSTERS_0_METRICS_PASSWORD` |Password for Metrics authentication

+ 144 - 0
documentation/compose/kafka-ssl-components.yaml

@@ -0,0 +1,144 @@
+---
+version: '3.4'
+services:
+  kafka-ui:
+    container_name: kafka-ui
+    image: provectuslabs/kafka-ui:latest
+    ports:
+      - 8080:8080
+    depends_on:
+      - kafka0
+      - schemaregistry0
+      - kafka-connect0
+    environment:
+      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SSL
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092 # SSL LISTENER!
+      KAFKA_CLUSTERS_0_PROPERTIES_SSL_TRUSTSTORE_LOCATION: /kafka.truststore.jks
+      KAFKA_CLUSTERS_0_PROPERTIES_SSL_TRUSTSTORE_PASSWORD: secret
+      KAFKA_CLUSTERS_0_PROPERTIES_SSL_KEYSTORE_LOCATION: /kafka.keystore.jks
+      KAFKA_CLUSTERS_0_PROPERTIES_SSL_KEYSTORE_PASSWORD: secret
+      KAFKA_CLUSTERS_0_PROPERTIES_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: '' # DISABLE COMMON NAME VERIFICATION
+      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: https://schemaregistry0:8085
+      KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_KEYSTORELOCATION: /kafka.keystore.jks
+      KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_KEYSTOREPASSWORD: "secret"
+      KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_TRUSTSTORELOCATION: /kafka.truststore.jks
+      KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_TRUSTSTOREPASSWORD: "secret"
+      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: local
+      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: https://kafka-connect0:8083
+      KAFKA_CLUSTERS_0_KAFKACONNECT_0_KEYSTORELOCATION: /kafka.keystore.jks
+      KAFKA_CLUSTERS_0_KAFKACONNECT_0_KEYSTOREPASSWORD: "secret"
+      KAFKA_CLUSTERS_0_KAFKACONNECT_0_TRUSTSTORELOCATION: /kafka.truststore.jks
+      KAFKA_CLUSTERS_0_KAFKACONNECT_0_TRUSTSTOREPASSWORD: "secret"
+    volumes:
+      - ./ssl/kafka.truststore.jks:/kafka.truststore.jks
+      - ./ssl/kafka.keystore.jks:/kafka.keystore.jks
+
+  kafka0:
+    image: confluentinc/cp-kafka:7.2.1
+    hostname: kafka0
+    container_name: kafka0
+    ports:
+      - "9092:9092"
+      - "9997:9997"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,SSL:SSL,PLAINTEXT_HOST:PLAINTEXT'
+      KAFKA_ADVERTISED_LISTENERS: 'SSL://kafka0:29092,PLAINTEXT_HOST://localhost:9092'
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_JMX_PORT: 9997
+      KAFKA_JMX_HOSTNAME: localhost
+      KAFKA_PROCESS_ROLES: 'broker,controller'
+      KAFKA_NODE_ID: 1
+      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093'
+      KAFKA_LISTENERS: 'SSL://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'SSL'
+      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
+      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
+      KAFKA_SECURITY_PROTOCOL: SSL
+      KAFKA_SSL_ENABLED_MECHANISMS: PLAIN,SSL
+      KAFKA_SSL_KEYSTORE_FILENAME: kafka.keystore.jks
+      KAFKA_SSL_KEYSTORE_CREDENTIALS: creds
+      KAFKA_SSL_KEY_CREDENTIALS: creds
+      KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.truststore.jks
+      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: creds
+      #KAFKA_SSL_CLIENT_AUTH: 'required'
+      KAFKA_SSL_CLIENT_AUTH: 'requested'
+      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: '' # COMMON NAME VERIFICATION IS DISABLED SERVER-SIDE
+    volumes:
+      - ./scripts/update_run.sh:/tmp/update_run.sh
+      - ./ssl/creds:/etc/kafka/secrets/creds
+      - ./ssl/kafka.truststore.jks:/etc/kafka/secrets/kafka.truststore.jks
+      - ./ssl/kafka.keystore.jks:/etc/kafka/secrets/kafka.keystore.jks
+    command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"
+
+  schemaregistry0:
+    image: confluentinc/cp-schema-registry:7.2.1
+    depends_on:
+      - kafka0
+    environment:
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: SSL://kafka0:29092
+      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: SSL
+      SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_LOCATION: /kafka.truststore.jks
+      SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_PASSWORD: secret
+      SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_LOCATION: /kafka.keystore.jks
+      SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_PASSWORD: secret
+      SCHEMA_REGISTRY_KAFKASTORE_SSL_KEY_PASSWORD: secret
+      SCHEMA_REGISTRY_HOST_NAME: schemaregistry0
+      SCHEMA_REGISTRY_LISTENERS: https://schemaregistry0:8085
+      SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: https
+
+      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "https"
+      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
+      SCHEMA_REGISTRY_SSL_CLIENT_AUTHENTICATION: "REQUIRED"
+      SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: /kafka.truststore.jks
+      SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: secret
+      SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: /kafka.keystore.jks
+      SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: secret
+      SCHEMA_REGISTRY_SSL_KEY_PASSWORD: secret
+    ports:
+      - 8085:8085
+    volumes:
+      - ./ssl/kafka.truststore.jks:/kafka.truststore.jks
+      - ./ssl/kafka.keystore.jks:/kafka.keystore.jks
+
+  kafka-connect0:
+    image: confluentinc/cp-kafka-connect:7.2.1
+    ports:
+      - 8083:8083
+    depends_on:
+      - kafka0
+      - schemaregistry0
+    environment:
+      CONNECT_BOOTSTRAP_SERVERS: kafka0:29092
+      CONNECT_GROUP_ID: compose-connect-group
+      CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs
+      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset
+      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_STATUS_STORAGE_TOPIC: _connect_status
+      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: https://schemaregistry0:8085
+      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: https://schemaregistry0:8085
+      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0
+      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
+      CONNECT_SECURITY_PROTOCOL: "SSL"
+      CONNECT_SSL_KEYSTORE_LOCATION: "/kafka.keystore.jks"
+      CONNECT_SSL_KEY_PASSWORD: "secret"
+      CONNECT_SSL_KEYSTORE_PASSWORD: "secret"
+      CONNECT_SSL_TRUSTSTORE_LOCATION: "/kafka.truststore.jks"
+      CONNECT_SSL_TRUSTSTORE_PASSWORD: "secret"
+      CONNECT_SSL_CLIENT_AUTH: "requested"
+      CONNECT_REST_ADVERTISED_LISTENER: "https"
+      CONNECT_LISTENERS: "https://kafka-connect0:8083"
+    volumes:
+      - ./ssl/kafka.truststore.jks:/kafka.truststore.jks
+      - ./ssl/kafka.keystore.jks:/kafka.keystore.jks

+ 2 - 1
documentation/compose/ssl/generate_certs.sh

@@ -144,7 +144,8 @@ echo "Now the trust store's private key (CA) will sign the keystore's certificat
 echo
 openssl x509 -req -CA $CA_CERT_FILE -CAkey $trust_store_private_key_file \
   -in $KEYSTORE_SIGN_REQUEST -out $KEYSTORE_SIGNED_CERT \
-  -days $VALIDITY_IN_DAYS -CAcreateserial
+  -days $VALIDITY_IN_DAYS -CAcreateserial \
+  -extensions kafka -extfile san.cnf
 # creates $KEYSTORE_SIGN_REQUEST_SRL which is never used or needed.
 
 echo

BIN
documentation/compose/ssl/kafka.keystore.jks


BIN
documentation/compose/ssl/kafka.truststore.jks


+ 2 - 0
documentation/compose/ssl/san.cnf

@@ -0,0 +1,2 @@
+[kafka]
+subjectAltName = DNS:kafka0,DNS:schemaregistry0,DNS:kafka-connect0

+ 28 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java

@@ -10,21 +10,32 @@ import com.provectus.kafka.ui.connect.model.Connector;
 import com.provectus.kafka.ui.connect.model.NewConnector;
 import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException;
 import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.model.InternalSchemaRegistry;
+import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KafkaConnectCluster;
+import com.provectus.kafka.ui.util.SecuredWebClient;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import java.io.FileInputStream;
+import java.security.KeyStore;
 import java.text.DateFormat;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
 import lombok.extern.slf4j.Slf4j;
 import org.openapitools.jackson.nullable.JsonNullableModule;
 import org.springframework.core.ParameterizedTypeReference;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpMethod;
 import org.springframework.http.MediaType;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
 import org.springframework.http.codec.json.Jackson2JsonDecoder;
 import org.springframework.http.codec.json.Jackson2JsonEncoder;
 import org.springframework.util.MultiValueMap;
+import org.springframework.util.ResourceUtils;
 import org.springframework.util.unit.DataSize;
 import org.springframework.web.client.RestClientException;
 import org.springframework.web.reactive.function.client.ExchangeStrategies;
@@ -32,6 +43,7 @@ import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
 import reactor.util.retry.Retry;
 
 @Slf4j
@@ -89,7 +101,7 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
     private static final ObjectMapper mapper = buildObjectMapper(dateFormat);
 
     public RetryingApiClient(KafkaConnectCluster config, DataSize maxBuffSize) {
-      super(buildWebClient(mapper, maxBuffSize), mapper, dateFormat);
+      super(buildWebClient(mapper, maxBuffSize, config), mapper, dateFormat);
       setBasePath(config.getAddress());
       setUsername(config.getUserName());
       setPassword(config.getPassword());
@@ -101,7 +113,7 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
       return dateFormat;
     }
 
-    public static WebClient buildWebClient(ObjectMapper mapper, DataSize maxBuffSize) {
+    public static WebClient buildWebClient(ObjectMapper mapper, DataSize maxBuffSize, KafkaConnectCluster config) {
       ExchangeStrategies strategies = ExchangeStrategies
               .builder()
               .codecs(clientDefaultCodecsConfigurer -> {
@@ -113,8 +125,20 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
                         .maxInMemorySize((int) maxBuffSize.toBytes());
               })
               .build();
-      WebClient.Builder webClient = WebClient.builder().exchangeStrategies(strategies);
-      return webClient.build();
+
+      try {
+        WebClient.Builder webClient = SecuredWebClient.configure(
+            config.getKeystoreLocation(),
+            config.getKeystorePassword(),
+            config.getTruststoreLocation(),
+            config.getTruststorePassword()
+        );
+
+        return webClient.exchangeStrategies(strategies).build();
+      } catch (Exception e) {
+        throw new IllegalStateException(
+            "cannot create TLS configuration for kafka-connect cluster " + config.getName(), e);
+      }
     }
 
     public static ObjectMapper buildObjectMapper(DateFormat dateFormat) {

+ 4 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -55,6 +55,10 @@ public class ClustersProperties {
     String address;
     String userName;
     String password;
+    String keystoreLocation;
+    String keystorePassword;
+    String truststoreLocation;
+    String truststorePassword;
   }
 
   @Data

+ 5 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaConnectCluster.java

@@ -13,4 +13,9 @@ public class KafkaConnectCluster {
   private final String address;
   private final String userName;
   private final String password;
+
+  private final String keystoreLocation;
+  private final String truststoreLocation;
+  private final String keystorePassword;
+  private final String truststorePassword;
 }