iliax 1 yıl önce
ebeveyn
işleme
7eeb5538d2

+ 19 - 19
documentation/compose/kafka-ui-arm64.yaml

@@ -10,17 +10,17 @@ services:
       - 8080:8080
     depends_on:
       - kafka0
-      - schema-registry0
     environment:
-      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_NAME: cluster
       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
       KAFKA_CLUSTERS_0_METRICS_PORT: 9997
+      KAFKA_CLUSTERS_1_NAME: cluster2
+      KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka0:29092
       KAFKA_CLUSTERS_0_METRICS_STORE_PROMETHEUS_URL: "http://prometheus:9090"
       KAFKA_CLUSTERS_0_METRICS_STORE_PROMETHEUS_REMOTEWRITE: 'true'
-      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry0:8085
-      DYNAMIC_CONFIG_ENABLED: 'true'  # not necessary, added for tests
       KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true'
       KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'
+      KAFKA_CLUSTERS_0_METRICS_STORE_KAFKA_TOPIC: "kafka_metrics"
 
   prometheus:
     image: prom/prometheus:latest
@@ -60,21 +60,21 @@ services:
       - ./scripts/update_run.sh:/tmp/update_run.sh
     command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"
 
-  schema-registry0:
-    image: confluentinc/cp-schema-registry:7.2.1.arm64
-    ports:
-      - 8085:8085
-    depends_on:
-      - kafka0
-    environment:
-      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
-      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
-      SCHEMA_REGISTRY_HOST_NAME: schema-registry0
-      SCHEMA_REGISTRY_LISTENERS: http://schema-registry0:8085
-
-      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
-      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
-      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
+#  schema-registry0:
+#    image: confluentinc/cp-schema-registry:7.2.1.arm64
+#    ports:
+#      - 8085:8085
+#    depends_on:
+#      - kafka0
+#    environment:
+#      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
+#      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
+#      SCHEMA_REGISTRY_HOST_NAME: schema-registry0
+#      SCHEMA_REGISTRY_LISTENERS: http://schema-registry0:8085
+#
+#      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
+#      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+#      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
 
 #  kafka-connect0:
 #    image: confluentinc/cp-kafka-connect:7.2.1.arm64

+ 2 - 2
kafka-ui-api/Dockerfile

@@ -1,5 +1,5 @@
 #FROM azul/zulu-openjdk-alpine:17-jre-headless
-FROM azul/zulu-openjdk-alpine:17-jre-headless
+FROM azul/zulu-openjdk-alpine@sha256:a36679ac0d28cb835e2a8c00e1e0d95509c6c51c5081c7782b85edb1f37a771a
 
 RUN apk add --no-cache gcompat # need to make snappy codec work
 RUN addgroup -S kafkaui && adduser -S kafkaui -G kafkaui
@@ -18,4 +18,4 @@ ENV JAVA_OPTS=
 EXPOSE 8080
 
 # see JmxSslSocketFactory docs to understand why add-opens is needed
-CMD java --add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED $JAVA_OPTS -jar kafka-ui-api.jar
+CMD java --add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED $JAVA_OPTS -jar kafka-ui-api.jar

+ 6 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -82,6 +82,12 @@ public class ClustersProperties {
   @Data
   public static class MetricsStorage {
     PrometheusStorage prometheus;
+    KafkaMetricsStorage kafka;
+  }
+
+  @Data
+  public static class KafkaMetricsStorage  {
+    String topic;
   }
 
   @Data

+ 6 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -190,8 +190,13 @@ public class MessagesService {
 
   public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
                                                              Map<String, Object> additionalProps) {
+    return createProducer(cluster.getOriginalProperties(), additionalProps);
+  }
+
+  public static KafkaProducer<byte[], byte[]> createProducer(ClustersProperties.Cluster cluster,
+                                                             Map<String, Object> additionalProps) {
     Properties properties = new Properties();
-    SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
+    SslPropertiesUtil.addKafkaSslProperties(cluster.getSsl(), properties);
     properties.putAll(cluster.getProperties());
     properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
     properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/IoRatesMetricsScanner.java

@@ -32,7 +32,7 @@ class IoRatesMetricsScanner {
     });
   }
 
