Procházet zdrojové kódy

ISSSUE4052: Polling metrics (#4069)

Co-authored-by: iliax <ikuramshin@provectus.com>
Ilya Kuramshin před 1 rokem
rodič
revize
c96a0c6be5
18 změnil soubory, kde provedl 275 přidání a 124 odebrání
  1. 5 0
      kafka-ui-api/pom.xml
  2. 1 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/AbstractAuthSecurityConfig.java
  3. 7 18
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java
  4. 7 9
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java
  5. 4 13
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java
  6. 2 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java
  7. 82 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EnhancedConsumer.java
  8. 5 7
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java
  9. 2 6
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java
  10. 48 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java
  11. 4 8
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingThrottler.java
  12. 4 6
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java
  13. 10 9
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java
  14. 4 9
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java
  15. 82 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ApplicationMetrics.java
  16. 0 29
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ConsumerRecordsUtil.java
  17. 1 1
      kafka-ui-api/src/main/resources/application.yml
  18. 7 7
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

+ 5 - 0
kafka-ui-api/pom.xml

@@ -114,6 +114,11 @@
             <artifactId>json</artifactId>
             <version>${org.json.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.micrometer</groupId>
+            <artifactId>micrometer-registry-prometheus</artifactId>
+            <scope>runtime</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.springframework.boot</groupId>

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/auth/AbstractAuthSecurityConfig.java

@@ -13,6 +13,7 @@ abstract class AbstractAuthSecurityConfig {
       "/resources/**",
       "/actuator/health/**",
       "/actuator/info",
+      "/actuator/prometheus",
       "/auth",
       "/login",
       "/logout",

+ 7 - 18
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -2,37 +2,28 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.time.Duration;
-import java.time.Instant;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 public abstract class AbstractEmitter {
 
   private final MessagesProcessing messagesProcessing;
-  private final PollingThrottler throttler;
   protected final PollingSettings pollingSettings;
 
   protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
     this.messagesProcessing = messagesProcessing;
     this.pollingSettings = pollingSettings;
-    this.throttler = pollingSettings.getPollingThrottler();
   }
 
-  protected ConsumerRecords<Bytes, Bytes> poll(
-      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
+  protected PolledRecords poll(
+      FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
     return poll(sink, consumer, pollingSettings.getPollTimeout());
   }
 
-  protected ConsumerRecords<Bytes, Bytes> poll(
-      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer, Duration timeout) {
-    Instant start = Instant.now();
-    ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
-    Instant finish = Instant.now();
-    int polledBytes = sendConsuming(sink, records, Duration.between(start, finish).toMillis());
-    throttler.throttleAfterPoll(polledBytes);
+  protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
+    var records = consumer.pollEnhanced(timeout);
+    sendConsuming(sink, records);
     return records;
   }
 
@@ -49,10 +40,8 @@ public abstract class AbstractEmitter {
     messagesProcessing.sendPhase(sink, name);
   }
 
-  protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
-                              ConsumerRecords<Bytes, Bytes> records,
-                              long elapsed) {
-    return messagesProcessing.sentConsumingInfo(sink, records, elapsed);
+  protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords records) {
+    messagesProcessing.sentConsumingInfo(sink, records);
   }
 
   protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {

+ 7 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

@@ -9,9 +9,7 @@ import java.util.List;
 import java.util.TreeMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.utils.Bytes;
@@ -22,12 +20,12 @@ public class BackwardRecordEmitter
     extends AbstractEmitter
     implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final Supplier<EnhancedConsumer> consumerSupplier;
   private final ConsumerPosition consumerPosition;
   private final int messagesPerPage;
 
   public BackwardRecordEmitter(
-      Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
+      Supplier<EnhancedConsumer> consumerSupplier,
       ConsumerPosition consumerPosition,
       int messagesPerPage,
       MessagesProcessing messagesProcessing,
@@ -41,7 +39,7 @@ public class BackwardRecordEmitter
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
     log.debug("Starting backward polling for {}", consumerPosition);
-    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+    try (EnhancedConsumer consumer = consumerSupplier.get()) {
       sendPhase(sink, "Created consumer");
 
       var seekOperations = SeekOperations.create(consumer, consumerPosition);
@@ -91,7 +89,7 @@ public class BackwardRecordEmitter
       TopicPartition tp,
       long fromOffset,
       long toOffset,
-      Consumer<Bytes, Bytes> consumer,
+      EnhancedConsumer consumer,
       FluxSink<TopicMessageEventDTO> sink
   ) {
     consumer.assign(Collections.singleton(tp));
@@ -101,13 +99,13 @@ public class BackwardRecordEmitter
 
     var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
 
-    EmptyPollsCounter emptyPolls  = pollingSettings.createEmptyPollsCounter();
+    EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
     while (!sink.isCancelled()
         && !sendLimitReached()
         && recordsToSend.size() < desiredMsgsToPoll
         && !emptyPolls.noDataEmptyPollsReached()) {
       var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
-      emptyPolls.count(polledRecords);
+      emptyPolls.count(polledRecords.count());
 
       log.debug("{} records polled from {}", polledRecords.count(), tp);
 
@@ -115,7 +113,7 @@ public class BackwardRecordEmitter
           .filter(r -> r.offset() < toOffset)
           .toList();
 
-      if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) {
+      if (polledRecords.count() > 0 && filteredRecords.isEmpty()) {
         // we already read all messages in target offsets interval
         break;
       }

+ 4 - 13
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java

@@ -2,9 +2,6 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 class ConsumingStats {
@@ -13,23 +10,17 @@ class ConsumingStats {
   private int records = 0;
   private long elapsed = 0;
 
-  /**
-   * returns bytes polled.
-   */
-  int sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
-                        ConsumerRecords<Bytes, Bytes> polledRecords,
-                        long elapsed,
+  void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
+                        PolledRecords polledRecords,
                         int filterApplyErrors) {
-    int polledBytes = ConsumerRecordsUtil.calculatePolledSize(polledRecords);
-    bytes += polledBytes;
+    bytes += polledRecords.bytes();
     this.records += polledRecords.count();
-    this.elapsed += elapsed;
+    this.elapsed += polledRecords.elapsed().toMillis();
     sink.next(
         new TopicMessageEventDTO()
             .type(TopicMessageEventDTO.TypeEnum.CONSUMING)
             .consuming(createConsumingStats(sink, filterApplyErrors))
     );
-    return polledBytes;
   }
 
   void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors) {

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java

@@ -17,8 +17,8 @@ public class EmptyPollsCounter {
     this.maxEmptyPolls = maxEmptyPolls;
   }
 
-  public void count(ConsumerRecords<?, ?> polled) {
-    emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
+  public void count(int polledCount) {
+    emptyPolls = polledCount == 0 ? emptyPolls + 1 : 0;
   }
 
   public boolean noDataEmptyPollsReached() {

+ 82 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EnhancedConsumer.java

@@ -0,0 +1,82 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.provectus.kafka.ui.util.ApplicationMetrics;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.Delegate;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+
+
+public class EnhancedConsumer extends KafkaConsumer<Bytes, Bytes> {
+
+  private final PollingThrottler throttler;
+  private final ApplicationMetrics metrics;
+  private String pollingTopic;
+
+  public EnhancedConsumer(Properties properties,
+                          PollingThrottler throttler,
+                          ApplicationMetrics metrics) {
+    super(properties, new BytesDeserializer(), new BytesDeserializer());
+    this.throttler = throttler;
+    this.metrics = metrics;
+    metrics.activeConsumers().incrementAndGet();
+  }
+
+  public PolledRecords pollEnhanced(Duration dur) {
+    var stopwatch = Stopwatch.createStarted();
+    ConsumerRecords<Bytes, Bytes> polled = poll(dur);
+    PolledRecords polledEnhanced = PolledRecords.create(polled, stopwatch.elapsed());
+    var throttled = throttler.throttleAfterPoll(polledEnhanced.bytes());
+    metrics.meterPolledRecords(pollingTopic, polledEnhanced, throttled);
+    return polledEnhanced;
+  }
+
+  @Override
+  public void assign(Collection<TopicPartition> partitions) {
+    super.assign(partitions);
+    Set<String> assignedTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
+    Preconditions.checkState(assignedTopics.size() == 1);
+    this.pollingTopic = assignedTopics.iterator().next();
+  }
+
+  @Override
+  public void subscribe(Pattern pattern) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void subscribe(Collection<String> topics) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close(Duration timeout) {
+    metrics.activeConsumers().decrementAndGet();
+    super.close(timeout);
+  }
+
+}

+ 5 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -5,8 +5,6 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
@@ -16,11 +14,11 @@ public class ForwardRecordEmitter
     extends AbstractEmitter
     implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final Supplier<EnhancedConsumer> consumerSupplier;
   private final ConsumerPosition position;
 
   public ForwardRecordEmitter(
-      Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
+      Supplier<EnhancedConsumer> consumerSupplier,
       ConsumerPosition position,
       MessagesProcessing messagesProcessing,
       PollingSettings pollingSettings) {
@@ -32,7 +30,7 @@ public class ForwardRecordEmitter
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
     log.debug("Starting forward polling for {}", position);
-    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+    try (EnhancedConsumer consumer = consumerSupplier.get()) {
       sendPhase(sink, "Assigning partitions");
       var seekOperations = SeekOperations.create(consumer, position);
       seekOperations.assignAndSeekNonEmptyPartitions();
@@ -44,8 +42,8 @@ public class ForwardRecordEmitter
           && !emptyPolls.noDataEmptyPollsReached()) {
 
         sendPhase(sink, "Polling");
-        ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
-        emptyPolls.count(records);
+        var records = poll(sink, consumer);
+        emptyPolls.count(records.count());
 
         log.debug("{} records polled", records.count());
 

+ 2 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java

@@ -8,7 +8,6 @@ import java.util.function.Predicate;
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
@@ -54,13 +53,10 @@ public class MessagesProcessing {
     }
   }
 
-  int sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink,
-                        ConsumerRecords<Bytes, Bytes> polledRecords,
-                        long elapsed) {
+  void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
     if (!sink.isCancelled()) {
-      return consumingStats.sendConsumingEvt(sink, polledRecords, elapsed, filterApplyErrors);
+      consumingStats.sendConsumingEvt(sink, polledRecords, filterApplyErrors);
     }
-    return 0;
   }
 
   void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {

+ 48 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java

@@ -0,0 +1,48 @@
+package com.provectus.kafka.ui.emitter;
+
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.utils.Bytes;
+
+public record PolledRecords(int count,
+                            int bytes,
+                            Duration elapsed,
+                            ConsumerRecords<Bytes, Bytes> records) implements Iterable<ConsumerRecord<Bytes, Bytes>> {
+
+  static PolledRecords create(ConsumerRecords<Bytes, Bytes> polled, Duration pollDuration) {
+    return new PolledRecords(
+        polled.count(),
+        calculatePolledRecSize(polled),
+        pollDuration,
+        polled
+    );
+  }
+
+  public List<ConsumerRecord<Bytes, Bytes>> records(TopicPartition tp) {
+    return records.records(tp);
+  }
+
+  @Override
+  public Iterator<ConsumerRecord<Bytes, Bytes>> iterator() {
+    return records.iterator();
+  }
+
+  private static int calculatePolledRecSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
+    int polledBytes = 0;
+    for (ConsumerRecord<Bytes, Bytes> rec : recs) {
+      for (Header header : rec.headers()) {
+        polledBytes +=
+            (header.key() != null ? header.key().getBytes().length : 0)
+                + (header.value() != null ? header.value().length : 0);
+      }
+      polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
+      polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
+    }
+    return polledBytes;
+  }
+}

+ 4 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingThrottler.java

@@ -3,11 +3,8 @@ package com.provectus.kafka.ui.emitter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.RateLimiter;
 import com.provectus.kafka.ui.config.ClustersProperties;
-import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.utils.Bytes;
 
 @Slf4j
 public class PollingThrottler {
@@ -36,18 +33,17 @@ public class PollingThrottler {
     return new PollingThrottler("noop", RateLimiter.create(Long.MAX_VALUE));
   }
 
-  public void throttleAfterPoll(int polledBytes) {
+  //returns true if polling was throttled
+  public boolean throttleAfterPoll(int polledBytes) {
     if (polledBytes > 0) {
       double sleptSeconds = rateLimiter.acquire(polledBytes);
       if (!throttled && sleptSeconds > 0.0) {
         throttled = true;
         log.debug("Polling throttling enabled for cluster {} at rate {} bytes/sec", clusterName, rateLimiter.getRate());
+        return true;
       }
     }
-  }
-
-  public void throttleAfterPoll(ConsumerRecords<Bytes, Bytes> polled) {
-    throttleAfterPoll(ConsumerRecordsUtil.calculatePolledSize(polled));
+    return false;
   }
 
 }

+ 4 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -5,19 +5,17 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.util.HashMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
 public class TailingEmitter extends AbstractEmitter
     implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final Supplier<EnhancedConsumer> consumerSupplier;
   private final ConsumerPosition consumerPosition;
 
-  public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
+  public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
                         ConsumerPosition consumerPosition,
                         MessagesProcessing messagesProcessing,
                         PollingSettings pollingSettings) {
@@ -29,7 +27,7 @@ public class TailingEmitter extends AbstractEmitter
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
     log.debug("Starting tailing polling for {}", consumerPosition);
-    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+    try (EnhancedConsumer consumer = consumerSupplier.get()) {
       assignAndSeek(consumer);
       while (!sink.isCancelled()) {
         sendPhase(sink, "Polling");
@@ -47,7 +45,7 @@ public class TailingEmitter extends AbstractEmitter
     }
   }
 
-  private void assignAndSeek(KafkaConsumer<Bytes, Bytes> consumer) {
+  private void assignAndSeek(EnhancedConsumer consumer) {
     var seekOperations = SeekOperations.create(consumer, consumerPosition);
     var seekOffsets = new HashMap<>(seekOperations.getEndOffsets()); // defaulting offsets to topic end
     seekOffsets.putAll(seekOperations.getOffsetsForSeek()); // this will only set non-empty partitions

+ 10 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java

@@ -2,12 +2,14 @@ package com.provectus.kafka.ui.service;
 
 import com.google.common.collect.Streams;
 import com.google.common.collect.Table;
+import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO;
 import com.provectus.kafka.ui.model.InternalConsumerGroup;
 import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.SortOrderDTO;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
+import com.provectus.kafka.ui.util.ApplicationMetrics;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -26,11 +28,8 @@ import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 
@@ -248,25 +247,27 @@ public class ConsumerGroupService {
         .flatMap(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId)));
   }
 
-  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
+  public EnhancedConsumer createConsumer(KafkaCluster cluster) {
     return createConsumer(cluster, Map.of());
   }
 
-  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster,
-                                                    Map<String, Object> properties) {
+  public EnhancedConsumer createConsumer(KafkaCluster cluster,
+                                         Map<String, Object> properties) {
     Properties props = new Properties();
     SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
     props.putAll(cluster.getProperties());
     props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + System.currentTimeMillis());
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
     props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
     props.putAll(properties);
 
-    return new KafkaConsumer<>(props);
+    return new EnhancedConsumer(
+        props,
+        cluster.getPollingSettings().getPollingThrottler(),
+        ApplicationMetrics.forCluster(cluster)
+    );
   }
 
 }

+ 4 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java

@@ -1,9 +1,9 @@
 package com.provectus.kafka.ui.service.analyze;
 
 import com.provectus.kafka.ui.emitter.EmptyPollsCounter;
+import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.emitter.OffsetsInfo;
 import com.provectus.kafka.ui.emitter.PollingSettings;
-import com.provectus.kafka.ui.emitter.PollingThrottler;
 import com.provectus.kafka.ui.exception.TopicAnalysisException;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.TopicAnalysisDTO;
@@ -20,11 +20,9 @@ import java.util.stream.IntStream;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -84,12 +82,11 @@ public class TopicAnalysisService {
     private final int partitionsCnt;
     private final long approxNumberOfMsgs;
     private final EmptyPollsCounter emptyPollsCounter;
-    private final PollingThrottler throttler;
 
     private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
     private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();
 
-    private final KafkaConsumer<Bytes, Bytes> consumer;
+    private final EnhancedConsumer consumer;
 
     AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt,
                  long approxNumberOfMsgs, PollingSettings pollingSettings) {
@@ -104,7 +101,6 @@ public class TopicAnalysisService {
               ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
           )
       );
-      this.throttler = pollingSettings.getPollingThrottler();
       this.emptyPollsCounter = pollingSettings.createEmptyPollsCounter();
     }
 
@@ -127,9 +123,8 @@ public class TopicAnalysisService {
 
         var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName);
         while (!offsetsInfo.assignedPartitionsFullyPolled() && !emptyPollsCounter.noDataEmptyPollsReached()) {
-          var polled = consumer.poll(Duration.ofSeconds(3));
-          throttler.throttleAfterPoll(polled);
-          emptyPollsCounter.count(polled);
+          var polled = consumer.pollEnhanced(Duration.ofSeconds(3));
+          emptyPollsCounter.count(polled.count());
           polled.forEach(r -> {
             totalStats.apply(r);
             partitionStats.get(r.partition()).apply(r);

+ 82 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ApplicationMetrics.java

@@ -0,0 +1,82 @@
+package com.provectus.kafka.ui.util;
+
+import static lombok.AccessLevel.PRIVATE;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.provectus.kafka.ui.emitter.PolledRecords;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Timer;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor(access = PRIVATE)
+public class ApplicationMetrics {
+
+  private final String clusterName;
+  private final MeterRegistry registry;
+
+  public static ApplicationMetrics forCluster(KafkaCluster cluster) {
+    return new ApplicationMetrics(cluster.getName(), Metrics.globalRegistry);
+  }
+
+  @VisibleForTesting
+  public static ApplicationMetrics noop() {
+    return new ApplicationMetrics("noop", new SimpleMeterRegistry());
+  }
+
+  public void meterPolledRecords(String topic, PolledRecords polled, boolean throttled) {
+    pollTimer(topic).record(polled.elapsed());
+    polledRecords(topic).increment(polled.count());
+    polledBytes(topic).record(polled.bytes());
+    if (throttled) {
+      pollThrottlingActivations().increment();
+    }
+  }
+
+  private Counter polledRecords(String topic) {
+    return Counter.builder("topic_records_polled")
+        .description("Number of records polled from topic")
+        .tag("cluster", clusterName)
+        .tag("topic", topic)
+        .register(registry);
+  }
+
+  private DistributionSummary polledBytes(String topic) {
+    return DistributionSummary.builder("topic_polled_bytes")
+        .description("Bytes polled from kafka topic")
+        .tag("cluster", clusterName)
+        .tag("topic", topic)
+        .register(registry);
+  }
+
+  private Timer pollTimer(String topic) {
+    return Timer.builder("topic_poll_time")
+        .description("Time spend in polling for topic")
+        .tag("cluster", clusterName)
+        .tag("topic", topic)
+        .register(registry);
+  }
+
+  private Counter pollThrottlingActivations() {
+    return Counter.builder("poll_throttling_activations")
+        .description("Number of poll throttling activations")
+        .tag("cluster", clusterName)
+        .register(registry);
+  }
+
+  public AtomicInteger activeConsumers() {
+    var count = new AtomicInteger();
+    Gauge.builder("active_consumers", () -> count)
+        .description("Number of active consumers")
+        .tag("cluster", clusterName)
+        .register(registry);
+    return count;
+  }
+
+}

+ 0 - 29
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ConsumerRecordsUtil.java

@@ -1,29 +0,0 @@
-package com.provectus.kafka.ui.util;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.utils.Bytes;
-
-public class ConsumerRecordsUtil {
-
-  public static int calculatePolledRecSize(ConsumerRecord<Bytes, Bytes> rec) {
-    int polledBytes = 0;
-    for (Header header : rec.headers()) {
-      polledBytes +=
-          (header.key() != null ? header.key().getBytes().length : 0)
-              + (header.value() != null ? header.value().length : 0);
-    }
-    polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
-    polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
-    return polledBytes;
-  }
-
-  public static int calculatePolledSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
-    int polledBytes = 0;
-    for (ConsumerRecord<Bytes, Bytes> rec : recs) {
-      polledBytes += calculatePolledRecSize(rec);
-    }
-    return polledBytes;
-  }
-
-}

+ 1 - 1
kafka-ui-api/src/main/resources/application.yml

@@ -10,7 +10,7 @@ management:
   endpoints:
     web:
       exposure:
-        include: "info,health"
+        include: "info,health,prometheus"
 
 logging:
   level:

+ 7 - 7
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -8,9 +8,11 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.MessagesProcessing;
 import com.provectus.kafka.ui.emitter.PollingSettings;
+import com.provectus.kafka.ui.emitter.PollingThrottler;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
@@ -18,6 +20,7 @@ import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
+import com.provectus.kafka.ui.util.ApplicationMetrics;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,7 +41,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -325,22 +327,20 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     assertionsConsumer.accept(step.expectComplete().verifyThenAssertThat());
   }
 
-  private KafkaConsumer<Bytes, Bytes> createConsumer() {
+  private EnhancedConsumer createConsumer() {
     return createConsumer(Map.of());
   }
 
-  private KafkaConsumer<Bytes, Bytes> createConsumer(Map<String, Object> properties) {
+  private EnhancedConsumer createConsumer(Map<String, Object> properties) {
     final Map<String, ? extends Serializable> map = Map.of(
         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
         ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
-        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19, // to check multiple polls
-        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class,
-        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class
+        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19 // to check multiple polls
     );
     Properties props = new Properties();
     props.putAll(map);
     props.putAll(properties);
-    return new KafkaConsumer<>(props);
+    return new EnhancedConsumer(props, PollingThrottler.noop(), ApplicationMetrics.noop());
   }
 
   @Value