Compare commits

...
Sign in to create a new pull request.

6 commits

Author SHA1 Message Date
Roman Zabaluev
2433c5aa1f test 2023-08-08 20:14:56 +08:00
Roman Zabaluev
86ee3fc898 Simplify kafka verification 2023-08-04 22:20:51 +08:00
Roman Zabaluev
2030489183 Fix formatting 2023-08-02 20:50:16 +08:00
Roman Zabaluev
c4273e5c94
Merge branch 'master' into issues/4082 2023-08-02 19:44:27 +07:00
Roman Zabaluev
58000135b5 Fix formatting 2023-08-02 20:44:02 +08:00
Roman Zabaluev
1c781175e4 Implement a mechanism to skip SSL verification 2023-08-02 20:41:40 +08:00
10 changed files with 84 additions and 47 deletions

View file

@ -35,22 +35,30 @@ public class ClustersProperties {
public static class Cluster {
String name;
String bootstrapServers;
TruststoreConfig ssl;
String schemaRegistry;
SchemaRegistryAuth schemaRegistryAuth;
KeystoreConfig schemaRegistrySsl;
String ksqldbServer;
KsqldbServerAuth ksqldbServerAuth;
KeystoreConfig ksqldbServerSsl;
List<ConnectCluster> kafkaConnect;
MetricsConfigData metrics;
Map<String, Object> properties;
boolean readOnly = false;
List<SerdeConfig> serde;
String defaultKeySerde;
String defaultValueSerde;
List<Masking> masking;
MetricsConfigData metrics;
Map<String, Object> properties;
boolean readOnly = false;
Long pollingThrottleRate;
TruststoreConfig ssl;
List<Masking> masking;
AuditProperties audit;
}
@ -101,6 +109,16 @@ public class ClustersProperties {
public static class TruststoreConfig {
String truststoreLocation;
String truststorePassword;
boolean verifySsl = true;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString(exclude = {"keystorePassword"})
public static class KeystoreConfig {
String keystoreLocation;
String keystorePassword;
}
@Data
@ -120,15 +138,6 @@ public class ClustersProperties {
String password;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString(exclude = {"keystorePassword"})
public static class KeystoreConfig {
String keystoreLocation;
String keystorePassword;
}
@Data
public static class Masking {
Type type;
@ -178,6 +187,7 @@ public class ClustersProperties {
}
}
@SuppressWarnings("unchecked")
private Map<String, Object> flattenClusterProperties(@Nullable String prefix,
@Nullable Map<String, Object> propertiesMap) {
Map<String, Object> flattened = new HashMap<>();

View file

@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil;
import java.io.Closeable;
import java.time.Instant;
import java.util.Map;
@ -42,7 +42,7 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable {
private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
return Mono.fromSupplier(() -> {
Properties properties = new Properties();
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
properties.putAll(cluster.getProperties());
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);

View file

@ -10,7 +10,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SortOrderDTO;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.util.ApplicationMetrics;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
@ -254,7 +254,7 @@ public class ConsumerGroupService {
public EnhancedConsumer createConsumer(KafkaCluster cluster,
Map<String, Object> properties) {
Properties props = new Properties();
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
props.putAll(cluster.getProperties());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + System.currentTimeMillis());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());

View file

@ -20,7 +20,7 @@ import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
@ -191,7 +191,7 @@ public class MessagesService {
public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
Map<String, Object> additionalProps) {
Properties properties = new Properties();
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
properties.putAll(cluster.getProperties());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

View file

@ -130,8 +130,8 @@ public class KsqlApiClient {
* Some version of ksqldb (?..0.24) can cut off json streaming without respect proper array ending like <p/>
* <code>[{"header":{"queryId":"...","schema":"..."}}, ]</code>
* which will cause json parsing error and will be propagated to UI.
* This is a know issue(https://github.com/confluentinc/ksql/issues/8746), but we don't know when it will be fixed.
* To workaround this we need to check DecodingException err msg.
* This is a know issue(<a href="https://github.com/confluentinc/ksql/issues/8746">...</a>), but we don't know when it will be fixed.
* To work around this we need to check DecodingException err msg.
*/
private boolean isUnexpectedJsonArrayEndCharException(Throwable th) {
return th instanceof DecodingException

View file

@ -0,0 +1,34 @@
package com.provectus.kafka.ui.util;
import com.provectus.kafka.ui.config.ClustersProperties;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.kafka.common.config.SslConfigs;
public final class KafkaClientSslPropertiesUtil {
private KafkaClientSslPropertiesUtil() {
}
public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
Properties sink) {
if (truststoreConfig == null) {
return;
}
if (truststoreConfig.getTruststoreLocation() == null) {
return;
}
sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation());
if (truststoreConfig.getTruststorePassword() != null) {
sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword());
}
if (!truststoreConfig.isVerifySsl()) {
sink.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
}
}
}

View file

@ -65,7 +65,7 @@ public final class KafkaServicesValidation {
@Nullable
TruststoreConfig ssl) {
Properties properties = new Properties();
SslPropertiesUtil.addKafkaSslProperties(ssl, properties);
KafkaClientSslPropertiesUtil.addKafkaSslProperties(ssl, properties);
properties.putAll(clusterProps);
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// editing properties to make validation faster

View file

@ -1,23 +0,0 @@
package com.provectus.kafka.ui.util;
import com.provectus.kafka.ui.config.ClustersProperties;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.kafka.common.config.SslConfigs;
public final class SslPropertiesUtil {
private SslPropertiesUtil() {
}
public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
Properties sink) {
if (truststoreConfig != null && truststoreConfig.getTruststoreLocation() != null) {
sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation());
if (truststoreConfig.getTruststorePassword() != null) {
sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword());
}
}
}
}

View file

@ -7,6 +7,7 @@ import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.exception.ValidationException;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.util.function.Consumer;
@ -45,6 +46,10 @@ public class WebClientConfigurator {
public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
@Nullable ClustersProperties.KeystoreConfig keystoreConfig) {
if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) {
return configureNoSsl();
}
return configureSsl(
keystoreConfig != null ? keystoreConfig.getKeystoreLocation() : null,
keystoreConfig != null ? keystoreConfig.getKeystorePassword() : null,
@ -97,6 +102,17 @@ public class WebClientConfigurator {
return this;
}
@SneakyThrows
public WebClientConfigurator configureNoSsl() {
var contextBuilder = SslContextBuilder.forClient();
contextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
SslContext context = contextBuilder.build();
httpClient = httpClient.secure(t -> t.sslContext(context));
return this;
}
public WebClientConfigurator configureBasicAuth(@Nullable String username, @Nullable String password) {
if (username != null && password != null) {
builder.defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth(username, password));

View file

@ -136,7 +136,7 @@ rbac:
actions: all
- resource: connect
value: "*"
value: ".*"
actions: all
- resource: ksql