-  public Metrics.IoRates get() {
+  Metrics.IoRates get() {
     return Metrics.IoRates.builder()
         .topicBytesInPerSec(bytesInFifteenMinuteRate)
         .topicBytesOutPerSec(bytesOutFifteenMinuteRate)

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/MetricsScrapping.java

@@ -74,7 +74,6 @@ public class MetricsScrapping {
   private void sendMetricsToSink(Metrics metrics) {
     sink.send(prepareMetricsForSending(metrics))
         .doOnError(th -> log.warn("Error sending metrics to metrics sink", th))
-        .doOnTerminate(() -> log.debug("Metrics sent to sink")) //todo
         .subscribe();
   }
 

+ 2 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/scrape/inferred/InferredMetricsScraper.java

@@ -23,7 +23,7 @@ public class InferredMetricsScraper {
 
   public synchronized Mono<InferredMetrics> scrape(ScrapedClusterState newState) {
     var inferred = infer(prevState, newState);
-    prevState = newState;
+    this.prevState = newState;
     return Mono.just(inferred);
   }
 
@@ -34,7 +34,7 @@ public class InferredMetricsScraper {
     fillTopicMetrics(registry, newState);
     fillConsumerGroupsMetrics(registry, newState);
     List<MetricFamilySamples> metrics = registry.metrics.values().stream().toList();
-    log.debug("{} metrics inferred from cluster state", metrics.size());
+    log.debug("{} metric families inferred from cluster state", metrics.size());
     return new InferredMetrics(metrics);
   }
 
@@ -94,7 +94,6 @@ public class InferredMetricsScraper {
               state.logDirSpaceStats().totalBytes()
           );
         }
-        //TODO: maybe add per-directory stats also?
       }
     });
   }

+ 69 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/KafkaSink.java

@@ -0,0 +1,69 @@
+package com.provectus.kafka.ui.service.metrics.sink;
+
+import static com.provectus.kafka.ui.service.MessagesService.*;
+import static io.prometheus.client.Collector.*;
+
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.google.common.base.Charsets;
+import com.provectus.kafka.ui.config.ClustersProperties;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import reactor.core.publisher.Mono;
+
+/*
+ * Format and implementation are the same as in https://github.com/Telefonica/prometheus-kafka-adapter
+ */
+@RequiredArgsConstructor
+class KafkaSink implements MetricsSink {
+
+  record KafkaMetric(String timestamp, String value, String name, Map<String, String> labels) { }
+
+  private static final JsonMapper JSON_MAPPER = new JsonMapper();
+
+  private final String topic;
+  private final Producer<byte[], byte[]> producer;
+
+  static KafkaSink create(ClustersProperties.Cluster cluster, String targetTopic) {
+    return new KafkaSink(targetTopic, createProducer(cluster, Map.of()));
+  }
+
+  @Override
+  public Mono<Void> send(Stream<MetricFamilySamples> metrics) {
+    return Mono.fromRunnable(() -> {
+      String ts = Instant.now()
+          .truncatedTo(ChronoUnit.SECONDS)
+          .atZone(ZoneOffset.UTC)
+          .format(DateTimeFormatter.ISO_DATE_TIME);
+
+      metrics.flatMap(m -> createRecord(ts, m)).forEach(producer::send);
+    });
+  }
+
+  private Stream<ProducerRecord<byte[], byte[]>> createRecord(String ts, MetricFamilySamples metrics) {
+    return metrics.samples.stream()
+        .map(sample -> {
+          var lbls = new LinkedHashMap<String, String>();
+          lbls.put("__name__", sample.name);
+          for (int i = 0; i < sample.labelNames.size(); i++) {
+            lbls.put(sample.labelNames.get(i), sample.labelValues.get(i));
+          }
+          var km = new KafkaMetric(ts, doubleToGoString(sample.value), sample.name, lbls);
+          return new ProducerRecord<>(topic, toJson(km));
+        });
+  }
+
+  @SneakyThrows
+  private static byte[] toJson(KafkaMetric m) {
+    return JSON_MAPPER.writeValueAsString(m).getBytes(Charsets.UTF_8);
+  }
+
+}

+ 30 - 13
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/MetricsSink.java

