From 7eeb5538d23cbd52414efd79e59cb2ec5cfefb7c Mon Sep 17 00:00:00 2001
From: iliax
Date: Fri, 21 Jul 2023 20:01:32 +0400
Subject: [PATCH] wip: add Kafka topic sink for scraped metrics, support
 multiple metrics sinks per cluster

---
 documentation/compose/kafka-ui-arm64.yaml     | 38 +++++-----
 kafka-ui-api/Dockerfile                       |  4 +-
 .../kafka/ui/config/ClustersProperties.java   |  6 ++
 .../kafka/ui/service/MessagesService.java     |  7 +-
 .../metrics/scrape/IoRatesMetricsScanner.java |  2 +-
 .../metrics/scrape/MetricsScrapping.java      |  1 -
 .../inferred/InferredMetricsScraper.java      |  5 +-
 .../ui/service/metrics/sink/KafkaSink.java    | 69 +++++++++++++++++++
 .../ui/service/metrics/sink/MetricsSink.java  | 43 ++++++++----
 .../sink/PrometheusRemoteWriteSink.java       | 16 +----
 .../ui/util/KafkaServicesValidation.java      |  2 +-
 11 files changed, 139 insertions(+), 54 deletions(-)
 create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/KafkaSink.java

diff --git a/documentation/compose/kafka-ui-arm64.yaml b/documentation/compose/kafka-ui-arm64.yaml
index 6b7467863d..8b7ac667f2 100644
--- a/documentation/compose/kafka-ui-arm64.yaml
+++ b/documentation/compose/kafka-ui-arm64.yaml
@@ -10,17 +10,17 @@ services:
     ports:
       - 8080:8080
     depends_on:
       - kafka0
-      - schema-registry0
     environment:
-      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_NAME: cluster
       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
       KAFKA_CLUSTERS_0_METRICS_PORT: 9997
+      KAFKA_CLUSTERS_1_NAME: cluster2
+      KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka0:29092
       KAFKA_CLUSTERS_0_METRICS_STORE_PROMETHEUS_URL: "http://prometheus:9090"
       KAFKA_CLUSTERS_0_METRICS_STORE_PROMETHEUS_REMOTEWRITE: 'true'
-      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry0:8085
-      DYNAMIC_CONFIG_ENABLED: 'true' # not necessary, added for tests
       KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true'
       KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'
+      KAFKA_CLUSTERS_0_METRICS_STORE_KAFKA_TOPIC: "kafka_metrics"

   prometheus:
     image: prom/prometheus:latest
@@ -60,21 +60,21 @@ services:
       - ./scripts/update_run.sh:/tmp/update_run.sh
     command: "bash -c 'if [ !
-f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"

-  schema-registry0:
-    image: confluentinc/cp-schema-registry:7.2.1.arm64
-    ports:
-      - 8085:8085
-    depends_on:
-      - kafka0
-    environment:
-      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
-      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
-      SCHEMA_REGISTRY_HOST_NAME: schema-registry0
-      SCHEMA_REGISTRY_LISTENERS: http://schema-registry0:8085
-
-      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
-      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
-      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
+#  schema-registry0:
+#    image: confluentinc/cp-schema-registry:7.2.1.arm64
+#    ports:
+#      - 8085:8085
+#    depends_on:
+#      - kafka0
+#    environment:
+#      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
+#      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
+#      SCHEMA_REGISTRY_HOST_NAME: schema-registry0
+#      SCHEMA_REGISTRY_LISTENERS: http://schema-registry0:8085
+#
+#      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
+#      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+#      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

 #  kafka-connect0:
 #    image: confluentinc/cp-kafka-connect:7.2.1.arm64
diff --git a/kafka-ui-api/Dockerfile b/kafka-ui-api/Dockerfile
index 16a2de9bea..d364aaf297 100644
--- a/kafka-ui-api/Dockerfile
+++ b/kafka-ui-api/Dockerfile
@@ -1,5 +1,5 @@
 #FROM azul/zulu-openjdk-alpine:17-jre-headless
-FROM azul/zulu-openjdk-alpine:17-jre-headless
+FROM azul/zulu-openjdk-alpine@sha256:a36679ac0d28cb835e2a8c00e1e0d95509c6c51c5081c7782b85edb1f37a771a
 RUN apk add --no-cache gcompat # need to make snappy codec work

 RUN addgroup -S kafkaui && adduser -S kafkaui -G kafkaui
@@ -18,4 +18,4 @@ ENV JAVA_OPTS=
 EXPOSE 8080

 # see JmxSslSocketFactory docs to understand why add-opens is needed
-CMD java --add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED $JAVA_OPTS -jar kafka-ui-api.jar
+CMD java --add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED $JAVA_OPTS -jar kafka-ui-api.jar
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
index adbf5839eb..00e2c7e5b4 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
@@ -82,6 +82,12 @@ public class ClustersProperties {
   @Data
   public static class MetricsStorage {
     PrometheusStorage prometheus;
+    KafkaMetricsStorage kafka;
+  }
+
+  @Data
+  public static class KafkaMetricsStorage {
+    String topic;
   }

   @Data
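
Note: the new `KafkaMetricsStorage` hangs off the existing `metrics.store` properties
tree (`kafka.clusters[n].metrics.store.kafka.topic`), which is what the
`KAFKA_CLUSTERS_0_METRICS_STORE_KAFKA_TOPIC` variable in the compose file above binds
to. An equivalent application-config sketch (property names follow the classes here;
the topic name is just the one used in the compose file):

    kafka:
      clusters:
        - name: cluster
          bootstrapServers: kafka0:29092
          metrics:
            store:
              prometheus:
                url: "http://prometheus:9090"
                remoteWrite: true
              kafka:
                topic: kafka_metrics
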
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
index dcc122ba28..11028b852f 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
@@ -190,8 +190,13 @@ public class MessagesService {

   public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
                                                              Map<String, Object> additionalProps) {
+    return createProducer(cluster.getOriginalProperties(), additionalProps);
+  }
+
+  public static KafkaProducer<byte[], byte[]> createProducer(ClustersProperties.Cluster cluster,
+                                                             Map<String, Object> additionalProps) {
     Properties properties = new Properties();
-    SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
+    SslPropertiesUtil.addKafkaSslProperties(cluster.getSsl(), properties);
     properties.putAll(cluster.getProperties());
     properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
     properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/IoRatesMetricsScanner.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/IoRatesMetricsScanner.java
index 961add77a6..b0935d8a21 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/IoRatesMetricsScanner.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/IoRatesMetricsScanner.java
@@ -32,7 +32,7 @@ class IoRatesMetricsScanner {
     });
   }

-  public Metrics.IoRates get() {
+  Metrics.IoRates get() {
     return Metrics.IoRates.builder()
         .topicBytesInPerSec(bytesInFifteenMinuteRate)
         .topicBytesOutPerSec(bytesOutFifteenMinuteRate)
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java
index be47571c50..42d7dcd994 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java
@@ -74,7 +74,6 @@ public class MetricsScrapping {
   private void sendMetricsToSink(Metrics metrics) {
     sink.send(prepareMetricsForSending(metrics))
         .doOnError(th -> log.warn("Error sending metrics to metrics sink", th))
-        .doOnTerminate(() -> log.debug("Metrics sent to sink")) //todo
         .subscribe();
   }

diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetricsScraper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetricsScraper.java
index 03e6932813..aaa38c290b 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetricsScraper.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetricsScraper.java
@@ -23,7 +23,7 @@ public class InferredMetricsScraper {

   public synchronized Mono<InferredMetrics> scrape(ScrapedClusterState newState) {
     var inferred = infer(prevState, newState);
-    prevState = newState;
+    this.prevState = newState;
     return Mono.just(inferred);
   }

@@ -34,7 +34,7 @@
     fillTopicMetrics(registry, newState);
     fillConsumerGroupsMetrics(registry, newState);
     List<MetricFamilySamples> metrics = registry.metrics.values().stream().toList();
-    log.debug("{} metrics inferred from cluster state", metrics.size());
+    log.debug("{} metric families inferred from cluster state", metrics.size());
     return new InferredMetrics(metrics);
   }

@@ -94,7 +94,6 @@
               state.logDirSpaceStats().totalBytes()
           );
         }
-        //TODO: maybe add per-directory stats also?
       }
     });
   }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/KafkaSink.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/KafkaSink.java
new file mode 100644
index 0000000000..3dbc459d8d
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/KafkaSink.java
@@ -0,0 +1,69 @@
+package com.provectus.kafka.ui.service.metrics.sink;
+
+import static com.provectus.kafka.ui.service.MessagesService.*;
+import static io.prometheus.client.Collector.*;
+
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.google.common.base.Charsets;
+import com.provectus.kafka.ui.config.ClustersProperties;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import reactor.core.publisher.Mono;
+
+/*
+ * Format and implementation are the same as in https://github.com/Telefonica/prometheus-kafka-adapter
+ */
+@RequiredArgsConstructor
+class KafkaSink implements MetricsSink {
+
+  record KafkaMetric(String timestamp, String value, String name, Map<String, String> labels) { }
+
+  private static final JsonMapper JSON_MAPPER = new JsonMapper();
+
+  private final String topic;
+  private final Producer<byte[], byte[]> producer;
+
+  static KafkaSink create(ClustersProperties.Cluster cluster, String targetTopic) {
+    return new KafkaSink(targetTopic, createProducer(cluster, Map.of()));
+  }
+
+  @Override
+  public Mono<Void> send(Stream<MetricFamilySamples> metrics) {
+    return Mono.fromRunnable(() -> {
+      String ts = Instant.now()
+          .truncatedTo(ChronoUnit.SECONDS)
+          .atZone(ZoneOffset.UTC)
+          .format(DateTimeFormatter.ISO_DATE_TIME);
+
+      metrics.flatMap(m -> createRecord(ts, m)).forEach(producer::send);
+    });
+  }
+
+  private Stream<ProducerRecord<byte[], byte[]>> createRecord(String ts, MetricFamilySamples metrics) {
+    return metrics.samples.stream()
+        .map(sample -> {
+          var lbls = new LinkedHashMap<String, String>();
+          lbls.put("__name__", sample.name);
+          for (int i = 0; i < sample.labelNames.size(); i++) {
+            lbls.put(sample.labelNames.get(i), sample.labelValues.get(i));
+          }
+          var km = new KafkaMetric(ts, doubleToGoString(sample.value), sample.name, lbls);
+          return new ProducerRecord<>(topic, toJson(km));
+        });
+  }
+
+  @SneakyThrows
+  private static byte[] toJson(KafkaMetric m) {
+    return JSON_MAPPER.writeValueAsString(m).getBytes(Charsets.UTF_8);
+  }
+
+}
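
Note: each record value is the Jackson serialization of the `KafkaMetric` record above,
i.e. the same JSON shape the Telefonica prometheus-kafka-adapter produces, and records
are sent with a null key. A representative payload (metric name, labels, and values are
illustrative):

    {
      "timestamp": "2023-07-21T16:01:32Z",
      "value": "42.0",
      "name": "kafka_brokers_count",
      "labels": {
        "__name__": "kafka_brokers_count",
        "cluster": "cluster"
      }
    }
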
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java
index da27b05a22..04f8a56084 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java
@@ -4,35 +4,52 @@ import static io.prometheus.client.Collector.MetricFamilySamples;
 import static org.springframework.util.StringUtils.hasText;

 import com.provectus.kafka.ui.config.ClustersProperties;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 import java.util.stream.Stream;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;

 public interface MetricsSink {

   static MetricsSink create(ClustersProperties.Cluster cluster) {
-    return Optional.ofNullable(cluster.getMetrics())
+    List<MetricsSink> sinks = new ArrayList<>();
+    Optional.ofNullable(cluster.getMetrics())
         .flatMap(metrics -> Optional.ofNullable(metrics.getStore()))
         .flatMap(store -> Optional.ofNullable(store.getPrometheus()))
-        .map(prometheusConf -> {
+        .ifPresent(prometheusConf -> {
              if (hasText(prometheusConf.getUrl()) && Boolean.TRUE.equals(prometheusConf.getRemoteWrite())) {
-              return new PrometheusRemoteWriteSink(prometheusConf.getUrl());
+              sinks.add(new PrometheusRemoteWriteSink(prometheusConf.getUrl()));
             }
             if (hasText(prometheusConf.getPushGatewayUrl())) {
-              return PrometheusPushGatewaySink.create(
-                  prometheusConf.getPushGatewayUrl(),
-                  prometheusConf.getPushGatewayJobName(),
-                  prometheusConf.getPushGatewayUsername(),
-                  prometheusConf.getPushGatewayPassword()
-              );
+              sinks.add(
+                  PrometheusPushGatewaySink.create(
+                      prometheusConf.getPushGatewayUrl(),
+                      prometheusConf.getPushGatewayJobName(),
+                      prometheusConf.getPushGatewayUsername(),
+                      prometheusConf.getPushGatewayPassword()
+                  ));
             }
-            return noop();
           }
-        ).orElse(noop());
+        );
+
+    Optional.ofNullable(cluster.getMetrics())
+        .flatMap(metrics -> Optional.ofNullable(metrics.getStore()))
+        .flatMap(store -> Optional.ofNullable(store.getKafka()))
+        .flatMap(kafka -> Optional.ofNullable(kafka.getTopic()))
+        .ifPresent(topic -> sinks.add(KafkaSink.create(cluster, topic)));
+
+    return compoundSink(sinks);
   }

-  static MetricsSink noop() {
-    return m -> Mono.empty();
+  static MetricsSink compoundSink(List<MetricsSink> sinks) {
+    return metricsStream -> {
+      var materialized = metricsStream.toList();
+      return Flux.fromIterable(sinks)
+          .flatMap(sink -> sink.send(materialized.stream()))
+          .then();
+    };
   }

   Mono<Void> send(Stream<MetricFamilySamples> metrics);
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusRemoteWriteSink.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusRemoteWriteSink.java
index c023228b4a..0e1cf736f3 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusRemoteWriteSink.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusRemoteWriteSink.java
@@ -14,7 +14,7 @@
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import org.springframework.util.unit.DataSize;
 import org.springframework.web.reactive.function.client.WebClient;
-import org.xerial.snappy.pure.PureJavaSnappy;
+import org.xerial.snappy.Snappy;
 import prometheus.Remote;
 import reactor.core.publisher.Mono;

@@ -32,7 +32,7 @@ class PrometheusRemoteWriteSink implements MetricsSink {
   @SneakyThrows
   @Override
   public Mono<Void> send(Stream<MetricFamilySamples> metrics) {
-    byte[] bytesToWrite = compressSnappy(createWriteRequest(metrics).toByteArray());
+    byte[] bytesToWrite = Snappy.compress(createWriteRequest(metrics).toByteArray());
     return webClient.post()
         .uri(writeEndpoint)
         .header("Content-Type", "application/x-protobuf")
@@ -45,16 +45,6 @@ class PrometheusRemoteWriteSink implements MetricsSink {
         .then();
   }

-  //TODO: rm this
-  private static byte[] compressSnappy(byte[] data) throws IOException {
-    PureJavaSnappy impl = new PureJavaSnappy();
-    byte[] buf = new byte[impl.maxCompressedLength(data.length)];
-    int compressedByteSize = impl.rawCompress(data, 0, data.length, buf, 0);
-    byte[] result = new byte[compressedByteSize];
-    System.arraycopy(buf, 0, result, 0, compressedByteSize);
-    return result;
-  }
-
   private static Remote.WriteRequest createWriteRequest(Stream<MetricFamilySamples> metrics) {
     long currentTs = System.currentTimeMillis();
     Remote.WriteRequest.Builder
request = Remote.WriteRequest.newBuilder(); @@ -79,7 +69,7 @@ class PrometheusRemoteWriteSink implements MetricsSink { request.addTimeseries(timeSeriesBuilder); } }); - //TODO: how to pass Metadata???? + //TODO: how to pass Metadata ??? return request.build(); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java index 516ad2266b..e1b725aa34 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java @@ -151,7 +151,7 @@ public final class KafkaServicesValidation { log.error("Error creating Prometheus client", e); return invalid("Error creating Prometheus client: " + e.getMessage()); } - return client.mono(c -> c.query("1", null, null)) //TODO: check params + return client.mono(c -> c.query("1", null, null)) .then(valid()) .onErrorResume(KafkaServicesValidation::invalid); }
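
A note on `compoundSink`: a `java.util.stream.Stream` can only be consumed once, so the
compound sink materializes the metrics into a list and hands every sink its own fresh
stream. A standalone sketch of that pattern (illustrative only, not part of the patch):

    import java.util.List;
    import java.util.stream.Stream;

    class CompoundSinkDemo {
      public static void main(String[] args) {
        Stream<String> metrics = Stream.of("metric_a", "metric_b");

        // Materialize once; `metrics` is consumed after this call.
        List<String> materialized = metrics.toList();

        // Every "sink" re-streams the same list; re-reading `metrics` itself
        // would throw IllegalStateException (stream already operated upon).
        materialized.stream().forEach(m -> System.out.println("sink1: " + m));
        materialized.stream().forEach(m -> System.out.println("sink2: " + m));
      }
    }
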