Implement a mechanism to skip SSL verification
This commit is contained in:
parent
8126607b91
commit
1c781175e4
11 changed files with 154 additions and 47 deletions
|
@ -35,22 +35,30 @@ public class ClustersProperties {
|
||||||
public static class Cluster {
|
public static class Cluster {
|
||||||
String name;
|
String name;
|
||||||
String bootstrapServers;
|
String bootstrapServers;
|
||||||
|
|
||||||
|
TruststoreConfig ssl;
|
||||||
|
|
||||||
String schemaRegistry;
|
String schemaRegistry;
|
||||||
SchemaRegistryAuth schemaRegistryAuth;
|
SchemaRegistryAuth schemaRegistryAuth;
|
||||||
KeystoreConfig schemaRegistrySsl;
|
KeystoreConfig schemaRegistrySsl;
|
||||||
|
|
||||||
String ksqldbServer;
|
String ksqldbServer;
|
||||||
KsqldbServerAuth ksqldbServerAuth;
|
KsqldbServerAuth ksqldbServerAuth;
|
||||||
KeystoreConfig ksqldbServerSsl;
|
KeystoreConfig ksqldbServerSsl;
|
||||||
|
|
||||||
List<ConnectCluster> kafkaConnect;
|
List<ConnectCluster> kafkaConnect;
|
||||||
MetricsConfigData metrics;
|
|
||||||
Map<String, Object> properties;
|
|
||||||
boolean readOnly = false;
|
|
||||||
List<SerdeConfig> serde;
|
List<SerdeConfig> serde;
|
||||||
String defaultKeySerde;
|
String defaultKeySerde;
|
||||||
String defaultValueSerde;
|
String defaultValueSerde;
|
||||||
List<Masking> masking;
|
|
||||||
|
MetricsConfigData metrics;
|
||||||
|
Map<String, Object> properties;
|
||||||
|
boolean readOnly = false;
|
||||||
Long pollingThrottleRate;
|
Long pollingThrottleRate;
|
||||||
TruststoreConfig ssl;
|
|
||||||
|
List<Masking> masking;
|
||||||
|
|
||||||
AuditProperties audit;
|
AuditProperties audit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,6 +109,16 @@ public class ClustersProperties {
|
||||||
public static class TruststoreConfig {
|
public static class TruststoreConfig {
|
||||||
String truststoreLocation;
|
String truststoreLocation;
|
||||||
String truststorePassword;
|
String truststorePassword;
|
||||||
|
boolean verifySSL = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
@ToString(exclude = {"keystorePassword"})
|
||||||
|
public static class KeystoreConfig {
|
||||||
|
String keystoreLocation;
|
||||||
|
String keystorePassword;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
|
@ -120,15 +138,6 @@ public class ClustersProperties {
|
||||||
String password;
|
String password;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Data
|
|
||||||
@NoArgsConstructor
|
|
||||||
@AllArgsConstructor
|
|
||||||
@ToString(exclude = {"keystorePassword"})
|
|
||||||
public static class KeystoreConfig {
|
|
||||||
String keystoreLocation;
|
|
||||||
String keystorePassword;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
public static class Masking {
|
public static class Masking {
|
||||||
Type type;
|
Type type;
|
||||||
|
@ -178,6 +187,7 @@ public class ClustersProperties {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
private Map<String, Object> flattenClusterProperties(@Nullable String prefix,
|
private Map<String, Object> flattenClusterProperties(@Nullable String prefix,
|
||||||
@Nullable Map<String, Object> propertiesMap) {
|
@Nullable Map<String, Object> propertiesMap) {
|
||||||
Map<String, Object> flattened = new HashMap<>();
|
Map<String, Object> flattened = new HashMap<>();
|
||||||
|
|
|
@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service;
|
||||||
|
|
||||||
import com.provectus.kafka.ui.config.ClustersProperties;
|
import com.provectus.kafka.ui.config.ClustersProperties;
|
||||||
import com.provectus.kafka.ui.model.KafkaCluster;
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||||
import com.provectus.kafka.ui.util.SslPropertiesUtil;
|
import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -42,7 +42,7 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable {
|
||||||
private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
|
private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
|
||||||
return Mono.fromSupplier(() -> {
|
return Mono.fromSupplier(() -> {
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
|
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
|
||||||
properties.putAll(cluster.getProperties());
|
properties.putAll(cluster.getProperties());
|
||||||
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
|
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
|
||||||
properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
|
properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
|
||||||
|
|
|
@ -10,7 +10,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
|
||||||
import com.provectus.kafka.ui.model.SortOrderDTO;
|
import com.provectus.kafka.ui.model.SortOrderDTO;
|
||||||
import com.provectus.kafka.ui.service.rbac.AccessControlService;
|
import com.provectus.kafka.ui.service.rbac.AccessControlService;
|
||||||
import com.provectus.kafka.ui.util.ApplicationMetrics;
|
import com.provectus.kafka.ui.util.ApplicationMetrics;
|
||||||
import com.provectus.kafka.ui.util.SslPropertiesUtil;
|
import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
@ -254,7 +254,7 @@ public class ConsumerGroupService {
|
||||||
public EnhancedConsumer createConsumer(KafkaCluster cluster,
|
public EnhancedConsumer createConsumer(KafkaCluster cluster,
|
||||||
Map<String, Object> properties) {
|
Map<String, Object> properties) {
|
||||||
Properties props = new Properties();
|
Properties props = new Properties();
|
||||||
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
|
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
|
||||||
props.putAll(cluster.getProperties());
|
props.putAll(cluster.getProperties());
|
||||||
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + System.currentTimeMillis());
|
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + System.currentTimeMillis());
|
||||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
|
||||||
|
|
|
@ -20,7 +20,7 @@ import com.provectus.kafka.ui.model.TopicMessageDTO;
|
||||||
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
||||||
import com.provectus.kafka.ui.serde.api.Serde;
|
import com.provectus.kafka.ui.serde.api.Serde;
|
||||||
import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
|
import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
|
||||||
import com.provectus.kafka.ui.util.SslPropertiesUtil;
|
import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.time.OffsetDateTime;
|
import java.time.OffsetDateTime;
|
||||||
import java.time.ZoneOffset;
|
import java.time.ZoneOffset;
|
||||||
|
@ -191,7 +191,7 @@ public class MessagesService {
|
||||||
public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
|
public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
|
||||||
Map<String, Object> additionalProps) {
|
Map<String, Object> additionalProps) {
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
|
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
|
||||||
properties.putAll(cluster.getProperties());
|
properties.putAll(cluster.getProperties());
|
||||||
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
|
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
|
||||||
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||||||
|
|
|
@ -130,7 +130,7 @@ public class KsqlApiClient {
|
||||||
* Some version of ksqldb (?..0.24) can cut off json streaming without respect proper array ending like <p/>
|
* Some version of ksqldb (?..0.24) can cut off json streaming without respect proper array ending like <p/>
|
||||||
* <code>[{"header":{"queryId":"...","schema":"..."}}, ]</code>
|
* <code>[{"header":{"queryId":"...","schema":"..."}}, ]</code>
|
||||||
* which will cause json parsing error and will be propagated to UI.
|
* which will cause json parsing error and will be propagated to UI.
|
||||||
* This is a know issue(https://github.com/confluentinc/ksql/issues/8746), but we don't know when it will be fixed.
|
* This is a know issue(<a href="https://github.com/confluentinc/ksql/issues/8746">...</a>), but we don't know when it will be fixed.
|
||||||
* To work around this we need to check DecodingException err msg.
|
* To work around this we need to check DecodingException err msg.
|
||||||
*/
|
*/
|
||||||
private boolean isUnexpectedJsonArrayEndCharException(Throwable th) {
|
private boolean isUnexpectedJsonArrayEndCharException(Throwable th) {
|
||||||
|
|
|
@ -0,0 +1,67 @@
|
||||||
|
package com.provectus.kafka.ui.util;
|
||||||
|
|
||||||
|
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
|
||||||
|
import java.security.KeyManagementException;
|
||||||
|
import java.security.KeyStore;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.security.SecureRandom;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import javax.net.ssl.SSLContext;
|
||||||
|
import javax.net.ssl.SSLEngine;
|
||||||
|
import org.apache.kafka.common.security.auth.SslEngineFactory;
|
||||||
|
|
||||||
|
public class InsecureSslEngineFactory implements SslEngineFactory {
|
||||||
|
|
||||||
|
private SSLContext sslContext;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SSLEngine createClientSslEngine(String peerHost, int peerPort, String endpointIdentification) {
|
||||||
|
var trustManagers = InsecureTrustManagerFactory.INSTANCE.getTrustManagers();
|
||||||
|
try {
|
||||||
|
this.sslContext = SSLContext.getInstance("SSL");
|
||||||
|
sslContext.init(null, trustManagers, new SecureRandom());
|
||||||
|
SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
|
||||||
|
sslEngine.setUseClientMode(true);
|
||||||
|
return sslEngine;
|
||||||
|
} catch (NoSuchAlgorithmException | KeyManagementException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SSLEngine createServerSslEngine(String peerHost, int peerPort) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<String> reconfigurableConfigs() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public KeyStore keystore() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public KeyStore truststore() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
this.sslContext = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(Map<String, ?> configs) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,37 @@
|
||||||
|
package com.provectus.kafka.ui.util;
|
||||||
|
|
||||||
|
import static org.apache.kafka.common.config.SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG;
|
||||||
|
|
||||||
|
import com.provectus.kafka.ui.config.ClustersProperties;
|
||||||
|
import java.util.Properties;
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import org.apache.kafka.common.config.SslConfigs;
|
||||||
|
|
||||||
|
public final class KafkaClientSslPropertiesUtil {
|
||||||
|
|
||||||
|
private KafkaClientSslPropertiesUtil() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
|
||||||
|
Properties sink) {
|
||||||
|
if (truststoreConfig == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!truststoreConfig.isVerifySSL()) {
|
||||||
|
sink.put(SSL_ENGINE_FACTORY_CLASS_CONFIG, InsecureSslEngineFactory.class);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (truststoreConfig.getTruststoreLocation() == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation());
|
||||||
|
|
||||||
|
if (truststoreConfig.getTruststorePassword() != null) {
|
||||||
|
sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -65,7 +65,7 @@ public final class KafkaServicesValidation {
|
||||||
@Nullable
|
@Nullable
|
||||||
TruststoreConfig ssl) {
|
TruststoreConfig ssl) {
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
SslPropertiesUtil.addKafkaSslProperties(ssl, properties);
|
KafkaClientSslPropertiesUtil.addKafkaSslProperties(ssl, properties);
|
||||||
properties.putAll(clusterProps);
|
properties.putAll(clusterProps);
|
||||||
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||||
// editing properties to make validation faster
|
// editing properties to make validation faster
|
||||||
|
|
|
@ -1,23 +0,0 @@
|
||||||
package com.provectus.kafka.ui.util;
|
|
||||||
|
|
||||||
import com.provectus.kafka.ui.config.ClustersProperties;
|
|
||||||
import java.util.Properties;
|
|
||||||
import javax.annotation.Nullable;
|
|
||||||
import org.apache.kafka.common.config.SslConfigs;
|
|
||||||
|
|
||||||
public final class SslPropertiesUtil {
|
|
||||||
|
|
||||||
private SslPropertiesUtil() {
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
|
|
||||||
Properties sink) {
|
|
||||||
if (truststoreConfig != null && truststoreConfig.getTruststoreLocation() != null) {
|
|
||||||
sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation());
|
|
||||||
if (truststoreConfig.getTruststorePassword() != null) {
|
|
||||||
sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -7,6 +7,7 @@ import com.provectus.kafka.ui.config.ClustersProperties;
|
||||||
import com.provectus.kafka.ui.exception.ValidationException;
|
import com.provectus.kafka.ui.exception.ValidationException;
|
||||||
import io.netty.handler.ssl.SslContext;
|
import io.netty.handler.ssl.SslContext;
|
||||||
import io.netty.handler.ssl.SslContextBuilder;
|
import io.netty.handler.ssl.SslContextBuilder;
|
||||||
|
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.security.KeyStore;
|
import java.security.KeyStore;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
@ -45,6 +46,10 @@ public class WebClientConfigurator {
|
||||||
|
|
||||||
public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
|
public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
|
||||||
@Nullable ClustersProperties.KeystoreConfig keystoreConfig) {
|
@Nullable ClustersProperties.KeystoreConfig keystoreConfig) {
|
||||||
|
if (truststoreConfig != null && !truststoreConfig.isVerifySSL()) {
|
||||||
|
return configureNoSSL();
|
||||||
|
}
|
||||||
|
|
||||||
return configureSsl(
|
return configureSsl(
|
||||||
keystoreConfig != null ? keystoreConfig.getKeystoreLocation() : null,
|
keystoreConfig != null ? keystoreConfig.getKeystoreLocation() : null,
|
||||||
keystoreConfig != null ? keystoreConfig.getKeystorePassword() : null,
|
keystoreConfig != null ? keystoreConfig.getKeystorePassword() : null,
|
||||||
|
@ -97,6 +102,17 @@ public class WebClientConfigurator {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
public WebClientConfigurator configureNoSSL() {
|
||||||
|
var contextBuilder = SslContextBuilder.forClient();
|
||||||
|
contextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
|
||||||
|
|
||||||
|
SslContext context = contextBuilder.build();
|
||||||
|
|
||||||
|
httpClient = httpClient.secure(t -> t.sslContext(context));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public WebClientConfigurator configureBasicAuth(@Nullable String username, @Nullable String password) {
|
public WebClientConfigurator configureBasicAuth(@Nullable String username, @Nullable String password) {
|
||||||
if (username != null && password != null) {
|
if (username != null && password != null) {
|
||||||
builder.defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth(username, password));
|
builder.defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth(username, password));
|
||||||
|
|
|
@ -136,7 +136,7 @@ rbac:
|
||||||
actions: all
|
actions: all
|
||||||
|
|
||||||
- resource: connect
|
- resource: connect
|
||||||
value: "*"
|
value: ".*"
|
||||||
actions: all
|
actions: all
|
||||||
|
|
||||||
- resource: ksql
|
- resource: ksql
|
||||||
|
|
Loading…
Add table
Reference in a new issue