diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index 33763ed9b8..63e9e74c14 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -35,22 +35,30 @@ public class ClustersProperties { public static class Cluster { String name; String bootstrapServers; + + TruststoreConfig ssl; + String schemaRegistry; SchemaRegistryAuth schemaRegistryAuth; KeystoreConfig schemaRegistrySsl; + String ksqldbServer; KsqldbServerAuth ksqldbServerAuth; KeystoreConfig ksqldbServerSsl; + List kafkaConnect; - MetricsConfigData metrics; - Map properties; - boolean readOnly = false; + List serde; String defaultKeySerde; String defaultValueSerde; - List masking; + + MetricsConfigData metrics; + Map properties; + boolean readOnly = false; Long pollingThrottleRate; - TruststoreConfig ssl; + + List masking; + AuditProperties audit; } @@ -101,6 +109,16 @@ public class ClustersProperties { public static class TruststoreConfig { String truststoreLocation; String truststorePassword; + boolean verifySSL = true; + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + @ToString(exclude = {"keystorePassword"}) + public static class KeystoreConfig { + String keystoreLocation; + String keystorePassword; } @Data @@ -120,15 +138,6 @@ public class ClustersProperties { String password; } - @Data - @NoArgsConstructor - @AllArgsConstructor - @ToString(exclude = {"keystorePassword"}) - public static class KeystoreConfig { - String keystoreLocation; - String keystorePassword; - } - @Data public static class Masking { Type type; @@ -178,6 +187,7 @@ public class ClustersProperties { } } + @SuppressWarnings("unchecked") private Map flattenClusterProperties(@Nullable String prefix, @Nullable Map propertiesMap) { Map flattened = new HashMap<>(); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java index 1bd4d7e33e..9695bf2dbd 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java @@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service; import com.provectus.kafka.ui.config.ClustersProperties; import com.provectus.kafka.ui.model.KafkaCluster; -import com.provectus.kafka.ui.util.SslPropertiesUtil; +import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil; import java.io.Closeable; import java.time.Instant; import java.util.Map; @@ -42,7 +42,7 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable { private Mono createAdminClient(KafkaCluster cluster) { return Mono.fromSupplier(() -> { Properties properties = new Properties(); - SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); + KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); properties.putAll(cluster.getProperties()); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java index 9764664d6a..ccb2a18691 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java @@ -10,7 +10,7 @@ import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.SortOrderDTO; import com.provectus.kafka.ui.service.rbac.AccessControlService; import com.provectus.kafka.ui.util.ApplicationMetrics; -import com.provectus.kafka.ui.util.SslPropertiesUtil; +import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -254,7 +254,7 @@ public class ConsumerGroupService { public EnhancedConsumer createConsumer(KafkaCluster cluster, Map properties) { Properties props = new Properties(); - SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props); + KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props); props.putAll(cluster.getProperties()); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + System.currentTimeMillis()); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java index dcc122ba28..7ab4b60b5b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java @@ -20,7 +20,7 @@ import com.provectus.kafka.ui.model.TopicMessageDTO; import com.provectus.kafka.ui.model.TopicMessageEventDTO; import com.provectus.kafka.ui.serde.api.Serde; import com.provectus.kafka.ui.serdes.ProducerRecordCreator; -import com.provectus.kafka.ui.util.SslPropertiesUtil; +import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil; import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; @@ -191,7 +191,7 @@ public class MessagesService { public static KafkaProducer createProducer(KafkaCluster cluster, Map additionalProps) { Properties properties = new Properties(); - SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); + KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); properties.putAll(cluster.getProperties()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java index e8f4954bf0..4673ee1b03 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java @@ -130,8 +130,8 @@ public class KsqlApiClient { * Some version of ksqldb (?..0.24) can cut off json streaming without respect proper array ending like

* [{"header":{"queryId":"...","schema":"..."}}, ] * which will cause json parsing error and will be propagated to UI. - * This is a know issue(https://github.com/confluentinc/ksql/issues/8746), but we don't know when it will be fixed. - * To workaround this we need to check DecodingException err msg. + * This is a know issue(...), but we don't know when it will be fixed. + * To work around this we need to check DecodingException err msg. */ private boolean isUnexpectedJsonArrayEndCharException(Throwable th) { return th instanceof DecodingException diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/InsecureSslEngineFactory.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/InsecureSslEngineFactory.java new file mode 100644 index 0000000000..3a1e65054c --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/InsecureSslEngineFactory.java @@ -0,0 +1,67 @@ +package com.provectus.kafka.ui.util; + +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.util.Map; +import java.util.Set; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import org.apache.kafka.common.security.auth.SslEngineFactory; + +public class InsecureSslEngineFactory implements SslEngineFactory { + + private SSLContext sslContext; + + @Override + public SSLEngine createClientSslEngine(String peerHost, int peerPort, String endpointIdentification) { + var trustManagers = InsecureTrustManagerFactory.INSTANCE.getTrustManagers(); + try { + this.sslContext = SSLContext.getInstance("SSL"); + sslContext.init(null, trustManagers, new SecureRandom()); + SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort); + sslEngine.setUseClientMode(true); + return sslEngine; + } catch (NoSuchAlgorithmException | KeyManagementException e) { + throw new RuntimeException(e); + } + } + + @Override + public SSLEngine createServerSslEngine(String peerHost, int peerPort) { + return null; + } + + @Override + public boolean shouldBeRebuilt(Map nextConfigs) { + return false; + } + + @Override + public Set reconfigurableConfigs() { + return null; + } + + @Override + public KeyStore keystore() { + return null; + } + + @Override + public KeyStore truststore() { + return null; + } + + @Override + public void close() { + this.sslContext = null; + } + + @Override + public void configure(Map configs) { + + } +} + diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaClientSslPropertiesUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaClientSslPropertiesUtil.java new file mode 100644 index 0000000000..b8a934e3ce --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaClientSslPropertiesUtil.java @@ -0,0 +1,37 @@ +package com.provectus.kafka.ui.util; + +import static org.apache.kafka.common.config.SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG; + +import com.provectus.kafka.ui.config.ClustersProperties; +import java.util.Properties; +import javax.annotation.Nullable; +import org.apache.kafka.common.config.SslConfigs; + +public final class KafkaClientSslPropertiesUtil { + + private KafkaClientSslPropertiesUtil() { + } + + public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig, + Properties sink) { + if (truststoreConfig == null) { + return; + } + + if (!truststoreConfig.isVerifySSL()) { + sink.put(SSL_ENGINE_FACTORY_CLASS_CONFIG, InsecureSslEngineFactory.class); + return; + } + + if (truststoreConfig.getTruststoreLocation() == null) { + return; + } + + sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation()); + + if (truststoreConfig.getTruststorePassword() != null) { + sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword()); + } + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java index 4b8af81f85..914a3ce92d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java @@ -65,7 +65,7 @@ public final class KafkaServicesValidation { @Nullable TruststoreConfig ssl) { Properties properties = new Properties(); - SslPropertiesUtil.addKafkaSslProperties(ssl, properties); + KafkaClientSslPropertiesUtil.addKafkaSslProperties(ssl, properties); properties.putAll(clusterProps); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // editing properties to make validation faster diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/SslPropertiesUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/SslPropertiesUtil.java deleted file mode 100644 index 4d157fbcb5..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/SslPropertiesUtil.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.provectus.kafka.ui.util; - -import com.provectus.kafka.ui.config.ClustersProperties; -import java.util.Properties; -import javax.annotation.Nullable; -import org.apache.kafka.common.config.SslConfigs; - -public final class SslPropertiesUtil { - - private SslPropertiesUtil() { - } - - public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig, - Properties sink) { - if (truststoreConfig != null && truststoreConfig.getTruststoreLocation() != null) { - sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation()); - if (truststoreConfig.getTruststorePassword() != null) { - sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword()); - } - } - } - -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java index c5aca5ad71..1e91a2c316 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/WebClientConfigurator.java @@ -7,6 +7,7 @@ import com.provectus.kafka.ui.config.ClustersProperties; import com.provectus.kafka.ui.exception.ValidationException; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import java.io.FileInputStream; import java.security.KeyStore; import java.util.function.Consumer; @@ -45,6 +46,10 @@ public class WebClientConfigurator { public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig, @Nullable ClustersProperties.KeystoreConfig keystoreConfig) { + if (truststoreConfig != null && !truststoreConfig.isVerifySSL()) { + return configureNoSSL(); + } + return configureSsl( keystoreConfig != null ? keystoreConfig.getKeystoreLocation() : null, keystoreConfig != null ? keystoreConfig.getKeystorePassword() : null, @@ -97,6 +102,17 @@ public class WebClientConfigurator { return this; } + @SneakyThrows + public WebClientConfigurator configureNoSSL() { + var contextBuilder = SslContextBuilder.forClient(); + contextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE); + + SslContext context = contextBuilder.build(); + + httpClient = httpClient.secure(t -> t.sslContext(context)); + return this; + } + public WebClientConfigurator configureBasicAuth(@Nullable String username, @Nullable String password) { if (username != null && password != null) { builder.defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth(username, password)); diff --git a/kafka-ui-api/src/main/resources/application-local.yml b/kafka-ui-api/src/main/resources/application-local.yml index 7848f1fdc4..149e414acf 100644 --- a/kafka-ui-api/src/main/resources/application-local.yml +++ b/kafka-ui-api/src/main/resources/application-local.yml @@ -136,7 +136,7 @@ rbac: actions: all - resource: connect - value: "*" + value: ".*" actions: all - resource: ksql