@@ -4,35 +4,52 @@ import static io.prometheus.client.Collector.MetricFamilySamples;
 import static org.springframework.util.StringUtils.hasText;
 
 import com.provectus.kafka.ui.config.ClustersProperties;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 import java.util.stream.Stream;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public interface MetricsSink {
 
   static MetricsSink create(ClustersProperties.Cluster cluster) {
-    return Optional.ofNullable(cluster.getMetrics())
+    List<MetricsSink> sinks = new ArrayList<>();
+    Optional.ofNullable(cluster.getMetrics())
         .flatMap(metrics -> Optional.ofNullable(metrics.getStore()))
         .flatMap(store -> Optional.ofNullable(store.getPrometheus()))
-        .map(prometheusConf -> {
+        .ifPresent(prometheusConf -> {
               if (hasText(prometheusConf.getUrl()) && Boolean.TRUE.equals(prometheusConf.getRemoteWrite())) {
-                return new PrometheusRemoteWriteSink(prometheusConf.getUrl());
+                sinks.add(new PrometheusRemoteWriteSink(prometheusConf.getUrl()));
               }
               if (hasText(prometheusConf.getPushGatewayUrl())) {
-                return PrometheusPushGatewaySink.create(
-                    prometheusConf.getPushGatewayUrl(),
-                    prometheusConf.getPushGatewayJobName(),
-                    prometheusConf.getPushGatewayUsername(),
-                    prometheusConf.getPushGatewayPassword()
-                );
+                sinks.add(
+                    PrometheusPushGatewaySink.create(
+                        prometheusConf.getPushGatewayUrl(),
+                        prometheusConf.getPushGatewayJobName(),
+                        prometheusConf.getPushGatewayUsername(),
+                        prometheusConf.getPushGatewayPassword()
+                    ));
               }
-              return noop();
             }
-        ).orElse(noop());
+        );
+
+    Optional.ofNullable(cluster.getMetrics())
+        .flatMap(metrics -> Optional.ofNullable(metrics.getStore()))
+        .flatMap(store -> Optional.ofNullable(store.getKafka()))
+        .flatMap(kafka -> Optional.ofNullable(kafka.getTopic()))
+        .ifPresent(topic -> sinks.add(KafkaSink.create(cluster, topic)));
+
+    return compoundSink(sinks);
   }
 
-  static MetricsSink noop() {
-    return m -> Mono.empty();
+  static MetricsSink compoundSink(List<MetricsSink> sinks) {
+    return metricsStream -> {
+      var materialized = metricsStream.toList();
+      return Flux.fromIterable(sinks)
+          .flatMap(sink -> sink.send(materialized.stream()))
+          .then();
+    };
   }
 
   Mono<Void> send(Stream<MetricFamilySamples> metrics);

+ 3 - 13
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/sink/PrometheusRemoteWriteSink.java

@@ -14,7 +14,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import org.springframework.util.unit.DataSize;
 import org.springframework.web.reactive.function.client.WebClient;
-import org.xerial.snappy.pure.PureJavaSnappy;
+import org.xerial.snappy.Snappy;
 import prometheus.Remote;
 import reactor.core.publisher.Mono;
 
@@ -32,7 +32,7 @@ class PrometheusRemoteWriteSink implements MetricsSink {
   @SneakyThrows
   @Override
   public Mono<Void> send(Stream<MetricFamilySamples> metrics) {
-    byte[] bytesToWrite = compressSnappy(createWriteRequest(metrics).toByteArray());
+    byte[] bytesToWrite = Snappy.compress(createWriteRequest(metrics).toByteArray());
     return webClient.post()
         .uri(writeEndpoint)
         .header("Content-Type", "application/x-protobuf")
@@ -45,16 +45,6 @@ class PrometheusRemoteWriteSink implements MetricsSink {
         .then();
   }
 
-  //TODO: rm this
-  private static byte[] compressSnappy(byte[] data) throws IOException {
-    PureJavaSnappy impl = new PureJavaSnappy();
-    byte[] buf = new byte[impl.maxCompressedLength(data.length)];
-    int compressedByteSize = impl.rawCompress(data, 0, data.length, buf, 0);
-    byte[] result = new byte[compressedByteSize];
-    System.arraycopy(buf, 0, result, 0, compressedByteSize);
-    return result;
-  }
-
   private static Remote.WriteRequest createWriteRequest(Stream<MetricFamilySamples> metrics) {
     long currentTs = System.currentTimeMillis();
     Remote.WriteRequest.Builder request = Remote.WriteRequest.newBuilder();
@@ -79,7 +69,7 @@ class PrometheusRemoteWriteSink implements MetricsSink {
         request.addTimeseries(timeSeriesBuilder);
       }
     });
-    //TODO: how to pass Metadata????
+    //TODO: how to pass Metadata ???
     return request.build();
   }
 

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaServicesValidation.java

@@ -151,7 +151,7 @@ public final class KafkaServicesValidation {
       log.error("Error creating Prometheus client", e);
       return invalid("Error creating Prometheus client: " + e.getMessage());
     }
-    return client.mono(c -> c.query("1", null, null)) //TODO: check params
+    return client.mono(c -> c.query("1", null, null))
         .then(valid())
         .onErrorResume(KafkaServicesValidation::invalid);
   }