ISSUE-4052: Application metrics preparations

iliax 1 year ago
Parent
Commit
e00ab56f38

+ 5 - 0
kafka-ui-api/pom.xml

@@ -114,6 +114,11 @@
             <artifactId>json</artifactId>
             <version>${org.json.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.micrometer</groupId>
+            <artifactId>micrometer-registry-prometheus</artifactId>
+            <scope>runtime</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.springframework.boot</groupId>
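
For context, a minimal sketch (not part of this commit) of how the new dependency is typically consumed: with micrometer-registry-prometheus on the classpath, Spring Boot auto-configures a PrometheusMeterRegistry, and any bean can record metrics through the injected MeterRegistry. The component and metric names below are hypothetical.

    import io.micrometer.core.instrument.MeterRegistry;
    import io.micrometer.core.instrument.Timer;
    import java.time.Duration;
    import org.springframework.stereotype.Component;

    @Component
    class PollingMetrics { // hypothetical component, for illustration only

      private final Timer pollTimer;

      PollingMetrics(MeterRegistry registry) { // registry auto-configured by Spring Boot
        this.pollTimer = registry.timer("kui_poll_duration"); // hypothetical metric name
      }

      void recordPoll(Duration elapsed) {
        // visible at /actuator/prometheus once that endpoint is exposed
        pollTimer.record(elapsed);
      }
    }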

+ 8 - 18
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -1,38 +1,30 @@
 package com.provectus.kafka.ui.emitter;
 
+import com.provectus.kafka.ui.emitter.EnhancedConsumer.PolledRecords;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.time.Duration;
-import java.time.Instant;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 public abstract class AbstractEmitter {
 
   private final MessagesProcessing messagesProcessing;
-  private final PollingThrottler throttler;
   protected final PollingSettings pollingSettings;
 
   protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
     this.messagesProcessing = messagesProcessing;
     this.pollingSettings = pollingSettings;
-    this.throttler = pollingSettings.getPollingThrottler();
   }
 
-  protected ConsumerRecords<Bytes, Bytes> poll(
-      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
+  protected PolledRecords poll(
+      FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
     return poll(sink, consumer, pollingSettings.getPollTimeout());
   }
 
-  protected ConsumerRecords<Bytes, Bytes> poll(
-      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer, Duration timeout) {
-    Instant start = Instant.now();
-    ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
-    Instant finish = Instant.now();
-    int polledBytes = sendConsuming(sink, records, Duration.between(start, finish).toMillis());
-    throttler.throttleAfterPoll(polledBytes);
+  protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
+    var records = consumer.pollEnhanced(timeout);
+    sendConsuming(sink, records);
     return records;
   }
 
@@ -49,10 +41,8 @@ public abstract class AbstractEmitter {
     messagesProcessing.sendPhase(sink, name);
   }
 
-  protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
-                              ConsumerRecords<Bytes, Bytes> records,
-                              long elapsed) {
-    return messagesProcessing.sentConsumingInfo(sink, records, elapsed);
+  protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords records) {
+    messagesProcessing.sentConsumingInfo(sink, records);
   }
 
   protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {

+ 7 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

@@ -9,9 +9,7 @@ import java.util.List;
 import java.util.TreeMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.utils.Bytes;
@@ -22,12 +20,12 @@ public class BackwardRecordEmitter
     extends AbstractEmitter
     implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final Supplier<EnhancedConsumer> consumerSupplier;
   private final ConsumerPosition consumerPosition;
   private final int messagesPerPage;
 
   public BackwardRecordEmitter(
-      Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
+      Supplier<EnhancedConsumer> consumerSupplier,
       ConsumerPosition consumerPosition,
       int messagesPerPage,
       MessagesProcessing messagesProcessing,
@@ -41,7 +39,7 @@ public class BackwardRecordEmitter
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
     log.debug("Starting backward polling for {}", consumerPosition);
-    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+    try (EnhancedConsumer consumer = consumerSupplier.get()) {
       sendPhase(sink, "Created consumer");
 
       var seekOperations = SeekOperations.create(consumer, consumerPosition);
@@ -91,7 +89,7 @@ public class BackwardRecordEmitter
       TopicPartition tp,
       long fromOffset,
       long toOffset,
-      Consumer<Bytes, Bytes> consumer,
+      EnhancedConsumer consumer,
       FluxSink<TopicMessageEventDTO> sink
   ) {
     consumer.assign(Collections.singleton(tp));
@@ -101,13 +99,13 @@ public class BackwardRecordEmitter
 
     var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
 
-    EmptyPollsCounter emptyPolls  = pollingSettings.createEmptyPollsCounter();
+    EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
     while (!sink.isCancelled()
         && !sendLimitReached()
         && recordsToSend.size() < desiredMsgsToPoll
         && !emptyPolls.noDataEmptyPollsReached()) {
       var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
-      emptyPolls.count(polledRecords);
+      emptyPolls.count(polledRecords.count());
 
       log.debug("{} records polled from {}", polledRecords.count(), tp);
 
@@ -115,7 +113,7 @@ public class BackwardRecordEmitter
           .filter(r -> r.offset() < toOffset)
           .toList();
 
-      if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) {
+      if (polledRecords.count() > 0 && filteredRecords.isEmpty()) {
         // we already read all messages in target offsets interval
         break;
       }

+ 4 - 13
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java

@@ -2,9 +2,6 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 class ConsumingStats {
@@ -13,23 +10,17 @@ class ConsumingStats {
   private int records = 0;
   private long elapsed = 0;
 
-  /**
-   * returns bytes polled.
-   */
-  int sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
-                        ConsumerRecords<Bytes, Bytes> polledRecords,
-                        long elapsed,
+  void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
+                        EnhancedConsumer.PolledRecords polledRecords,
                         int filterApplyErrors) {
-    int polledBytes = ConsumerRecordsUtil.calculatePolledSize(polledRecords);
-    bytes += polledBytes;
+    bytes += polledRecords.bytes();
     this.records += polledRecords.count();
-    this.elapsed += elapsed;
+    this.elapsed += polledRecords.elapsed().toMillis();
     sink.next(
         new TopicMessageEventDTO()
             .type(TopicMessageEventDTO.TypeEnum.CONSUMING)
             .consuming(createConsumingStats(sink, filterApplyErrors))
     );
-    return polledBytes;
   }
 
   void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors) {

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java

@@ -17,8 +17,8 @@ public class EmptyPollsCounter {
     this.maxEmptyPolls = maxEmptyPolls;
   }
 
-  public void count(ConsumerRecords<?, ?> polled) {
-    emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
+  public void count(int polledCount) {
+    emptyPolls = polledCount == 0 ? emptyPolls + 1 : 0;
   }
 
   public boolean noDataEmptyPollsReached() {

+ 68 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EnhancedConsumer.java

@@ -0,0 +1,68 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.google.common.base.Stopwatch;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.Delegate;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.utils.Bytes;
+
+
+@RequiredArgsConstructor
+public class EnhancedConsumer implements Consumer<Bytes, Bytes> {
+
+  public record PolledRecords(int count, int bytes, Duration elapsed, ConsumerRecords<Bytes, Bytes> records)
+      implements Iterable<ConsumerRecord<Bytes, Bytes>> {
+
+    static PolledRecords create(ConsumerRecords<Bytes, Bytes> polled, Duration pollDuration) {
+      return new PolledRecords(
+          polled.count(),
+          calculatePolledRecSize(polled),
+          pollDuration,
+          polled
+      );
+    }
+
+    public List<ConsumerRecord<Bytes, Bytes>> records(TopicPartition tp) {
+      return records.records(tp);
+    }
+
+    @Override
+    public Iterator<ConsumerRecord<Bytes, Bytes>> iterator() {
+      return records.iterator();
+    }
+  }
+
+  @Delegate
+  private final Consumer<Bytes, Bytes> consumer;
+  private final PollingThrottler throttler;
+
+  public PolledRecords pollEnhanced(Duration dur) {
+    var stopwatch = Stopwatch.createStarted();
+    ConsumerRecords<Bytes, Bytes> polled = consumer.poll(dur);
+    PolledRecords polledEnhanced = PolledRecords.create(polled, stopwatch.elapsed());
+    throttler.throttleAfterPoll(polledEnhanced.bytes);
+    return polledEnhanced;
+  }
+
+  private static int calculatePolledRecSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
+    int polledBytes = 0;
+    for (ConsumerRecord<Bytes, Bytes> rec : recs) {
+      for (Header header : rec.headers()) {
+        polledBytes +=
+            (header.key() != null ? header.key().getBytes().length : 0)
+                + (header.value() != null ? header.value().length : 0);
+      }
+      polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
+      polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
+    }
+    return polledBytes;
+  }
+
+}
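
A short usage sketch (assumed, not from this commit): pollEnhanced wraps the delegated consumer.poll, times it, throttles on the polled byte count, and returns the records together with their size and timing. The props, throttler, and log variables are assumed to be in scope; the constructor call matches the one added in ConsumerGroupService below.

    EnhancedConsumer consumer = new EnhancedConsumer(new KafkaConsumer<>(props), throttler);
    var polled = consumer.pollEnhanced(Duration.ofSeconds(1));
    log.debug("{} records ({} bytes) polled in {} ms",
        polled.count(), polled.bytes(), polled.elapsed().toMillis());
    for (ConsumerRecord<Bytes, Bytes> rec : polled) {
      // PolledRecords is Iterable, so existing per-record processing keeps working
    }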

+ 5 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -5,8 +5,6 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
@@ -16,11 +14,11 @@ public class ForwardRecordEmitter
     extends AbstractEmitter
     implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final Supplier<EnhancedConsumer> consumerSupplier;
   private final ConsumerPosition position;
 
   public ForwardRecordEmitter(
-      Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
+      Supplier<EnhancedConsumer> consumerSupplier,
       ConsumerPosition position,
       MessagesProcessing messagesProcessing,
       PollingSettings pollingSettings) {
@@ -32,7 +30,7 @@ public class ForwardRecordEmitter
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
     log.debug("Starting forward polling for {}", position);
-    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+    try (EnhancedConsumer consumer = consumerSupplier.get()) {
       sendPhase(sink, "Assigning partitions");
       var seekOperations = SeekOperations.create(consumer, position);
       seekOperations.assignAndSeekNonEmptyPartitions();
@@ -44,8 +42,8 @@ public class ForwardRecordEmitter
           && !emptyPolls.noDataEmptyPollsReached()) {
 
         sendPhase(sink, "Polling");
-        ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
-        emptyPolls.count(records);
+        var records = poll(sink, consumer);
+        emptyPolls.count(records.count());
 
         log.debug("{} records polled", records.count());
 

+ 4 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java

@@ -1,5 +1,7 @@
 package com.provectus.kafka.ui.emitter;
 
+import static com.provectus.kafka.ui.emitter.EnhancedConsumer.PolledRecords;
+
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
@@ -8,7 +10,6 @@ import java.util.function.Predicate;
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
@@ -54,13 +55,10 @@ public class MessagesProcessing {
     }
   }
 
-  int sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink,
-                        ConsumerRecords<Bytes, Bytes> polledRecords,
-                        long elapsed) {
+  void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
     if (!sink.isCancelled()) {
-      return consumingStats.sendConsumingEvt(sink, polledRecords, elapsed, filterApplyErrors);
+      consumingStats.sendConsumingEvt(sink, polledRecords, filterApplyErrors);
     }
-    return 0;
   }
 
   void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {

+ 0 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingThrottler.java

@@ -3,11 +3,8 @@ package com.provectus.kafka.ui.emitter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.RateLimiter;
 import com.provectus.kafka.ui.config.ClustersProperties;
-import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.utils.Bytes;
 
 @Slf4j
 public class PollingThrottler {
@@ -46,8 +43,4 @@ public class PollingThrottler {
     }
   }
 
-  public void throttleAfterPoll(ConsumerRecords<Bytes, Bytes> polled) {
-    throttleAfterPoll(ConsumerRecordsUtil.calculatePolledSize(polled));
-  }
-
 }
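
With the ConsumerRecords overload removed, throttleAfterPoll(int) is the single entry point, and its only caller (EnhancedConsumer) now computes the byte count itself. A plausible shape of the retained method, inferred from the Guava RateLimiter import in this file rather than copied from the source:

    // Assumption, not the verbatim implementation:
    public void throttleAfterPoll(int polledBytes) {
      if (polledBytes > 0) {
        rateLimiter.acquire(polledBytes); // block until the bytes/sec budget permits
      }
    }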

+ 4 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -5,19 +5,17 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.util.HashMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
 public class TailingEmitter extends AbstractEmitter
     implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final Supplier<EnhancedConsumer> consumerSupplier;
   private final ConsumerPosition consumerPosition;
 
-  public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
+  public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
                         ConsumerPosition consumerPosition,
                         MessagesProcessing messagesProcessing,
                         PollingSettings pollingSettings) {
@@ -29,7 +27,7 @@ public class TailingEmitter extends AbstractEmitter
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
     log.debug("Starting tailing polling for {}", consumerPosition);
-    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+    try (EnhancedConsumer consumer = consumerSupplier.get()) {
       assignAndSeek(consumer);
       while (!sink.isCancelled()) {
         sendPhase(sink, "Polling");
@@ -47,7 +45,7 @@ public class TailingEmitter extends AbstractEmitter
     }
   }
 
-  private void assignAndSeek(KafkaConsumer<Bytes, Bytes> consumer) {
+  private void assignAndSeek(EnhancedConsumer consumer) {
     var seekOperations = SeekOperations.create(consumer, consumerPosition);
     var seekOffsets = new HashMap<>(seekOperations.getEndOffsets()); // defaulting offsets to topic end
     seekOffsets.putAll(seekOperations.getOffsetsForSeek()); // this will only set non-empty partitions

+ 5 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service;
 
 import com.google.common.collect.Streams;
 import com.google.common.collect.Table;
+import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO;
 import com.provectus.kafka.ui.model.InternalConsumerGroup;
 import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
@@ -30,7 +31,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 
@@ -248,12 +248,12 @@ public class ConsumerGroupService {
         .flatMap(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId)));
   }
 
-  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
+  public EnhancedConsumer createConsumer(KafkaCluster cluster) {
     return createConsumer(cluster, Map.of());
   }
 
-  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster,
-                                                    Map<String, Object> properties) {
+  public EnhancedConsumer createConsumer(KafkaCluster cluster,
+                                         Map<String, Object> properties) {
     Properties props = new Properties();
     SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
     props.putAll(cluster.getProperties());
@@ -266,7 +266,7 @@ public class ConsumerGroupService {
     props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
     props.putAll(properties);
 
-    return new KafkaConsumer<>(props);
+    return new EnhancedConsumer(new KafkaConsumer<>(props), cluster.getPollingSettings().getPollingThrottler());
   }
 
 }

+ 4 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java

@@ -1,9 +1,9 @@
 package com.provectus.kafka.ui.service.analyze;
 
 import com.provectus.kafka.ui.emitter.EmptyPollsCounter;
+import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.emitter.OffsetsInfo;
 import com.provectus.kafka.ui.emitter.PollingSettings;
-import com.provectus.kafka.ui.emitter.PollingThrottler;
 import com.provectus.kafka.ui.exception.TopicAnalysisException;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.TopicAnalysisDTO;
@@ -20,11 +20,9 @@ import java.util.stream.IntStream;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -84,12 +82,11 @@ public class TopicAnalysisService {
     private final int partitionsCnt;
     private final long approxNumberOfMsgs;
     private final EmptyPollsCounter emptyPollsCounter;
-    private final PollingThrottler throttler;
 
     private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
     private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();
 
-    private final KafkaConsumer<Bytes, Bytes> consumer;
+    private final EnhancedConsumer consumer;
 
     AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt,
                  long approxNumberOfMsgs, PollingSettings pollingSettings) {
@@ -104,7 +101,6 @@ public class TopicAnalysisService {
               ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
           )
       );
-      this.throttler = pollingSettings.getPollingThrottler();
       this.emptyPollsCounter = pollingSettings.createEmptyPollsCounter();
     }
 
@@ -127,9 +123,8 @@ public class TopicAnalysisService {
 
         var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName);
         while (!offsetsInfo.assignedPartitionsFullyPolled() && !emptyPollsCounter.noDataEmptyPollsReached()) {
-          var polled = consumer.poll(Duration.ofSeconds(3));
-          throttler.throttleAfterPoll(polled);
-          emptyPollsCounter.count(polled);
+          var polled = consumer.pollEnhanced(Duration.ofSeconds(3));
+          emptyPollsCounter.count(polled.count());
           polled.forEach(r -> {
             totalStats.apply(r);
             partitionStats.get(r.partition()).apply(r);

+ 0 - 29
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ConsumerRecordsUtil.java

@@ -1,29 +0,0 @@
-package com.provectus.kafka.ui.util;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.utils.Bytes;
-
-public class ConsumerRecordsUtil {
-
-  public static int calculatePolledRecSize(ConsumerRecord<Bytes, Bytes> rec) {
-    int polledBytes = 0;
-    for (Header header : rec.headers()) {
-      polledBytes +=
-          (header.key() != null ? header.key().getBytes().length : 0)
-              + (header.value() != null ? header.value().length : 0);
-    }
-    polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
-    polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
-    return polledBytes;
-  }
-
-  public static int calculatePolledSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
-    int polledBytes = 0;
-    for (ConsumerRecord<Bytes, Bytes> rec : recs) {
-      polledBytes += calculatePolledRecSize(rec);
-    }
-    return polledBytes;
-  }
-
-}