|
@@ -1,5 +1,12 @@
|
|
package com.provectus.kafka.ui.service;
|
|
package com.provectus.kafka.ui.service;
|
|
|
|
|
|
|
|
+import static com.provectus.kafka.ui.util.KafkaServicesValidation.validateClusterConnection;
|
|
|
|
+import static com.provectus.kafka.ui.util.KafkaServicesValidation.validateConnect;
|
|
|
|
+import static com.provectus.kafka.ui.util.KafkaServicesValidation.validateKsql;
|
|
|
|
+import static com.provectus.kafka.ui.util.KafkaServicesValidation.validatePrometheusStore;
|
|
|
|
+import static com.provectus.kafka.ui.util.KafkaServicesValidation.validateSchemaRegistry;
|
|
|
|
+import static com.provectus.kafka.ui.util.KafkaServicesValidation.validateTruststore;
|
|
|
|
+
|
|
import com.provectus.kafka.ui.client.RetryingKafkaConnectClient;
|
|
import com.provectus.kafka.ui.client.RetryingKafkaConnectClient;
|
|
import com.provectus.kafka.ui.config.ClustersProperties;
|
|
import com.provectus.kafka.ui.config.ClustersProperties;
|
|
import com.provectus.kafka.ui.config.WebclientProperties;
|
|
import com.provectus.kafka.ui.config.WebclientProperties;
|
|
@@ -8,9 +15,10 @@ import com.provectus.kafka.ui.emitter.PollingSettings;
|
|
import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO;
|
|
import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO;
|
|
import com.provectus.kafka.ui.model.ClusterConfigValidationDTO;
|
|
import com.provectus.kafka.ui.model.ClusterConfigValidationDTO;
|
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
|
-import com.provectus.kafka.ui.model.MetricsConfig;
|
|
|
|
import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
|
|
import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
|
|
import com.provectus.kafka.ui.service.masking.DataMasking;
|
|
import com.provectus.kafka.ui.service.masking.DataMasking;
|
|
|
|
+import com.provectus.kafka.ui.service.metrics.scrape.MetricsScrapping;
|
|
|
|
+import com.provectus.kafka.ui.service.metrics.scrape.jmx.JmxMetricsRetriever;
|
|
import com.provectus.kafka.ui.sr.ApiClient;
|
|
import com.provectus.kafka.ui.sr.ApiClient;
|
|
import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
|
|
import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
|
|
import com.provectus.kafka.ui.util.KafkaServicesValidation;
|
|
import com.provectus.kafka.ui.util.KafkaServicesValidation;
|
|
@@ -22,11 +30,12 @@ import java.util.Map;
|
|
import java.util.Optional;
|
|
import java.util.Optional;
|
|
import java.util.Properties;
|
|
import java.util.Properties;
|
|
import java.util.stream.Stream;
|
|
import java.util.stream.Stream;
|
|
-import javax.annotation.Nullable;
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
|
|
+import org.springframework.util.StringUtils;
|
|
import org.springframework.util.unit.DataSize;
|
|
import org.springframework.util.unit.DataSize;
|
|
import org.springframework.web.reactive.function.client.WebClient;
|
|
import org.springframework.web.reactive.function.client.WebClient;
|
|
|
|
+import prometheus.query.api.PrometheusClientApi;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.util.function.Tuple2;
|
|
import reactor.util.function.Tuple2;
|
|
@@ -39,11 +48,13 @@ public class KafkaClusterFactory {
|
|
private static final DataSize DEFAULT_WEBCLIENT_BUFFER = DataSize.parse("20MB");
|
|
private static final DataSize DEFAULT_WEBCLIENT_BUFFER = DataSize.parse("20MB");
|
|
|
|
|
|
private final DataSize webClientMaxBuffSize;
|
|
private final DataSize webClientMaxBuffSize;
|
|
|
|
+ private final JmxMetricsRetriever jmxMetricsRetriever;
|
|
|
|
|
|
- public KafkaClusterFactory(WebclientProperties webclientProperties) {
|
|
|
|
|
|
+ public KafkaClusterFactory(WebclientProperties webclientProperties, JmxMetricsRetriever jmxMetricsRetriever) {
|
|
this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize())
|
|
this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize())
|
|
.map(DataSize::parse)
|
|
.map(DataSize::parse)
|
|
.orElse(DEFAULT_WEBCLIENT_BUFFER);
|
|
.orElse(DEFAULT_WEBCLIENT_BUFFER);
|
|
|
|
+ this.jmxMetricsRetriever = jmxMetricsRetriever;
|
|
}
|
|
}
|
|
|
|
|
|
public KafkaCluster create(ClustersProperties properties,
|
|
public KafkaCluster create(ClustersProperties properties,
|
|
@@ -54,8 +65,10 @@ public class KafkaClusterFactory {
|
|
builder.bootstrapServers(clusterProperties.getBootstrapServers());
|
|
builder.bootstrapServers(clusterProperties.getBootstrapServers());
|
|
builder.properties(convertProperties(clusterProperties.getProperties()));
|
|
builder.properties(convertProperties(clusterProperties.getProperties()));
|
|
builder.readOnly(clusterProperties.isReadOnly());
|
|
builder.readOnly(clusterProperties.isReadOnly());
|
|
|
|
+ builder.exposeMetricsViaPrometheusEndpoint(exposeMetricsViaPrometheusEndpoint(clusterProperties));
|
|
builder.masking(DataMasking.create(clusterProperties.getMasking()));
|
|
builder.masking(DataMasking.create(clusterProperties.getMasking()));
|
|
builder.pollingSettings(PollingSettings.create(clusterProperties, properties));
|
|
builder.pollingSettings(PollingSettings.create(clusterProperties, properties));
|
|
|
|
+ builder.metricsScrapping(MetricsScrapping.create(clusterProperties, jmxMetricsRetriever));
|
|
|
|
|
|
if (schemaRegistryConfigured(clusterProperties)) {
|
|
if (schemaRegistryConfigured(clusterProperties)) {
|
|
builder.schemaRegistryClient(schemaRegistryClient(clusterProperties));
|
|
builder.schemaRegistryClient(schemaRegistryClient(clusterProperties));
|
|
@@ -66,8 +79,8 @@ public class KafkaClusterFactory {
|
|
if (ksqlConfigured(clusterProperties)) {
|
|
if (ksqlConfigured(clusterProperties)) {
|
|
builder.ksqlClient(ksqlClient(clusterProperties));
|
|
builder.ksqlClient(ksqlClient(clusterProperties));
|
|
}
|
|
}
|
|
- if (metricsConfigured(clusterProperties)) {
|
|
|
|
- builder.metricsConfig(metricsConfigDataToMetricsConfig(clusterProperties.getMetrics()));
|
|
|
|
|
|
+ if (prometheusStorageConfigured(clusterProperties)) {
|
|
|
|
+ builder.prometheusStorageClient(prometheusStorageClient(clusterProperties));
|
|
}
|
|
}
|
|
builder.originalProperties(clusterProperties);
|
|
builder.originalProperties(clusterProperties);
|
|
return builder.build();
|
|
return builder.build();
|
|
@@ -75,7 +88,7 @@ public class KafkaClusterFactory {
|
|
|
|
|
|
public Mono<ClusterConfigValidationDTO> validate(ClustersProperties.Cluster clusterProperties) {
|
|
public Mono<ClusterConfigValidationDTO> validate(ClustersProperties.Cluster clusterProperties) {
|
|
if (clusterProperties.getSsl() != null) {
|
|
if (clusterProperties.getSsl() != null) {
|
|
- Optional<String> errMsg = KafkaServicesValidation.validateTruststore(clusterProperties.getSsl());
|
|
|
|
|
|
+ Optional<String> errMsg = validateTruststore(clusterProperties.getSsl());
|
|
if (errMsg.isPresent()) {
|
|
if (errMsg.isPresent()) {
|
|
return Mono.just(new ClusterConfigValidationDTO()
|
|
return Mono.just(new ClusterConfigValidationDTO()
|
|
.kafka(new ApplicationPropertyValidationDTO()
|
|
.kafka(new ApplicationPropertyValidationDTO()
|
|
@@ -85,40 +98,51 @@ public class KafkaClusterFactory {
|
|
}
|
|
}
|
|
|
|
|
|
return Mono.zip(
|
|
return Mono.zip(
|
|
- KafkaServicesValidation.validateClusterConnection(
|
|
|
|
|
|
+ validateClusterConnection(
|
|
clusterProperties.getBootstrapServers(),
|
|
clusterProperties.getBootstrapServers(),
|
|
convertProperties(clusterProperties.getProperties()),
|
|
convertProperties(clusterProperties.getProperties()),
|
|
clusterProperties.getSsl()
|
|
clusterProperties.getSsl()
|
|
),
|
|
),
|
|
schemaRegistryConfigured(clusterProperties)
|
|
schemaRegistryConfigured(clusterProperties)
|
|
- ? KafkaServicesValidation.validateSchemaRegistry(
|
|
|
|
- () -> schemaRegistryClient(clusterProperties)).map(Optional::of)
|
|
|
|
|
|
+ ? validateSchemaRegistry(() -> schemaRegistryClient(clusterProperties)).map(Optional::of)
|
|
: Mono.<Optional<ApplicationPropertyValidationDTO>>just(Optional.empty()),
|
|
: Mono.<Optional<ApplicationPropertyValidationDTO>>just(Optional.empty()),
|
|
|
|
|
|
ksqlConfigured(clusterProperties)
|
|
ksqlConfigured(clusterProperties)
|
|
- ? KafkaServicesValidation.validateKsql(() -> ksqlClient(clusterProperties)).map(Optional::of)
|
|
|
|
|
|
+ ? validateKsql(() -> ksqlClient(clusterProperties)).map(Optional::of)
|
|
: Mono.<Optional<ApplicationPropertyValidationDTO>>just(Optional.empty()),
|
|
: Mono.<Optional<ApplicationPropertyValidationDTO>>just(Optional.empty()),
|
|
|
|
|
|
connectClientsConfigured(clusterProperties)
|
|
connectClientsConfigured(clusterProperties)
|
|
?
|
|
?
|
|
Flux.fromIterable(clusterProperties.getKafkaConnect())
|
|
Flux.fromIterable(clusterProperties.getKafkaConnect())
|
|
.flatMap(c ->
|
|
.flatMap(c ->
|
|
- KafkaServicesValidation.validateConnect(() -> connectClient(clusterProperties, c))
|
|
|
|
|
|
+ validateConnect(() -> connectClient(clusterProperties, c))
|
|
.map(r -> Tuples.of(c.getName(), r)))
|
|
.map(r -> Tuples.of(c.getName(), r)))
|
|
.collectMap(Tuple2::getT1, Tuple2::getT2)
|
|
.collectMap(Tuple2::getT1, Tuple2::getT2)
|
|
.map(Optional::of)
|
|
.map(Optional::of)
|
|
:
|
|
:
|
|
- Mono.<Optional<Map<String, ApplicationPropertyValidationDTO>>>just(Optional.empty())
|
|
|
|
|
|
+ Mono.<Optional<Map<String, ApplicationPropertyValidationDTO>>>just(Optional.empty()),
|
|
|
|
+
|
|
|
|
+ prometheusStorageConfigured(clusterProperties)
|
|
|
|
+ ? validatePrometheusStore(() -> prometheusStorageClient(clusterProperties)).map(Optional::of)
|
|
|
|
+ : Mono.<Optional<ApplicationPropertyValidationDTO>>just(Optional.empty())
|
|
|
|
+
|
|
).map(tuple -> {
|
|
).map(tuple -> {
|
|
var validation = new ClusterConfigValidationDTO();
|
|
var validation = new ClusterConfigValidationDTO();
|
|
validation.kafka(tuple.getT1());
|
|
validation.kafka(tuple.getT1());
|
|
tuple.getT2().ifPresent(validation::schemaRegistry);
|
|
tuple.getT2().ifPresent(validation::schemaRegistry);
|
|
tuple.getT3().ifPresent(validation::ksqldb);
|
|
tuple.getT3().ifPresent(validation::ksqldb);
|
|
tuple.getT4().ifPresent(validation::kafkaConnects);
|
|
tuple.getT4().ifPresent(validation::kafkaConnects);
|
|
|
|
+ tuple.getT5().ifPresent(validation::prometheusStorage);
|
|
return validation;
|
|
return validation;
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private boolean exposeMetricsViaPrometheusEndpoint(ClustersProperties.Cluster clusterProperties) {
|
|
|
|
+ return Optional.ofNullable(clusterProperties.getMetrics())
|
|
|
|
+ .map(m -> m.getPrometheusExpose() == null || m.getPrometheusExpose())
|
|
|
|
+ .orElse(true);
|
|
|
|
+ }
|
|
|
|
+
|
|
private Properties convertProperties(Map<String, Object> propertiesMap) {
|
|
private Properties convertProperties(Map<String, Object> propertiesMap) {
|
|
Properties properties = new Properties();
|
|
Properties properties = new Properties();
|
|
if (propertiesMap != null) {
|
|
if (propertiesMap != null) {
|
|
@@ -153,6 +177,28 @@ public class KafkaClusterFactory {
|
|
);
|
|
);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private ReactiveFailover<PrometheusClientApi> prometheusStorageClient(ClustersProperties.Cluster cluster) {
|
|
|
|
+ WebClient webClient = new WebClientConfigurator()
|
|
|
|
+ .configureSsl(cluster.getSsl(), null)
|
|
|
|
+ .configureBufferSize(webClientMaxBuffSize)
|
|
|
|
+ .build();
|
|
|
|
+ return ReactiveFailover.create(
|
|
|
|
+ parseUrlList(cluster.getMetrics().getStore().getPrometheus().getUrl()),
|
|
|
|
+ url -> new PrometheusClientApi(new prometheus.query.ApiClient(webClient).setBasePath(url)),
|
|
|
|
+ ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
|
|
|
|
+ "No live schemaRegistry instances available",
|
|
|
|
+ ReactiveFailover.DEFAULT_RETRY_GRACE_PERIOD_MS
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private boolean prometheusStorageConfigured(ClustersProperties.Cluster cluster) {
|
|
|
|
+ return Optional.ofNullable(cluster.getMetrics())
|
|
|
|
+ .flatMap(m -> Optional.ofNullable(m.getStore()))
|
|
|
|
+ .flatMap(s -> Optional.ofNullable(s.getPrometheus()))
|
|
|
|
+ .map(p -> StringUtils.hasText(p.getUrl()))
|
|
|
|
+ .orElse(false);
|
|
|
|
+ }
|
|
|
|
+
|
|
private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterProperties) {
|
|
private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterProperties) {
|
|
return clusterProperties.getSchemaRegistry() != null;
|
|
return clusterProperties.getSchemaRegistry() != null;
|
|
}
|
|
}
|
|
@@ -202,20 +248,4 @@ public class KafkaClusterFactory {
|
|
return clusterProperties.getMetrics() != null;
|
|
return clusterProperties.getMetrics() != null;
|
|
}
|
|
}
|
|
|
|
|
|
- @Nullable
|
|
|
|
- private MetricsConfig metricsConfigDataToMetricsConfig(ClustersProperties.MetricsConfigData metricsConfigData) {
|
|
|
|
- if (metricsConfigData == null) {
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
- MetricsConfig.MetricsConfigBuilder builder = MetricsConfig.builder();
|
|
|
|
- builder.type(metricsConfigData.getType());
|
|
|
|
- builder.port(metricsConfigData.getPort());
|
|
|
|
- builder.ssl(Optional.ofNullable(metricsConfigData.getSsl()).orElse(false));
|
|
|
|
- builder.username(metricsConfigData.getUsername());
|
|
|
|
- builder.password(metricsConfigData.getPassword());
|
|
|
|
- builder.keystoreLocation(metricsConfigData.getKeystoreLocation());
|
|
|
|
- builder.keystorePassword(metricsConfigData.getKeystorePassword());
|
|
|
|
- return builder.build();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
}
|
|
}
|