Compare commits
6 commits
master
...
issues/408
Author | SHA1 | Date | |
---|---|---|---|
![]() |
2433c5aa1f | ||
![]() |
86ee3fc898 | ||
![]() |
2030489183 | ||
![]() |
c4273e5c94 | ||
![]() |
58000135b5 | ||
![]() |
1c781175e4 |
10 changed files with 84 additions and 47 deletions
|
@ -35,22 +35,30 @@ public class ClustersProperties {
|
|||
public static class Cluster {
|
||||
String name;
|
||||
String bootstrapServers;
|
||||
|
||||
TruststoreConfig ssl;
|
||||
|
||||
String schemaRegistry;
|
||||
SchemaRegistryAuth schemaRegistryAuth;
|
||||
KeystoreConfig schemaRegistrySsl;
|
||||
|
||||
String ksqldbServer;
|
||||
KsqldbServerAuth ksqldbServerAuth;
|
||||
KeystoreConfig ksqldbServerSsl;
|
||||
|
||||
List<ConnectCluster> kafkaConnect;
|
||||
MetricsConfigData metrics;
|
||||
Map<String, Object> properties;
|
||||
boolean readOnly = false;
|
||||
|
||||
List<SerdeConfig> serde;
|
||||
String defaultKeySerde;
|
||||
String defaultValueSerde;
|
||||
List<Masking> masking;
|
||||
|
||||
MetricsConfigData metrics;
|
||||
Map<String, Object> properties;
|
||||
boolean readOnly = false;
|
||||
Long pollingThrottleRate;
|
||||
TruststoreConfig ssl;
|
||||
|
||||
List<Masking> masking;
|
||||
|
||||
AuditProperties audit;
|
||||
}
|
||||
|
||||
|
@ -101,6 +109,16 @@ public class ClustersProperties {
|
|||
public static class TruststoreConfig {
|
||||
String truststoreLocation;
|
||||
String truststorePassword;
|
||||
boolean verifySsl = true;
|
||||
}
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@ToString(exclude = {"keystorePassword"})
|
||||
public static class KeystoreConfig {
|
||||
String keystoreLocation;
|
||||
String keystorePassword;
|
||||
}
|
||||
|
||||
@Data
|
||||
|
@ -120,15 +138,6 @@ public class ClustersProperties {
|
|||
String password;
|
||||
}
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@ToString(exclude = {"keystorePassword"})
|
||||
public static class KeystoreConfig {
|
||||
String keystoreLocation;
|
||||
String keystorePassword;
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class Masking {
|
||||
Type type;
|
||||
|
@ -178,6 +187,7 @@ public class ClustersProperties {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Map<String, Object> flattenClusterProperties(@Nullable String prefix,
|
||||
@Nullable Map<String, Object> propertiesMap) {
|
||||
Map<String, Object> flattened = new HashMap<>();
|
||||
|
|
|
@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service;
|
|||
|
||||
import com.provectus.kafka.ui.config.ClustersProperties;
|
||||
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||
import com.provectus.kafka.ui.util.SslPropertiesUtil;
|
||||
import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil;
|
||||
import java.io.Closeable;
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
|
@ -42,7 +42,7 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable {
|
|||
private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
|
||||
return Mono.fromSupplier(() -> {
|
||||
Properties properties = new Properties();
|
||||
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
|
||||
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
|
||||
properties.putAll(cluster.getProperties());
|
||||
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
|
||||
properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
|
||||
|
|
|
@ -10,7 +10,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
|
|||
import com.provectus.kafka.ui.model.SortOrderDTO;
|
||||
import com.provectus.kafka.ui.service.rbac.AccessControlService;
|
||||
import com.provectus.kafka.ui.util.ApplicationMetrics;
|
||||
import com.provectus.kafka.ui.util.SslPropertiesUtil;
|
||||
import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
|
@ -254,7 +254,7 @@ public class ConsumerGroupService {
|
|||
public EnhancedConsumer createConsumer(KafkaCluster cluster,
|
||||
Map<String, Object> properties) {
|
||||
Properties props = new Properties();
|
||||
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
|
||||
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
|
||||
props.putAll(cluster.getProperties());
|
||||
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + System.currentTimeMillis());
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
|
||||
|
|
|
@ -20,7 +20,7 @@ import com.provectus.kafka.ui.model.TopicMessageDTO;
|
|||
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
||||
import com.provectus.kafka.ui.serde.api.Serde;
|
||||
import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
|
||||
import com.provectus.kafka.ui.util.SslPropertiesUtil;
|
||||
import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil;
|
||||
import java.time.Instant;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
|
@ -191,7 +191,7 @@ public class MessagesService {
|
|||
public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
|
||||
Map<String, Object> additionalProps) {
|
||||
Properties properties = new Properties();
|
||||
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
|
||||
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
|
||||
properties.putAll(cluster.getProperties());
|
||||
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
|
||||
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||||
|
|
|
@ -130,8 +130,8 @@ public class KsqlApiClient {
|
|||
* Some version of ksqldb (?..0.24) can cut off json streaming without respect proper array ending like <p/>
|
||||
* <code>[{"header":{"queryId":"...","schema":"..."}}, ]</code>
|
||||
* which will cause json parsing error and will be propagated to UI.
|
||||
* This is a know issue(https://github.com/confluentinc/ksql/issues/8746), but we don't know when it will be fixed.
|
||||
* To workaround this we need to check DecodingException err msg.
|
||||
* This is a know issue(<a href="https://github.com/confluentinc/ksql/issues/8746">...</a>), but we don't know when it will be fixed.
|
||||
* To work around this we need to check DecodingException err msg.
|
||||
*/
|
||||
private boolean isUnexpectedJsonArrayEndCharException(Throwable th) {
|
||||
return th instanceof DecodingException
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
package com.provectus.kafka.ui.util;
|
||||
|
||||
import com.provectus.kafka.ui.config.ClustersProperties;
|
||||
import java.util.Properties;
|
||||
import javax.annotation.Nullable;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
|
||||
public final class KafkaClientSslPropertiesUtil {
|
||||
|
||||
private KafkaClientSslPropertiesUtil() {
|
||||
}
|
||||
|
||||
public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
|
||||
Properties sink) {
|
||||
if (truststoreConfig == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (truststoreConfig.getTruststoreLocation() == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation());
|
||||
|
||||
if (truststoreConfig.getTruststorePassword() != null) {
|
||||
sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword());
|
||||
}
|
||||
|
||||
if (!truststoreConfig.isVerifySsl()) {
|
||||
sink.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -65,7 +65,7 @@ public final class KafkaServicesValidation {
|
|||
@Nullable
|
||||
TruststoreConfig ssl) {
|
||||
Properties properties = new Properties();
|
||||
SslPropertiesUtil.addKafkaSslProperties(ssl, properties);
|
||||
KafkaClientSslPropertiesUtil.addKafkaSslProperties(ssl, properties);
|
||||
properties.putAll(clusterProps);
|
||||
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
// editing properties to make validation faster
|
||||
|
|
|
@ -1,23 +0,0 @@
|
|||
package com.provectus.kafka.ui.util;
|
||||
|
||||
import com.provectus.kafka.ui.config.ClustersProperties;
|
||||
import java.util.Properties;
|
||||
import javax.annotation.Nullable;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
|
||||
public final class SslPropertiesUtil {
|
||||
|
||||
private SslPropertiesUtil() {
|
||||
}
|
||||
|
||||
public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
|
||||
Properties sink) {
|
||||
if (truststoreConfig != null && truststoreConfig.getTruststoreLocation() != null) {
|
||||
sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation());
|
||||
if (truststoreConfig.getTruststorePassword() != null) {
|
||||
sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -7,6 +7,7 @@ import com.provectus.kafka.ui.config.ClustersProperties;
|
|||
import com.provectus.kafka.ui.exception.ValidationException;
|
||||
import io.netty.handler.ssl.SslContext;
|
||||
import io.netty.handler.ssl.SslContextBuilder;
|
||||
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
|
||||
import java.io.FileInputStream;
|
||||
import java.security.KeyStore;
|
||||
import java.util.function.Consumer;
|
||||
|
@ -45,6 +46,10 @@ public class WebClientConfigurator {
|
|||
|
||||
public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
|
||||
@Nullable ClustersProperties.KeystoreConfig keystoreConfig) {
|
||||
if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) {
|
||||
return configureNoSsl();
|
||||
}
|
||||
|
||||
return configureSsl(
|
||||
keystoreConfig != null ? keystoreConfig.getKeystoreLocation() : null,
|
||||
keystoreConfig != null ? keystoreConfig.getKeystorePassword() : null,
|
||||
|
@ -97,6 +102,17 @@ public class WebClientConfigurator {
|
|||
return this;
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
public WebClientConfigurator configureNoSsl() {
|
||||
var contextBuilder = SslContextBuilder.forClient();
|
||||
contextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
|
||||
|
||||
SslContext context = contextBuilder.build();
|
||||
|
||||
httpClient = httpClient.secure(t -> t.sslContext(context));
|
||||
return this;
|
||||
}
|
||||
|
||||
public WebClientConfigurator configureBasicAuth(@Nullable String username, @Nullable String password) {
|
||||
if (username != null && password != null) {
|
||||
builder.defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth(username, password));
|
||||
|
|
|
@ -136,7 +136,7 @@ rbac:
|
|||
actions: all
|
||||
|
||||
- resource: connect
|
||||
value: "*"
|
||||
value: ".*"
|
||||
actions: all
|
||||
|
||||
- resource: ksql
|
||||
|
|
Loading…
Add table
Reference in a new issue