@@ -1,5 +1,12 @@
 package com.provectus.kafka.ui.service;
 
+import static com.provectus.kafka.ui.util.KafkaServicesValidation.validateClusterConnection;
+import static com.provectus.kafka.ui.util.KafkaServicesValidation.validateConnect;
+import static com.provectus.kafka.ui.util.KafkaServicesValidation.validateKsql;
+import static com.provectus.kafka.ui.util.KafkaServicesValidation.validatePrometheusStore;
+import static com.provectus.kafka.ui.util.KafkaServicesValidation.validateSchemaRegistry;
+import static com.provectus.kafka.ui.util.KafkaServicesValidation.validateTruststore;
+
 import com.provectus.kafka.ui.client.RetryingKafkaConnectClient;
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.config.WebclientProperties;
@@ -25,8 +32,10 @@ import java.util.Properties;
 import java.util.stream.Stream;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
 import org.springframework.util.unit.DataSize;
 import org.springframework.web.reactive.function.client.WebClient;
+import prometheus.query.api.PrometheusClientApi;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
@@ -56,6 +65,7 @@ public class KafkaClusterFactory {
     builder.bootstrapServers(clusterProperties.getBootstrapServers());
     builder.properties(convertProperties(clusterProperties.getProperties()));
     builder.readOnly(clusterProperties.isReadOnly());
+    builder.exposeMetricsViaPrometheusEndpoint(exposeMetricsViaPrometheusEndpoint(clusterProperties));
     builder.masking(DataMasking.create(clusterProperties.getMasking()));
     builder.pollingSettings(PollingSettings.create(clusterProperties, properties));
     builder.metricsScrapping(MetricsScrapping.create(clusterProperties, jmxMetricsRetriever));
@@ -69,13 +79,16 @@ public class KafkaClusterFactory {
     if (ksqlConfigured(clusterProperties)) {
       builder.ksqlClient(ksqlClient(clusterProperties));
     }
+    if (prometheusStorageConfigured(clusterProperties)) {
+      builder.prometheusStorageClient(prometheusStorageClient(clusterProperties));
+    }
     builder.originalProperties(clusterProperties);
     return builder.build();
   }
 
   public Mono<ClusterConfigValidationDTO> validate(ClustersProperties.Cluster clusterProperties) {
     if (clusterProperties.getSsl() != null) {
-      Optional<String> errMsg = KafkaServicesValidation.validateTruststore(clusterProperties.getSsl());
+      Optional<String> errMsg = validateTruststore(clusterProperties.getSsl());
       if (errMsg.isPresent()) {
         return Mono.just(new ClusterConfigValidationDTO()
             .kafka(new ApplicationPropertyValidationDTO()
@@ -85,40 +98,53 @@ public class KafkaClusterFactory {
     }
 
     return Mono.zip(
-        KafkaServicesValidation.validateClusterConnection(
+        validateClusterConnection(
             clusterProperties.getBootstrapServers(),
             convertProperties(clusterProperties.getProperties()),
             clusterProperties.getSsl()
         ),
         schemaRegistryConfigured(clusterProperties)
-            ? KafkaServicesValidation.validateSchemaRegistry(
-            () -> schemaRegistryClient(clusterProperties)).map(Optional::of)
+            ? validateSchemaRegistry(() -> schemaRegistryClient(clusterProperties)).map(Optional::of)
             : Mono.<Optional<ApplicationPropertyValidationDTO>>just(Optional.empty()),
 
         ksqlConfigured(clusterProperties)
-            ? KafkaServicesValidation.validateKsql(() -> ksqlClient(clusterProperties)).map(Optional::of)
+            ? validateKsql(() -> ksqlClient(clusterProperties)).map(Optional::of)
             : Mono.<Optional<ApplicationPropertyValidationDTO>>just(Optional.empty()),
 
         connectClientsConfigured(clusterProperties)
             ?
             Flux.fromIterable(clusterProperties.getKafkaConnect())
                 .flatMap(c ->
-                    KafkaServicesValidation.validateConnect(() -> connectClient(clusterProperties, c))
+                    validateConnect(() -> connectClient(clusterProperties, c))
                         .map(r -> Tuples.of(c.getName(), r)))
                 .collectMap(Tuple2::getT1, Tuple2::getT2)
                 .map(Optional::of)
             :
-            Mono.<Optional<Map<String, ApplicationPropertyValidationDTO>>>just(Optional.empty())
+            Mono.<Optional<Map<String, ApplicationPropertyValidationDTO>>>just(Optional.empty()),
+
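+        // validate connectivity to the Prometheus metrics store when one is configured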
+        prometheusStorageConfigured(clusterProperties)
+            ? validatePrometheusStore(() -> prometheusStorageClient(clusterProperties)).map(Optional::of)
+            : Mono.<Optional<ApplicationPropertyValidationDTO>>just(Optional.empty())
+
     ).map(tuple -> {
       var validation = new ClusterConfigValidationDTO();
       validation.kafka(tuple.getT1());
       tuple.getT2().ifPresent(validation::schemaRegistry);
       tuple.getT3().ifPresent(validation::ksqldb);
       tuple.getT4().ifPresent(validation::kafkaConnects);
+      tuple.getT5().ifPresent(validation::prometheusStorage);
       return validation;
     });
   }
 
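+  // Exposed by default when no metrics section is configured; otherwise requires prometheusExpose to be true.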
+  private boolean exposeMetricsViaPrometheusEndpoint(ClustersProperties.Cluster clusterProperties) {
+    return Optional.ofNullable(clusterProperties.getMetrics())
+        .map(m -> Boolean.TRUE.equals(m.getPrometheusExpose()))
+        .orElse(true);
+  }
+
   private Properties convertProperties(Map<String, Object> propertiesMap) {
     Properties properties = new Properties();
     if (propertiesMap != null) {
@@ -153,6 +177,30 @@ public class KafkaClusterFactory {
     );
   }
 
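+  // Failover client over the Prometheus base URL(s) configured under metrics.store.prometheus.url.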
+  private ReactiveFailover<PrometheusClientApi> prometheusStorageClient(ClustersProperties.Cluster cluster) {
+    WebClient webClient = new WebClientConfigurator()
+        .configureSsl(cluster.getSsl(), cluster.getSchemaRegistrySsl())
+        .configureBufferSize(webClientMaxBuffSize)
+        .build();
+    return ReactiveFailover.create(
+        parseUrlList(cluster.getMetrics().getStore().getPrometheus().getUrl()),
+        url -> new PrometheusClientApi(new prometheus.query.ApiClient(webClient, null, null).setBasePath(url)),
+        ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
+        "No live Prometheus instances available",
+        ReactiveFailover.DEFAULT_RETRY_GRACE_PERIOD_MS
+    );
+  }
+
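+  // A Prometheus store is considered configured when metrics.store.prometheus.url is non-blank.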
+  private boolean prometheusStorageConfigured(ClustersProperties.Cluster cluster) {
+    return Optional.ofNullable(cluster.getMetrics())
+        .flatMap(m -> Optional.ofNullable(m.getStore()))
+        .flatMap(s -> Optional.ofNullable(s.getPrometheus()))
+        .map(p -> StringUtils.hasText(p.getUrl()))
+        .orElse(false);
+  }
+
   private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterProperties) {
     return clusterProperties.getSchemaRegistry() != null;
   }