iliax 1 year ago
parent
commit
0dd25eb507

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/AbstractAuthSecurityConfig.java

@@ -13,6 +13,7 @@ abstract class AbstractAuthSecurityConfig {
       "/resources/**",
       "/actuator/health/**",
       "/actuator/info",
+      "/actuator/prometheus",
       "/auth",
       "/login",
       "/logout",

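Note: this entry whitelists the Micrometer Prometheus scrape endpoint, so monitoring systems can pull metrics without authenticating. For illustration, a minimal sketch of how AUTH_WHITELIST entries are typically wired into a Spring WebFlux security chain; the wiring below is an assumption for context and is not part of this commit:

    import org.springframework.context.annotation.Bean;
    import org.springframework.security.config.web.server.ServerHttpSecurity;
    import org.springframework.security.web.server.SecurityWebFilterChain;

    class ExampleSecurityConfig {
      // abridged stand-in for the real AUTH_WHITELIST field above
      private static final String[] AUTH_WHITELIST = {"/actuator/prometheus", "/actuator/health/**"};

      @Bean
      SecurityWebFilterChain filterChain(ServerHttpSecurity http) {
        return http
            .authorizeExchange(spec -> spec
                .pathMatchers(AUTH_WHITELIST).permitAll() // the scrape endpoint stays public
                .anyExchange().authenticated())
            .build();
      }
    }
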
+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.emitter;
 
-import com.provectus.kafka.ui.emitter.EnhancedConsumer.PolledRecords;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerRecord;

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java

@@ -11,7 +11,7 @@ class ConsumingStats {
   private long elapsed = 0;
 
   void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
-                        EnhancedConsumer.PolledRecords polledRecords,
+                        PolledRecords polledRecords,
                         int filterApplyErrors) {
     bytes += polledRecords.bytes();
     this.records += polledRecords.count();

+ 33 - 38
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EnhancedConsumer.java

@@ -1,68 +1,63 @@
 package com.provectus.kafka.ui.emitter;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import com.provectus.kafka.ui.util.ApplicationMetrics;
 import java.time.Duration;
-import java.util.Iterator;
-import java.util.List;
+import java.util.Properties;
 import lombok.RequiredArgsConstructor;
 import lombok.experimental.Delegate;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.utils.Bytes;
 
 
 @RequiredArgsConstructor
 public class EnhancedConsumer implements Consumer<Bytes, Bytes> {
 
-  public record PolledRecords(int count, int bytes, Duration elapsed, ConsumerRecords<Bytes, Bytes> records)
-      implements Iterable<ConsumerRecord<Bytes, Bytes>> {
-
-    static PolledRecords create(ConsumerRecords<Bytes, Bytes> polled, Duration pollDuration) {
-      return new PolledRecords(
-          polled.count(),
-          calculatePolledRecSize(polled),
-          pollDuration,
-          polled
-      );
-    }
+  @Delegate
+  private final Consumer<Bytes, Bytes> consumer;
+  private final PollingThrottler throttler;
+  private final ApplicationMetrics metrics;
 
-    public List<ConsumerRecord<Bytes, Bytes>> records(TopicPartition tp) {
-      return records.records(tp);
-    }
+  public static EnhancedConsumer create(Properties properties,
+                                        PollingThrottler throttler,
+                                        ApplicationMetrics metrics) {
+    return new EnhancedConsumer(createInternalConsumer(properties, metrics), throttler, metrics);
+  }
 
-    @Override
-    public Iterator<ConsumerRecord<Bytes, Bytes>> iterator() {
-      return records.iterator();
+  private static KafkaConsumer<Bytes, Bytes> createInternalConsumer(Properties properties, ApplicationMetrics metrics) {
+    metrics.activeConsumers().incrementAndGet();
+    try {
+      return new KafkaConsumer<>(properties) {
+        @Override
+        public void close(Duration timeout) {
+          metrics.activeConsumers().decrementAndGet();
+          super.close(timeout);
+        }
+      };
+    } catch (Exception e) {
+      metrics.activeConsumers().decrementAndGet();
+      throw e;
     }
   }
 
-  @Delegate
-  private final Consumer<Bytes, Bytes> consumer;
-  private final PollingThrottler throttler;
-
   public PolledRecords pollEnhanced(Duration dur) {
     var stopwatch = Stopwatch.createStarted();
     ConsumerRecords<Bytes, Bytes> polled = consumer.poll(dur);
     PolledRecords polledEnhanced = PolledRecords.create(polled, stopwatch.elapsed());
-    throttler.throttleAfterPoll(polledEnhanced.bytes);
+    var throttled = throttler.throttleAfterPoll(polledEnhanced.bytes());
+    metrics.meterPolledRecords(topic(), polledEnhanced, throttled);
     return polledEnhanced;
   }
 
-  private static int calculatePolledRecSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
-    int polledBytes = 0;
-    for (ConsumerRecord<Bytes, Bytes> rec : recs) {
-      for (Header header : rec.headers()) {
-        polledBytes +=
-            (header.key() != null ? header.key().getBytes().length : 0)
-                + (header.value() != null ? header.value().length : 0);
-      }
-      polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
-      polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
-    }
-    return polledBytes;
+  private String topic() {
+    var topics = consumer.assignment().stream().map(TopicPartition::topic).toList();
+    // we assume that the consumer always reads a single topic
+    Preconditions.checkArgument(topics.size() == 1);
+    return topics.get(0);
   }
 
 }

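Note: EnhancedConsumer is now built through a factory so that the active_consumers gauge is incremented on creation and decremented when the anonymous KafkaConsumer subclass is closed, while pollEnhanced times, counts, and throttles every poll. A hypothetical usage sketch; the demo class, broker address, and topic name are placeholders, not part of this commit:

    import com.provectus.kafka.ui.emitter.EnhancedConsumer;
    import com.provectus.kafka.ui.emitter.PolledRecords;
    import com.provectus.kafka.ui.emitter.PollingThrottler;
    import com.provectus.kafka.ui.util.ApplicationMetrics;
    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.BytesDeserializer;

    class EnhancedConsumerDemo {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);

        // creation increments the active_consumers gauge; close() decrements it
        try (var consumer = EnhancedConsumer.create(props, PollingThrottler.noop(), ApplicationMetrics.noop())) {
          // topic() checks that exactly one topic is assigned, so assign partitions of a single topic
          consumer.assign(List.of(new TopicPartition("orders", 0)));
          PolledRecords polled = consumer.pollEnhanced(Duration.ofSeconds(1)); // timed, counted, throttled
          polled.forEach(rec -> System.out.println(rec.offset()));
        }
      }
    }
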
+ 0 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java

@@ -1,7 +1,5 @@
 package com.provectus.kafka.ui.emitter;
 
-import static com.provectus.kafka.ui.emitter.EnhancedConsumer.PolledRecords;
-
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;

+ 48 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java

@@ -0,0 +1,48 @@
+package com.provectus.kafka.ui.emitter;
+
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.utils.Bytes;
+
+public record PolledRecords(int count,
+                            int bytes,
+                            Duration elapsed,
+                            ConsumerRecords<Bytes, Bytes> records) implements Iterable<ConsumerRecord<Bytes, Bytes>> {
+
+  static PolledRecords create(ConsumerRecords<Bytes, Bytes> polled, Duration pollDuration) {
+    return new PolledRecords(
+        polled.count(),
+        calculatePolledRecSize(polled),
+        pollDuration,
+        polled
+    );
+  }
+
+  public List<ConsumerRecord<Bytes, Bytes>> records(TopicPartition tp) {
+    return records.records(tp);
+  }
+
+  @Override
+  public Iterator<ConsumerRecord<Bytes, Bytes>> iterator() {
+    return records.iterator();
+  }
+
+  private static int calculatePolledRecSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
+    int polledBytes = 0;
+    for (ConsumerRecord<Bytes, Bytes> rec : recs) {
+      for (Header header : rec.headers()) {
+        polledBytes +=
+            (header.key() != null ? header.key().getBytes().length : 0)
+                + (header.value() != null ? header.value().length : 0);
+      }
+      polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
+      polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
+    }
+    return polledBytes;
+  }
+}

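Note: PolledRecords moves out of EnhancedConsumer unchanged; per record, the byte accounting sums the header keys and values plus the serialized key and value sizes. A worked example with hypothetical sizes:

    // One record: serializedKeySize = 1, serializedValueSize = 3,
    // one header with a 1-byte key "h" and a 2-byte value.
    int headerBytes = 1 + 2;                // header key + header value
    int recordBytes = headerBytes + 1 + 3;  // + serialized key + serialized value = 7
    // calculatePolledRecSize reports 7 for a batch containing only this record.
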
+ 4 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingThrottler.java

@@ -33,14 +33,17 @@ public class PollingThrottler {
     return new PollingThrottler("noop", RateLimiter.create(Long.MAX_VALUE));
   }
 
-  public void throttleAfterPoll(int polledBytes) {
+  // returns true when polling throttling activates (only the first throttled poll)
+  public boolean throttleAfterPoll(int polledBytes) {
     if (polledBytes > 0) {
       double sleptSeconds = rateLimiter.acquire(polledBytes);
       if (!throttled && sleptSeconds > 0.0) {
         throttled = true;
         log.debug("Polling throttling enabled for cluster {} at rate {} bytes/sec", clusterName, rateLimiter.getRate());
+        return true;
       }
     }
+    return false;
   }
 
 }

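Note: throttleAfterPoll now reports whether this poll tripped the throttle, which feeds the poll_throttling_activations counter; since the throttled flag is never reset, only the first throttled poll returns true. For context, Guava's RateLimiter.acquire(permits) blocks until the permits are available and returns the seconds spent sleeping. A minimal sketch of those semantics, assuming a 1 KiB/s limit; the demo class is illustrative only:

    import com.google.common.util.concurrent.RateLimiter;

    class ThrottleDemo {
      public static void main(String[] args) {
        RateLimiter limiter = RateLimiter.create(1024); // 1024 permits (bytes) per second
        double first = limiter.acquire(2048);   // ~0.0: the first acquire is granted immediately
        double second = limiter.acquire(2048);  // ~2.0: pays the debt left by the 2 KiB burst
        System.out.printf("slept %.2fs, then %.2fs%n", first, second);
      }
    }
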
+ 6 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java

@@ -9,6 +9,7 @@ import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.SortOrderDTO;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
+import com.provectus.kafka.ui.util.ApplicationMetrics;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -27,7 +28,6 @@ import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.BytesDeserializer;
@@ -266,7 +266,11 @@ public class ConsumerGroupService {
     props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
     props.putAll(properties);
 
-    return new EnhancedConsumer(new KafkaConsumer<>(props), cluster.getPollingSettings().getPollingThrottler());
+    return EnhancedConsumer.create(
+        props,
+        cluster.getPollingSettings().getPollingThrottler(),
+        ApplicationMetrics.forCluster(cluster)
+    );
   }
 
 }

+ 81 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ApplicationMetrics.java

@@ -0,0 +1,81 @@
+package com.provectus.kafka.ui.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.provectus.kafka.ui.emitter.PolledRecords;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Timer;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.AccessLevel;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
+public class ApplicationMetrics {
+
+  private final String clusterName;
+  private final MeterRegistry registry;
+
+  public static ApplicationMetrics forCluster(KafkaCluster cluster) {
+    return new ApplicationMetrics(cluster.getName(), Metrics.globalRegistry);
+  }
+
+  @VisibleForTesting
+  public static ApplicationMetrics noop() {
+    return new ApplicationMetrics("noop", new SimpleMeterRegistry());
+  }
+
+  public void meterPolledRecords(String topic, PolledRecords polled, boolean throttled) {
+    pollTimer(topic).record(polled.elapsed());
+    polledRecords(topic).increment(polled.count());
+    polledBytes(topic).record(polled.bytes());
+    if (throttled) {
+      pollThrottlingActivations().increment();
+    }
+  }
+
+  private Counter polledRecords(String topic) {
+    return Counter.builder("topic_records_polled")
+        .description("Number of records polled from topic")
+        .tag("cluster", clusterName)
+        .tag("topic", topic)
+        .register(registry);
+  }
+
+  private DistributionSummary polledBytes(String topic) {
+    return DistributionSummary.builder("topic_polled_bytes")
+        .description("Bytes polled from kafka topic")
+        .tag("cluster", clusterName)
+        .tag("topic", topic)
+        .register(registry);
+  }
+
+  private Timer pollTimer(String topic) {
+    return Timer.builder("topic_poll_time")
+        .description("Time spend in polling for topic")
+        .tag("cluster", clusterName)
+        .tag("topic", topic)
+        .register(registry);
+  }
+
+  private Counter pollThrottlingActivations() {
+    return Counter.builder("poll_throttling_activations")
+        .description("Number of poll throttling activations")
+        .tag("cluster", clusterName)
+        .register(registry);
+  }
+
+  public AtomicInteger activeConsumers() {
+    var count = new AtomicInteger();
+    Gauge.builder("active_consumers", () -> count)
+        .description("Number of active consumers")
+        .tag("cluster", clusterName)
+        .register(registry);
+    return count;
+  }
+
+}

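Note: every meter is tagged with the cluster name (plus the topic where relevant), and Micrometer returns the existing meter when a builder is registered with an id it has already seen, so the per-topic counters accumulate across polls. A hypothetical verification sketch against a SimpleMeterRegistry; the test class name is illustrative and must live in the same package as ApplicationMetrics, since the constructor is package-private:

    import static org.assertj.core.api.Assertions.assertThat;

    import com.provectus.kafka.ui.emitter.PolledRecords;
    import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.junit.jupiter.api.Test;

    class ApplicationMetricsSketchTest {

      @Test
      void metersPolledRecordsAndThrottling() {
        var registry = new SimpleMeterRegistry();
        var metrics = new ApplicationMetrics("test-cluster", registry); // package-private ctor

        // the canonical record constructor is public, so an empty batch can be built directly
        var polled = new PolledRecords(0, 0, Duration.ofMillis(10), ConsumerRecords.empty());
        metrics.meterPolledRecords("orders", polled, true);

        assertThat(registry.get("topic_records_polled").tag("topic", "orders").counter().count()).isZero();
        assertThat(registry.get("poll_throttling_activations").counter().count()).isEqualTo(1.0);
      }
    }
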
+ 6 - 4
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -8,9 +8,11 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.MessagesProcessing;
 import com.provectus.kafka.ui.emitter.PollingSettings;
+import com.provectus.kafka.ui.emitter.PollingThrottler;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
@@ -18,6 +20,7 @@ import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
+import com.provectus.kafka.ui.util.ApplicationMetrics;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,7 +41,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -325,11 +327,11 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     assertionsConsumer.accept(step.expectComplete().verifyThenAssertThat());
   }
 
-  private KafkaConsumer<Bytes, Bytes> createConsumer() {
+  private EnhancedConsumer createConsumer() {
     return createConsumer(Map.of());
   }
 
-  private KafkaConsumer<Bytes, Bytes> createConsumer(Map<String, Object> properties) {
+  private EnhancedConsumer createConsumer(Map<String, Object> properties) {
     final Map<String, ? extends Serializable> map = Map.of(
         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
         ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
@@ -340,7 +342,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     Properties props = new Properties();
     props.putAll(map);
     props.putAll(properties);
-    return new KafkaConsumer<>(props);
+    return new EnhancedConsumer(new KafkaConsumer<>(props), PollingThrottler.noop(), ApplicationMetrics.noop());
   }
 
   @Value