ソースを参照

Messaging polling refactoring (#4088)

1. RecordEmitter logic refactored - all polling logic moved RangePollingEmitter. Backward/Forward implementations just chouse how range is calculated for next step.
2. Using consumer.position() method to define that range is fully polled (not empty polls counting deleted)
3. AnalysisTask: calculating progress based on processed offsets, not polled messages (more accurate for compacted topics)
4. Sorting polled messages by ts before sending to frontend
Ilya Kuramshin 1 年間 前
コミット
2a61b97fab
25 ファイル変更565 行追加477 行削除
  1. 0 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
  2. 6 13
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java
  3. 60 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardEmitter.java
  4. 0 126
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java
  5. 16 14
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java
  6. 0 28
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java
  7. 61 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardEmitter.java
  8. 0 64
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java
  9. 63 29
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java
  10. 12 6
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java
  11. 0 29
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingSettings.java
  12. 98 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/RangePollingEmitter.java
  13. 26 13
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java
  14. 8 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java
  15. 11 12
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/ConsumerRecordDeserializer.java
  16. 1 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerde.java
  17. 2 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/DeserializationService.java
  18. 12 45
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
  19. 5 7
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/AnalysisTasksStore.java
  20. 29 44
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java
  21. 14 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/DataMasking.java
  22. 69 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessagesProcessingTest.java
  23. 30 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/ConsumerRecordDeserializerTest.java
  24. 42 30
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java
  25. 0 4
      kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

+ 0 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -57,8 +57,6 @@ public class ClustersProperties {
   @Data
   public static class PollingProperties {
     Integer pollTimeoutMs;
-    Integer partitionPollTimeout;
-    Integer noDataEmptyPolls;
     Integer maxPageSize;
     Integer defaultPageSize;
   }

+ 6 - 13
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -1,28 +1,22 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
-public abstract class AbstractEmitter {
+abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
   private final MessagesProcessing messagesProcessing;
-  protected final PollingSettings pollingSettings;
+  private final PollingSettings pollingSettings;
 
   protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
     this.messagesProcessing = messagesProcessing;
     this.pollingSettings = pollingSettings;
   }
 
-  protected PolledRecords poll(
-      FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
-    return poll(sink, consumer, pollingSettings.getPollTimeout());
-  }
-
-  protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
-    var records = consumer.pollEnhanced(timeout);
+  protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
+    var records = consumer.pollEnhanced(pollingSettings.getPollTimeout());
     sendConsuming(sink, records);
     return records;
   }
@@ -31,9 +25,8 @@ public abstract class AbstractEmitter {
     return messagesProcessing.limitReached();
   }
 
-  protected void sendMessage(FluxSink<TopicMessageEventDTO> sink,
-                             ConsumerRecord<Bytes, Bytes> msg) {
-    messagesProcessing.sendMsg(sink, msg);
+  protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
+    messagesProcessing.send(sink, records);
   }
 
   protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {

+ 60 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardEmitter.java

@@ -0,0 +1,60 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.TopicPartition;
+
+public class BackwardEmitter extends RangePollingEmitter {
+
+  public BackwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
+                         ConsumerPosition consumerPosition,
+                         int messagesPerPage,
+                         ConsumerRecordDeserializer deserializer,
+                         Predicate<TopicMessageDTO> filter,
+                         PollingSettings pollingSettings) {
+    super(
+        consumerSupplier,
+        consumerPosition,
+        messagesPerPage,
+        new MessagesProcessing(
+            deserializer,
+            filter,
+            false,
+            messagesPerPage
+        ),
+        pollingSettings
+    );
+  }
+
+  @Override
+  protected TreeMap<TopicPartition, FromToOffset> nextPollingRange(TreeMap<TopicPartition, FromToOffset> prevRange,
+                                                                   SeekOperations seekOperations) {
+    TreeMap<TopicPartition, Long> readToOffsets = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
+    if (prevRange.isEmpty()) {
+      readToOffsets.putAll(seekOperations.getOffsetsForSeek());
+    } else {
+      readToOffsets.putAll(
+          prevRange.entrySet()
+              .stream()
+              .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().from()))
+      );
+    }
+
+    int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readToOffsets.size());
+    TreeMap<TopicPartition, FromToOffset> result = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
+    readToOffsets.forEach((tp, toOffset) -> {
+      long tpStartOffset = seekOperations.getBeginOffsets().get(tp);
+      if (toOffset > tpStartOffset) {
+        result.put(tp, new FromToOffset(Math.max(tpStartOffset, toOffset - msgsToPollPerPartition), toOffset));
+      }
+    });
+    return result;
+  }
+}

+ 0 - 126
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

@@ -1,126 +0,0 @@
-package com.provectus.kafka.ui.emitter;
-
-import com.provectus.kafka.ui.model.ConsumerPosition;
-import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.TreeMap;
-import java.util.function.Supplier;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.utils.Bytes;
-import reactor.core.publisher.FluxSink;
-
-@Slf4j
-public class BackwardRecordEmitter
-    extends AbstractEmitter
-    implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
-
-  private final Supplier<EnhancedConsumer> consumerSupplier;
-  private final ConsumerPosition consumerPosition;
-  private final int messagesPerPage;
-
-  public BackwardRecordEmitter(
-      Supplier<EnhancedConsumer> consumerSupplier,
-      ConsumerPosition consumerPosition,
-      int messagesPerPage,
-      MessagesProcessing messagesProcessing,
-      PollingSettings pollingSettings) {
-    super(messagesProcessing, pollingSettings);
-    this.consumerPosition = consumerPosition;
-    this.messagesPerPage = messagesPerPage;
-    this.consumerSupplier = consumerSupplier;
-  }
-
-  @Override
-  public void accept(FluxSink<TopicMessageEventDTO> sink) {
-    log.debug("Starting backward polling for {}", consumerPosition);
-    try (EnhancedConsumer consumer = consumerSupplier.get()) {
-      sendPhase(sink, "Created consumer");
-
-      var seekOperations = SeekOperations.create(consumer, consumerPosition);
-      var readUntilOffsets = new TreeMap<TopicPartition, Long>(Comparator.comparingInt(TopicPartition::partition));
-      readUntilOffsets.putAll(seekOperations.getOffsetsForSeek());
-
-      int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readUntilOffsets.size());
-      log.debug("'Until' offsets for polling: {}", readUntilOffsets);
-
-      while (!sink.isCancelled() && !readUntilOffsets.isEmpty() && !sendLimitReached()) {
-        new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> {
-          if (sink.isCancelled()) {
-            return; //fast return in case of sink cancellation
-          }
-          long beginOffset = seekOperations.getBeginOffsets().get(tp);
-          long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition);
-
-          partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink)
-              .forEach(r -> sendMessage(sink, r));
-
-          if (beginOffset == readFromOffset) {
-            // we fully read this partition -> removing it from polling iterations
-            readUntilOffsets.remove(tp);
-          } else {
-            // updating 'to' offset for next polling iteration
-            readUntilOffsets.put(tp, readFromOffset);
-          }
-        });
-        if (readUntilOffsets.isEmpty()) {
-          log.debug("begin reached after partitions poll iteration");
-        } else if (sink.isCancelled()) {
-          log.debug("sink is cancelled after partitions poll iteration");
-        }
-      }
-      sendFinishStatsAndCompleteSink(sink);
-      log.debug("Polling finished");
-    } catch (InterruptException kafkaInterruptException) {
-      log.debug("Polling finished due to thread interruption");
-      sink.complete();
-    } catch (Exception e) {
-      log.error("Error occurred while consuming records", e);
-      sink.error(e);
-    }
-  }
-
-  private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(
-      TopicPartition tp,
-      long fromOffset,
-      long toOffset,
-      EnhancedConsumer consumer,
-      FluxSink<TopicMessageEventDTO> sink
-  ) {
-    consumer.assign(Collections.singleton(tp));
-    consumer.seek(tp, fromOffset);
-    sendPhase(sink, String.format("Polling partition: %s from offset %s", tp, fromOffset));
-    int desiredMsgsToPoll = (int) (toOffset - fromOffset);
-
-    var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
-
-    EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
-    while (!sink.isCancelled()
-        && !sendLimitReached()
-        && recordsToSend.size() < desiredMsgsToPoll
-        && !emptyPolls.noDataEmptyPollsReached()) {
-      var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
-      emptyPolls.count(polledRecords.count());
-
-      log.debug("{} records polled from {}", polledRecords.count(), tp);
-
-      var filteredRecords = polledRecords.records(tp).stream()
-          .filter(r -> r.offset() < toOffset)
-          .toList();
-
-      if (polledRecords.count() > 0 && filteredRecords.isEmpty()) {
-        // we already read all messages in target offsets interval
-        break;
-      }
-      recordsToSend.addAll(filteredRecords);
-    }
-    log.debug("{} records to send", recordsToSend.size());
-    Collections.reverse(recordsToSend);
-    return recordsToSend;
-  }
-}

+ 16 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java

@@ -9,35 +9,37 @@ class ConsumingStats {
   private long bytes = 0;
   private int records = 0;
   private long elapsed = 0;
+  private int filterApplyErrors = 0;
 
-  void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
-                        PolledRecords polledRecords,
-                        int filterApplyErrors) {
+  void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
     bytes += polledRecords.bytes();
-    this.records += polledRecords.count();
-    this.elapsed += polledRecords.elapsed().toMillis();
+    records += polledRecords.count();
+    elapsed += polledRecords.elapsed().toMillis();
     sink.next(
         new TopicMessageEventDTO()
             .type(TopicMessageEventDTO.TypeEnum.CONSUMING)
-            .consuming(createConsumingStats(sink, filterApplyErrors))
+            .consuming(createConsumingStats())
     );
   }
 
-  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors) {
+  void incFilterApplyError() {
+    filterApplyErrors++;
+  }
+
+  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
     sink.next(
         new TopicMessageEventDTO()
             .type(TopicMessageEventDTO.TypeEnum.DONE)
-            .consuming(createConsumingStats(sink, filterApplyErrors))
+            .consuming(createConsumingStats())
     );
   }
 
-  private TopicMessageConsumingDTO createConsumingStats(FluxSink<TopicMessageEventDTO> sink,
-                                                        int filterApplyErrors) {
+  private TopicMessageConsumingDTO createConsumingStats() {
     return new TopicMessageConsumingDTO()
-        .bytesConsumed(this.bytes)
-        .elapsedMs(this.elapsed)
-        .isCancelled(sink.isCancelled())
+        .bytesConsumed(bytes)
+        .elapsedMs(elapsed)
+        .isCancelled(false)
         .filterApplyErrors(filterApplyErrors)
-        .messagesConsumed(this.records);
+        .messagesConsumed(records);
   }
 }

+ 0 - 28
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java

@@ -1,28 +0,0 @@
-package com.provectus.kafka.ui.emitter;
-
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-
-// In some situations it is hard to say whether records range (between two offsets) was fully polled.
-// This happens when we have holes in records sequences that is usual case for compact topics or
-// topics with transactional writes. In such cases if you want to poll all records between offsets X and Y
-// there is no guarantee that you will ever see record with offset Y.
-// To workaround this we can assume that after N consecutive empty polls all target messages were read.
-public class EmptyPollsCounter {
-
-  private final int maxEmptyPolls;
-
-  private int emptyPolls = 0;
-
-  EmptyPollsCounter(int maxEmptyPolls) {
-    this.maxEmptyPolls = maxEmptyPolls;
-  }
-
-  public void count(int polledCount) {
-    emptyPolls = polledCount == 0 ? emptyPolls + 1 : 0;
-  }
-
-  public boolean noDataEmptyPollsReached() {
-    return emptyPolls >= maxEmptyPolls;
-  }
-
-}

+ 61 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardEmitter.java

@@ -0,0 +1,61 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.TopicPartition;
+
+public class ForwardEmitter extends RangePollingEmitter {
+
+  public ForwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
+                        ConsumerPosition consumerPosition,
+                        int messagesPerPage,
+                        ConsumerRecordDeserializer deserializer,
+                        Predicate<TopicMessageDTO> filter,
+                        PollingSettings pollingSettings) {
+    super(
+        consumerSupplier,
+        consumerPosition,
+        messagesPerPage,
+        new MessagesProcessing(
+            deserializer,
+            filter,
+            true,
+            messagesPerPage
+        ),
+        pollingSettings
+    );
+  }
+
+  @Override
+  protected TreeMap<TopicPartition, FromToOffset> nextPollingRange(TreeMap<TopicPartition, FromToOffset> prevRange,
+                                                                   SeekOperations seekOperations) {
+    TreeMap<TopicPartition, Long> readFromOffsets = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
+    if (prevRange.isEmpty()) {
+      readFromOffsets.putAll(seekOperations.getOffsetsForSeek());
+    } else {
+      readFromOffsets.putAll(
+          prevRange.entrySet()
+              .stream()
+              .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().to()))
+      );
+    }
+
+    int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readFromOffsets.size());
+    TreeMap<TopicPartition, FromToOffset> result = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
+    readFromOffsets.forEach((tp, fromOffset) -> {
+      long tpEndOffset = seekOperations.getEndOffsets().get(tp);
+      if (fromOffset < tpEndOffset) {
+        result.put(tp, new FromToOffset(fromOffset, Math.min(tpEndOffset, fromOffset + msgsToPollPerPartition)));
+      }
+    });
+    return result;
+  }
+
+}

+ 0 - 64
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -1,64 +0,0 @@
-package com.provectus.kafka.ui.emitter;
-
-import com.provectus.kafka.ui.model.ConsumerPosition;
-import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import java.util.function.Supplier;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.utils.Bytes;
-import reactor.core.publisher.FluxSink;
-
-@Slf4j
-public class ForwardRecordEmitter
-    extends AbstractEmitter
-    implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
-
-  private final Supplier<EnhancedConsumer> consumerSupplier;
-  private final ConsumerPosition position;
-
-  public ForwardRecordEmitter(
-      Supplier<EnhancedConsumer> consumerSupplier,
-      ConsumerPosition position,
-      MessagesProcessing messagesProcessing,
-      PollingSettings pollingSettings) {
-    super(messagesProcessing, pollingSettings);
-    this.position = position;
-    this.consumerSupplier = consumerSupplier;
-  }
-
-  @Override
-  public void accept(FluxSink<TopicMessageEventDTO> sink) {
-    log.debug("Starting forward polling for {}", position);
-    try (EnhancedConsumer consumer = consumerSupplier.get()) {
-      sendPhase(sink, "Assigning partitions");
-      var seekOperations = SeekOperations.create(consumer, position);
-      seekOperations.assignAndSeekNonEmptyPartitions();
-
-      EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
-      while (!sink.isCancelled()
-          && !sendLimitReached()
-          && !seekOperations.assignedPartitionsFullyPolled()
-          && !emptyPolls.noDataEmptyPollsReached()) {
-
-        sendPhase(sink, "Polling");
-        var records = poll(sink, consumer);
-        emptyPolls.count(records.count());
-
-        log.debug("{} records polled", records.count());
-
-        for (ConsumerRecord<Bytes, Bytes> msg : records) {
-          sendMessage(sink, msg);
-        }
-      }
-      sendFinishStatsAndCompleteSink(sink);
-      log.debug("Polling finished");
-    } catch (InterruptException kafkaInterruptException) {
-      log.debug("Polling finished due to thread interruption");
-      sink.complete();
-    } catch (Exception e) {
-      log.error("Error occurred while consuming records", e);
-      sink.error(e);
-    }
-  }
-}

+ 63 - 29
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java

@@ -1,67 +1,75 @@
 package com.provectus.kafka.ui.emitter;
 
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.function.Predicate;
 import javax.annotation.Nullable;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
-public class MessagesProcessing {
+@RequiredArgsConstructor
+class MessagesProcessing {
 
   private final ConsumingStats consumingStats = new ConsumingStats();
   private long sentMessages = 0;
-  private int filterApplyErrors = 0;
 
   private final ConsumerRecordDeserializer deserializer;
   private final Predicate<TopicMessageDTO> filter;
+  private final boolean ascendingSortBeforeSend;
   private final @Nullable Integer limit;
 
-  public MessagesProcessing(ConsumerRecordDeserializer deserializer,
-                            Predicate<TopicMessageDTO> filter,
-                            @Nullable Integer limit) {
-    this.deserializer = deserializer;
-    this.filter = filter;
-    this.limit = limit;
-  }
-
   boolean limitReached() {
     return limit != null && sentMessages >= limit;
   }
 
-  void sendMsg(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
-    if (!sink.isCancelled() && !limitReached()) {
-      TopicMessageDTO topicMessage = deserializer.deserialize(rec);
-      try {
-        if (filter.test(topicMessage)) {
-          sink.next(
-              new TopicMessageEventDTO()
-                  .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
-                  .message(topicMessage)
-          );
-          sentMessages++;
-        }
-      } catch (Exception e) {
-        filterApplyErrors++;
-        log.trace("Error applying filter for message {}", topicMessage);
-      }
-    }
+  void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> polled) {
+    sortForSending(polled, ascendingSortBeforeSend)
+        .forEach(rec -> {
+          if (!limitReached() && !sink.isCancelled()) {
+            TopicMessageDTO topicMessage = deserializer.deserialize(rec);
+            try {
+              if (filter.test(topicMessage)) {
+                sink.next(
+                    new TopicMessageEventDTO()
+                        .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
+                        .message(topicMessage)
+                );
+                sentMessages++;
+              }
+            } catch (Exception e) {
+              consumingStats.incFilterApplyError();
+              log.trace("Error applying filter for message {}", topicMessage);
+            }
+          }
+        });
   }
 
   void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
     if (!sink.isCancelled()) {
-      consumingStats.sendConsumingEvt(sink, polledRecords, filterApplyErrors);
+      consumingStats.sendConsumingEvt(sink, polledRecords);
     }
   }
 
   void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
     if (!sink.isCancelled()) {
-      consumingStats.sendFinishEvent(sink, filterApplyErrors);
+      consumingStats.sendFinishEvent(sink);
     }
   }
 
@@ -75,4 +83,30 @@ public class MessagesProcessing {
     }
   }
 
+  /*
+   * Sorting by timestamps, BUT requesting that records within same partitions should be ordered by offsets.
+   */
+  @VisibleForTesting
+  static Iterable<ConsumerRecord<Bytes, Bytes>> sortForSending(Iterable<ConsumerRecord<Bytes, Bytes>> records,
+                                                               boolean asc) {
+    Comparator<ConsumerRecord> offsetComparator = asc
+        ? Comparator.comparingLong(ConsumerRecord::offset)
+        : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::offset).reversed();
+
+    // partition -> sorted by offsets records
+    Map<Integer, List<ConsumerRecord<Bytes, Bytes>>> perPartition = Streams.stream(records)
+        .collect(
+            groupingBy(
+                ConsumerRecord::partition,
+                TreeMap::new,
+                collectingAndThen(toList(), lst -> lst.stream().sorted(offsetComparator).toList())));
+
+    Comparator<ConsumerRecord> tsComparator = asc
+        ? Comparator.comparing(ConsumerRecord::timestamp)
+        : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::timestamp).reversed();
+
+    // merge-sorting records from partitions one by one using timestamp comparator
+    return Iterables.mergeSorted(perPartition.values(), tsComparator);
+  }
+
 }

+ 12 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java

@@ -8,12 +8,13 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 
 @Slf4j
 @Getter
-public class OffsetsInfo {
+class OffsetsInfo {
 
   private final Consumer<?, ?> consumer;
 
@@ -23,7 +24,7 @@ public class OffsetsInfo {
   private final Set<TopicPartition> nonEmptyPartitions = new HashSet<>();
   private final Set<TopicPartition> emptyPartitions = new HashSet<>();
 
-  public OffsetsInfo(Consumer<?, ?> consumer, String topic) {
+  OffsetsInfo(Consumer<?, ?> consumer, String topic) {
     this(consumer,
         consumer.partitionsFor(topic).stream()
             .map(pi -> new TopicPartition(topic, pi.partition()))
@@ -31,8 +32,7 @@ public class OffsetsInfo {
     );
   }
 
-  public OffsetsInfo(Consumer<?, ?> consumer,
-                     Collection<TopicPartition> targetPartitions) {
+  OffsetsInfo(Consumer<?, ?> consumer, Collection<TopicPartition> targetPartitions) {
     this.consumer = consumer;
     this.beginOffsets = consumer.beginningOffsets(targetPartitions);
     this.endOffsets = consumer.endOffsets(targetPartitions);
@@ -46,8 +46,8 @@ public class OffsetsInfo {
     });
   }
 
-  public boolean assignedPartitionsFullyPolled() {
-    for (var tp: consumer.assignment()) {
+  boolean assignedPartitionsFullyPolled() {
+    for (var tp : consumer.assignment()) {
       Preconditions.checkArgument(endOffsets.containsKey(tp));
       if (endOffsets.get(tp) > consumer.position(tp)) {
         return false;
@@ -56,4 +56,10 @@ public class OffsetsInfo {
     return true;
   }
 
+  long summaryOffsetsRange() {
+    MutableLong cnt = new MutableLong();
+    nonEmptyPartitions.forEach(tp -> cnt.add(endOffsets.get(tp) - beginOffsets.get(tp)));
+    return cnt.getValue();
+  }
+
 }

+ 0 - 29
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingSettings.java

@@ -8,13 +8,8 @@ import java.util.function.Supplier;
 public class PollingSettings {
 
   private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(1_000);
-  private static final Duration DEFAULT_PARTITION_POLL_TIMEOUT = Duration.ofMillis(200);
-  private static final int DEFAULT_NO_DATA_EMPTY_POLLS = 3;
 
   private final Duration pollTimeout;
-  private final Duration partitionPollTimeout;
-  private final int notDataEmptyPolls; //see EmptyPollsCounter docs
-
   private final Supplier<PollingThrottler> throttlerSupplier;
 
   public static PollingSettings create(ClustersProperties.Cluster cluster,
@@ -26,18 +21,8 @@ public class PollingSettings {
         ? Duration.ofMillis(pollingProps.getPollTimeoutMs())
         : DEFAULT_POLL_TIMEOUT;
 
-    var partitionPollTimeout = pollingProps.getPartitionPollTimeout() != null
-        ? Duration.ofMillis(pollingProps.getPartitionPollTimeout())
-        : Duration.ofMillis(pollTimeout.toMillis() / 5);
-
-    int noDataEmptyPolls = pollingProps.getNoDataEmptyPolls() != null
-        ? pollingProps.getNoDataEmptyPolls()
-        : DEFAULT_NO_DATA_EMPTY_POLLS;
-
     return new PollingSettings(
         pollTimeout,
-        partitionPollTimeout,
-        noDataEmptyPolls,
         PollingThrottler.throttlerSupplier(cluster)
     );
   }
@@ -45,34 +30,20 @@ public class PollingSettings {
   public static PollingSettings createDefault() {
     return new PollingSettings(
         DEFAULT_POLL_TIMEOUT,
-        DEFAULT_PARTITION_POLL_TIMEOUT,
-        DEFAULT_NO_DATA_EMPTY_POLLS,
         PollingThrottler::noop
     );
   }
 
   private PollingSettings(Duration pollTimeout,
-                          Duration partitionPollTimeout,
-                          int notDataEmptyPolls,
                           Supplier<PollingThrottler> throttlerSupplier) {
     this.pollTimeout = pollTimeout;
-    this.partitionPollTimeout = partitionPollTimeout;
-    this.notDataEmptyPolls = notDataEmptyPolls;
     this.throttlerSupplier = throttlerSupplier;
   }
 
-  public EmptyPollsCounter createEmptyPollsCounter() {
-    return new EmptyPollsCounter(notDataEmptyPolls);
-  }
-
   public Duration getPollTimeout() {
     return pollTimeout;
   }
 
-  public Duration getPartitionPollTimeout() {
-    return partitionPollTimeout;
-  }
-
   public PollingThrottler getPollingThrottler() {
     return throttlerSupplier.get();
   }

+ 98 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/RangePollingEmitter.java

@@ -0,0 +1,98 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.utils.Bytes;
+import reactor.core.publisher.FluxSink;
+
+@Slf4j
+abstract class RangePollingEmitter extends AbstractEmitter {
+
+  private final Supplier<EnhancedConsumer> consumerSupplier;
+  protected final ConsumerPosition consumerPosition;
+  protected final int messagesPerPage;
+
+  protected RangePollingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
+                                ConsumerPosition consumerPosition,
+                                int messagesPerPage,
+                                MessagesProcessing messagesProcessing,
+                                PollingSettings pollingSettings) {
+    super(messagesProcessing, pollingSettings);
+    this.consumerPosition = consumerPosition;
+    this.messagesPerPage = messagesPerPage;
+    this.consumerSupplier = consumerSupplier;
+  }
+
+  protected record FromToOffset(/*inclusive*/ long from, /*exclusive*/ long to) {
+  }
+
+  //should return empty map if polling should be stopped
+  protected abstract TreeMap<TopicPartition, FromToOffset> nextPollingRange(
+      TreeMap<TopicPartition, FromToOffset> prevRange, //empty on start
+      SeekOperations seekOperations
+  );
+
+  @Override
+  public void accept(FluxSink<TopicMessageEventDTO> sink) {
+    log.debug("Starting polling for {}", consumerPosition);
+    try (EnhancedConsumer consumer = consumerSupplier.get()) {
+      sendPhase(sink, "Consumer created");
+      var seekOperations = SeekOperations.create(consumer, consumerPosition);
+      TreeMap<TopicPartition, FromToOffset> pollRange = nextPollingRange(new TreeMap<>(), seekOperations);
+      log.debug("Starting from offsets {}", pollRange);
+
+      while (!sink.isCancelled() && !pollRange.isEmpty() && !sendLimitReached()) {
+        var polled = poll(consumer, sink, pollRange);
+        send(sink, polled);
+        pollRange = nextPollingRange(pollRange, seekOperations);
+      }
+      if (sink.isCancelled()) {
+        log.debug("Polling finished due to sink cancellation");
+      }
+      sendFinishStatsAndCompleteSink(sink);
+      log.debug("Polling finished");
+    } catch (InterruptException kafkaInterruptException) {
+      log.debug("Polling finished due to thread interruption");
+      sink.complete();
+    } catch (Exception e) {
+      log.error("Error occurred while consuming records", e);
+      sink.error(e);
+    }
+  }
+
+  private List<ConsumerRecord<Bytes, Bytes>> poll(EnhancedConsumer consumer,
+                                                  FluxSink<TopicMessageEventDTO> sink,
+                                                  TreeMap<TopicPartition, FromToOffset> range) {
+    log.trace("Polling range {}", range);
+    sendPhase(sink,
+        "Polling partitions: %s".formatted(range.keySet().stream().map(TopicPartition::partition).sorted().toList()));
+
+    consumer.assign(range.keySet());
+    range.forEach((tp, fromTo) -> consumer.seek(tp, fromTo.from));
+
+    List<ConsumerRecord<Bytes, Bytes>> result = new ArrayList<>();
+    while (!sink.isCancelled() && consumer.paused().size() < range.size()) {
+      var polledRecords = poll(sink, consumer);
+      range.forEach((tp, fromTo) -> {
+        polledRecords.records(tp).stream()
+            .filter(r -> r.offset() < fromTo.to)
+            .forEach(result::add);
+
+        //next position is out of target range -> pausing partition
+        if (consumer.position(tp) >= fromTo.to) {
+          consumer.pause(List.of(tp));
+        }
+      });
+    }
+    consumer.resume(consumer.paused());
+    return result;
+  }
+}

+ 26 - 13
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java

@@ -10,17 +10,18 @@ import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import lombok.AccessLevel;
 import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 
 @RequiredArgsConstructor(access = AccessLevel.PACKAGE)
-class SeekOperations {
+public class SeekOperations {
 
   private final Consumer<?, ?> consumer;
   private final OffsetsInfo offsetsInfo;
   private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
 
-  static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
+  public static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
     OffsetsInfo offsetsInfo;
     if (consumerPosition.getSeekTo() == null) {
       offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getTopic());
@@ -34,25 +35,37 @@ class SeekOperations {
     );
   }
 
-  void assignAndSeekNonEmptyPartitions() {
+  public void assignAndSeekNonEmptyPartitions() {
     consumer.assign(offsetsForSeek.keySet());
     offsetsForSeek.forEach(consumer::seek);
   }
 
-  Map<TopicPartition, Long> getBeginOffsets() {
+  public Map<TopicPartition, Long> getBeginOffsets() {
     return offsetsInfo.getBeginOffsets();
   }
 
-  Map<TopicPartition, Long> getEndOffsets() {
+  public Map<TopicPartition, Long> getEndOffsets() {
     return offsetsInfo.getEndOffsets();
   }
 
-  boolean assignedPartitionsFullyPolled() {
+  public boolean assignedPartitionsFullyPolled() {
     return offsetsInfo.assignedPartitionsFullyPolled();
   }
 
+  // sum of (end - start) offsets for all partitions
+  public long summaryOffsetsRange() {
+    return offsetsInfo.summaryOffsetsRange();
+  }
+
+  // sum of differences between initial consumer seek and current consumer position (across all partitions)
+  public long offsetsProcessedFromSeek() {
+    MutableLong count = new MutableLong();
+    offsetsForSeek.forEach((tp, initialOffset) -> count.add(consumer.position(tp) - initialOffset));
+    return count.getValue();
+  }
+
   // Get offsets to seek to. NOTE: offsets do not contain empty partitions offsets
-  Map<TopicPartition, Long> getOffsetsForSeek() {
+  public Map<TopicPartition, Long> getOffsetsForSeek() {
     return offsetsForSeek;
   }
 
@@ -61,19 +74,19 @@ class SeekOperations {
    */
   @VisibleForTesting
   static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
-                                                             OffsetsInfo offsetsInfo,
-                                                             SeekTypeDTO seekType,
-                                                             @Nullable Map<TopicPartition, Long> seekTo) {
+                                                     OffsetsInfo offsetsInfo,
+                                                     SeekTypeDTO seekType,
+                                                     @Nullable Map<TopicPartition, Long> seekTo) {
     switch (seekType) {
       case LATEST:
         return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
       case BEGINNING:
         return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
       case OFFSET:
-        Preconditions.checkNotNull(offsetsInfo);
+        Preconditions.checkNotNull(seekTo);
         return fixOffsets(offsetsInfo, seekTo);
       case TIMESTAMP:
-        Preconditions.checkNotNull(offsetsInfo);
+        Preconditions.checkNotNull(seekTo);
         return offsetsForTimestamp(consumer, offsetsInfo, seekTo);
       default:
         throw new IllegalStateException();
@@ -100,7 +113,7 @@ class SeekOperations {
   }
 
   private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer, OffsetsInfo offsetsInfo,
-                                                        Map<TopicPartition, Long> timestamps) {
+                                                               Map<TopicPartition, Long> timestamps) {
     timestamps = new HashMap<>(timestamps);
     timestamps.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
 

+ 8 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -1,25 +1,28 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.util.HashMap;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.errors.InterruptException;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
-public class TailingEmitter extends AbstractEmitter
-    implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
+public class TailingEmitter extends AbstractEmitter {
 
   private final Supplier<EnhancedConsumer> consumerSupplier;
   private final ConsumerPosition consumerPosition;
 
   public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
                         ConsumerPosition consumerPosition,
-                        MessagesProcessing messagesProcessing,
+                        ConsumerRecordDeserializer deserializer,
+                        Predicate<TopicMessageDTO> filter,
                         PollingSettings pollingSettings) {
-    super(messagesProcessing, pollingSettings);
+    super(new MessagesProcessing(deserializer, filter, false, null), pollingSettings);
     this.consumerSupplier = consumerSupplier;
     this.consumerPosition = consumerPosition;
   }
@@ -32,7 +35,7 @@ public class TailingEmitter extends AbstractEmitter
       while (!sink.isCancelled()) {
         sendPhase(sink, "Polling");
         var polled = poll(sink, consumer);
-        polled.forEach(r -> sendMessage(sink, r));
+        send(sink, polled);
       }
       sink.complete();
       log.debug("Tailing finished");

+ 11 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/ConsumerRecordDeserializer.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.serdes;
 
 import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.model.TopicMessageDTO.TimestampTypeEnum;
 import com.provectus.kafka.ui.serde.api.Serde;
 import java.time.Instant;
 import java.time.OffsetDateTime;
@@ -8,6 +9,7 @@ import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.UnaryOperator;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -32,6 +34,8 @@ public class ConsumerRecordDeserializer {
   private final Serde.Deserializer fallbackKeyDeserializer;
   private final Serde.Deserializer fallbackValueDeserializer;
 
+  private final UnaryOperator<TopicMessageDTO> masker;
+
   public TopicMessageDTO deserialize(ConsumerRecord<Bytes, Bytes> rec) {
     var message = new TopicMessageDTO();
     fillKey(message, rec);
@@ -47,20 +51,15 @@ public class ConsumerRecordDeserializer {
     message.setValueSize(getValueSize(rec));
     message.setHeadersSize(getHeadersSize(rec));
 
-    return message;
+    return masker.apply(message);
   }
 
-  private static TopicMessageDTO.TimestampTypeEnum mapToTimestampType(TimestampType timestampType) {
-    switch (timestampType) {
-      case CREATE_TIME:
-        return TopicMessageDTO.TimestampTypeEnum.CREATE_TIME;
-      case LOG_APPEND_TIME:
-        return TopicMessageDTO.TimestampTypeEnum.LOG_APPEND_TIME;
-      case NO_TIMESTAMP_TYPE:
-        return TopicMessageDTO.TimestampTypeEnum.NO_TIMESTAMP_TYPE;
-      default:
-        throw new IllegalArgumentException("Unknown timestampType: " + timestampType);
-    }
+  private static TimestampTypeEnum mapToTimestampType(TimestampType timestampType) {
+    return switch (timestampType) {
+      case CREATE_TIME -> TimestampTypeEnum.CREATE_TIME;
+      case LOG_APPEND_TIME -> TimestampTypeEnum.LOG_APPEND_TIME;
+      case NO_TIMESTAMP_TYPE -> TimestampTypeEnum.NO_TIMESTAMP_TYPE;
+    };
   }
 
   private void fillHeaders(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec) {

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ConsumerOffsetsSerde.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.serdes.builtin;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializerProvider;
 import com.fasterxml.jackson.databind.json.JsonMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/DeserializationService.java

@@ -102,7 +102,8 @@ public class DeserializationService implements Closeable {
         valueSerde.deserializer(topic, Serde.Target.VALUE),
         fallbackSerde.getName(),
         fallbackSerde.deserializer(topic, Serde.Target.KEY),
-        fallbackSerde.deserializer(topic, Serde.Target.VALUE)
+        fallbackSerde.deserializer(topic, Serde.Target.VALUE),
+        cluster.getMasking().getMaskerForTopic(topic)
     );
   }
 

+ 12 - 45
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -2,10 +2,9 @@ package com.provectus.kafka.ui.service;
 
 import com.google.common.util.concurrent.RateLimiter;
 import com.provectus.kafka.ui.config.ClustersProperties;
-import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
-import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.BackwardEmitter;
+import com.provectus.kafka.ui.emitter.ForwardEmitter;
 import com.provectus.kafka.ui.emitter.MessageFilters;
-import com.provectus.kafka.ui.emitter.MessagesProcessing;
 import com.provectus.kafka.ui.emitter.TailingEmitter;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
@@ -18,7 +17,6 @@ import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
 import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import java.time.Instant;
@@ -45,7 +43,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -231,56 +228,26 @@ public class MessagesService {
                                                       @Nullable String keySerde,
                                                       @Nullable String valueSerde) {
 
-    java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
-
-    var processing = new MessagesProcessing(
-        deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
-        getMsgFilter(query, filterQueryType),
-        seekDirection == SeekDirectionDTO.TAILING ? null : limit
-    );
-
-    if (seekDirection.equals(SeekDirectionDTO.FORWARD)) {
-      emitter = new ForwardRecordEmitter(
+    var deserializer = deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
+    var filter = getMsgFilter(query, filterQueryType);
+    var emitter = switch (seekDirection) {
+      case FORWARD -> new ForwardEmitter(
           () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition,
-          processing,
-          cluster.getPollingSettings()
+          consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()
       );
-    } else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
-      emitter = new BackwardRecordEmitter(
+      case BACKWARD -> new BackwardEmitter(
           () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition,
-          limit,
-          processing,
-          cluster.getPollingSettings()
+          consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()
       );
-    } else {
-      emitter = new TailingEmitter(
+      case TAILING -> new TailingEmitter(
           () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition,
-          processing,
-          cluster.getPollingSettings()
+          consumerPosition, deserializer, filter, cluster.getPollingSettings()
       );
-    }
+    };
     return Flux.create(emitter)
-        .map(getDataMasker(cluster, topic))
         .map(throttleUiPublish(seekDirection));
   }
 
-  private UnaryOperator<TopicMessageEventDTO> getDataMasker(KafkaCluster cluster, String topicName) {
-    var keyMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.KEY);
-    var valMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.VALUE);
-    return evt -> {
-      if (evt.getType() != TopicMessageEventDTO.TypeEnum.MESSAGE) {
-        return evt;
-      }
-      return evt.message(
-          evt.getMessage()
-              .key(keyMasker.apply(evt.getMessage().getKey()))
-              .content(valMasker.apply(evt.getMessage().getContent())));
-    };
-  }
-
   private Predicate<TopicMessageDTO> getMsgFilter(String query,
                                                   MessageFilterTypeDTO filterQueryType) {
     if (StringUtils.isEmpty(query)) {

+ 5 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/AnalysisTasksStore.java

@@ -92,14 +92,12 @@ class AnalysisTasksStore {
         .result(completedState);
   }
 
-  @Value
   @Builder(toBuilder = true)
-  private static class RunningAnalysis {
-    Instant startedAt;
-    double completenessPercent;
-    long msgsScanned;
-    long bytesScanned;
-    Closeable task;
+  private record RunningAnalysis(Instant startedAt,
+                                 double completenessPercent,
+                                 long msgsScanned,
+                                 long bytesScanned,
+                                 Closeable task) {
 
     TopicAnalysisProgressDTO toDto() {
       return new TopicAnalysisProgressDTO()

+ 29 - 44
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java

@@ -1,10 +1,11 @@
 package com.provectus.kafka.ui.service.analyze;
 
-import com.provectus.kafka.ui.emitter.EmptyPollsCounter;
+import static com.provectus.kafka.ui.model.SeekTypeDTO.BEGINNING;
+
 import com.provectus.kafka.ui.emitter.EnhancedConsumer;
-import com.provectus.kafka.ui.emitter.OffsetsInfo;
-import com.provectus.kafka.ui.emitter.PollingSettings;
+import com.provectus.kafka.ui.emitter.SeekOperations;
 import com.provectus.kafka.ui.exception.TopicAnalysisException;
+import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.TopicAnalysisDTO;
 import com.provectus.kafka.ui.service.ConsumerGroupService;
@@ -15,16 +16,14 @@ import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 
 
@@ -33,6 +32,14 @@ import reactor.core.scheduler.Schedulers;
 @RequiredArgsConstructor
 public class TopicAnalysisService {
 
+  private static final Scheduler SCHEDULER = Schedulers.newBoundedElastic(
+      Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
+      Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
+      "topic-analysis-tasks",
+      10, //ttl for idle threads (in sec)
+      true //daemon
+  );
+
   private final AnalysisTasksStore analysisTasksStore = new AnalysisTasksStore();
 
   private final TopicsService topicsService;
@@ -40,30 +47,18 @@ public class TopicAnalysisService {
 
   public Mono<Void> analyze(KafkaCluster cluster, String topicName) {
     return topicsService.getTopicDetails(cluster, topicName)
-        .doOnNext(topic ->
-            startAnalysis(
-                cluster,
-                topicName,
-                topic.getPartitionCount(),
-                topic.getPartitions().values()
-                    .stream()
-                    .mapToLong(p -> p.getOffsetMax() - p.getOffsetMin())
-                    .sum()
-            )
-        ).then();
+        .doOnNext(topic -> startAnalysis(cluster, topicName))
+        .then();
   }
 
-  private synchronized void startAnalysis(KafkaCluster cluster,
-                                          String topic,
-                                          int partitionsCnt,
-                                          long approxNumberOfMsgs) {
+  private synchronized void startAnalysis(KafkaCluster cluster, String topic) {
     var topicId = new TopicIdentity(cluster, topic);
     if (analysisTasksStore.isAnalysisInProgress(topicId)) {
       throw new TopicAnalysisException("Topic is already analyzing");
     }
-    var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs, cluster.getPollingSettings());
+    var task = new AnalysisTask(cluster, topicId);
     analysisTasksStore.registerNewTask(topicId, task);
-    Schedulers.boundedElastic().schedule(task);
+    SCHEDULER.schedule(task);
   }
 
   public void cancelAnalysis(KafkaCluster cluster, String topicName) {
@@ -79,20 +74,14 @@ public class TopicAnalysisService {
     private final Instant startedAt = Instant.now();
 
     private final TopicIdentity topicId;
-    private final int partitionsCnt;
-    private final long approxNumberOfMsgs;
-    private final EmptyPollsCounter emptyPollsCounter;
 
     private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
     private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();
 
     private final EnhancedConsumer consumer;
 
-    AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt,
-                 long approxNumberOfMsgs, PollingSettings pollingSettings) {
+    AnalysisTask(KafkaCluster cluster, TopicIdentity topicId) {
       this.topicId = topicId;
-      this.approxNumberOfMsgs = approxNumberOfMsgs;
-      this.partitionsCnt = partitionsCnt;
       this.consumer = consumerGroupService.createConsumer(
           cluster,
           // to improve polling throughput
@@ -101,7 +90,6 @@ public class TopicAnalysisService {
               ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
           )
       );
-      this.emptyPollsCounter = pollingSettings.createEmptyPollsCounter();
     }
 
     @Override
@@ -113,23 +101,20 @@ public class TopicAnalysisService {
     public void run() {
       try {
         log.info("Starting {} topic analysis", topicId);
-        var topicPartitions = IntStream.range(0, partitionsCnt)
-            .peek(i -> partitionStats.put(i, new TopicAnalysisStats()))
-            .mapToObj(i -> new TopicPartition(topicId.topicName, i))
-            .collect(Collectors.toList());
+        consumer.partitionsFor(topicId.topicName)
+            .forEach(tp -> partitionStats.put(tp.partition(), new TopicAnalysisStats()));
 
-        consumer.assign(topicPartitions);
-        consumer.seekToBeginning(topicPartitions);
+        var seekOperations = SeekOperations.create(consumer, new ConsumerPosition(BEGINNING, topicId.topicName, null));
+        long summaryOffsetsRange = seekOperations.summaryOffsetsRange();
+        seekOperations.assignAndSeekNonEmptyPartitions();
 
-        var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName);
-        while (!offsetsInfo.assignedPartitionsFullyPolled() && !emptyPollsCounter.noDataEmptyPollsReached()) {
+        while (!seekOperations.assignedPartitionsFullyPolled()) {
           var polled = consumer.pollEnhanced(Duration.ofSeconds(3));
-          emptyPollsCounter.count(polled.count());
           polled.forEach(r -> {
             totalStats.apply(r);
             partitionStats.get(r.partition()).apply(r);
           });
-          updateProgress();
+          updateProgress(seekOperations.offsetsProcessedFromSeek(), summaryOffsetsRange);
         }
         analysisTasksStore.setAnalysisResult(topicId, startedAt, totalStats, partitionStats);
         log.info("{} topic analysis finished", topicId);
@@ -145,13 +130,13 @@ public class TopicAnalysisService {
       }
     }
 
-    private void updateProgress() {
-      if (totalStats.totalMsgs > 0 && approxNumberOfMsgs != 0) {
+    private void updateProgress(long processedOffsets, long summaryOffsetsRange) {
+      if (processedOffsets > 0 && summaryOffsetsRange != 0) {
         analysisTasksStore.updateProgress(
             topicId,
             totalStats.totalMsgs,
             totalStats.keysSize.sum + totalStats.valuesSize.sum,
-            Math.min(100.0, (((double) totalStats.totalMsgs) / approxNumberOfMsgs) * 100)
+            Math.min(100.0, (((double) processedOffsets) / summaryOffsetsRange) * 100)
         );
       }
     }

+ 14 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/masking/DataMasking.java

@@ -1,7 +1,5 @@
 package com.provectus.kafka.ui.service.masking;
 
-import static java.util.stream.Collectors.toList;
-
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.json.JsonMapper;
@@ -9,6 +7,7 @@ import com.fasterxml.jackson.databind.node.ContainerNode;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.service.masking.policies.MaskingPolicy;
 import java.util.List;
@@ -54,7 +53,8 @@ public class DataMasking {
               Optional.ofNullable(property.getTopicValuesPattern()).map(Pattern::compile).orElse(null),
               MaskingPolicy.create(property)
           );
-        }).collect(toList()));
+        }).toList()
+    );
   }
 
   @VisibleForTesting
@@ -62,8 +62,17 @@ public class DataMasking {
     this.masks = masks;
   }
 
-  public UnaryOperator<String> getMaskingFunction(String topic, Serde.Target target) {
-    var targetMasks = masks.stream().filter(m -> m.shouldBeApplied(topic, target)).collect(toList());
+  public UnaryOperator<TopicMessageDTO> getMaskerForTopic(String topic) {
+    var keyMasker = getMaskingFunction(topic, Serde.Target.KEY);
+    var valMasker = getMaskingFunction(topic, Serde.Target.VALUE);
+    return msg -> msg
+        .key(keyMasker.apply(msg.getKey()))
+        .content(valMasker.apply(msg.getContent()));
+  }
+
+  @VisibleForTesting
+  UnaryOperator<String> getMaskingFunction(String topic, Serde.Target target) {
+    var targetMasks = masks.stream().filter(m -> m.shouldBeApplied(topic, target)).toList();
     if (targetMasks.isEmpty()) {
       return UnaryOperator.identity();
     }

+ 69 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessagesProcessingTest.java

@@ -0,0 +1,69 @@
+package com.provectus.kafka.ui.emitter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.RepeatedTest;
+
+class MessagesProcessingTest {
+
+
+  @RepeatedTest(5)
+  void testSortingAsc() {
+    var messagesInOrder = List.of(
+        consumerRecord(1, 100L, "1999-01-01T00:00:00+00:00"),
+        consumerRecord(0, 0L, "2000-01-01T00:00:00+00:00"),
+        consumerRecord(1, 200L, "2000-01-05T00:00:00+00:00"),
+        consumerRecord(0, 10L, "2000-01-10T00:00:00+00:00"),
+        consumerRecord(0, 20L, "2000-01-20T00:00:00+00:00"),
+        consumerRecord(1, 300L, "3000-01-01T00:00:00+00:00"),
+        consumerRecord(2, 1000L, "4000-01-01T00:00:00+00:00"),
+        consumerRecord(2, 1001L, "2000-01-01T00:00:00+00:00"),
+        consumerRecord(2, 1003L, "3000-01-01T00:00:00+00:00")
+    );
+
+    var shuffled = new ArrayList<>(messagesInOrder);
+    Collections.shuffle(shuffled);
+
+    var sortedList = MessagesProcessing.sortForSending(shuffled, true);
+    assertThat(sortedList).containsExactlyElementsOf(messagesInOrder);
+  }
+
+  @RepeatedTest(5)
+  void testSortingDesc() {
+    var messagesInOrder = List.of(
+        consumerRecord(1, 300L, "3000-01-01T00:00:00+00:00"),
+        consumerRecord(2, 1003L, "3000-01-01T00:00:00+00:00"),
+        consumerRecord(0, 20L, "2000-01-20T00:00:00+00:00"),
+        consumerRecord(0, 10L, "2000-01-10T00:00:00+00:00"),
+        consumerRecord(1, 200L, "2000-01-05T00:00:00+00:00"),
+        consumerRecord(0, 0L, "2000-01-01T00:00:00+00:00"),
+        consumerRecord(2, 1001L, "2000-01-01T00:00:00+00:00"),
+        consumerRecord(2, 1000L, "4000-01-01T00:00:00+00:00"),
+        consumerRecord(1, 100L, "1999-01-01T00:00:00+00:00")
+    );
+
+    var shuffled = new ArrayList<>(messagesInOrder);
+    Collections.shuffle(shuffled);
+
+    var sortedList = MessagesProcessing.sortForSending(shuffled, false);
+    assertThat(sortedList).containsExactlyElementsOf(messagesInOrder);
+  }
+
+  private ConsumerRecord<Bytes, Bytes> consumerRecord(int partition, long offset, String ts) {
+    return new ConsumerRecord<>(
+        "topic", partition, offset, OffsetDateTime.parse(ts).toInstant().toEpochMilli(),
+        TimestampType.CREATE_TIME,
+        0, 0, null, null, new RecordHeaders(), Optional.empty()
+    );
+  }
+
+}

+ 30 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/ConsumerRecordDeserializerTest.java

@@ -0,0 +1,30 @@
+package com.provectus.kafka.ui.serdes;
+
+import static com.provectus.kafka.ui.serde.api.DeserializeResult.Type.STRING;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.Serde;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.Test;
+
+class ConsumerRecordDeserializerTest {
+
+  @Test
+  void dataMaskingAppliedOnDeserializedMessage() {
+    UnaryOperator<TopicMessageDTO> maskerMock = mock();
+    Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of());
+
+    var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock);
+    recordDeser.deserialize(new ConsumerRecord<>("t", 1, 1L, Bytes.wrap("t".getBytes()), Bytes.wrap("t".getBytes())));
+
+    verify(maskerMock).apply(any(TopicMessageDTO.class));
+  }
+
+}

+ 42 - 30
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -7,13 +7,13 @@ import static com.provectus.kafka.ui.model.SeekTypeDTO.TIMESTAMP;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
-import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.BackwardEmitter;
 import com.provectus.kafka.ui.emitter.EnhancedConsumer;
-import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
-import com.provectus.kafka.ui.emitter.MessagesProcessing;
+import com.provectus.kafka.ui.emitter.ForwardEmitter;
 import com.provectus.kafka.ui.emitter.PollingSettings;
 import com.provectus.kafka.ui.emitter.PollingThrottler;
 import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
 import com.provectus.kafka.ui.serde.api.Serde;
@@ -31,16 +31,15 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.internals.RecordHeader;
-import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -58,6 +57,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
   static final String EMPTY_TOPIC = TOPIC + "_empty";
   static final List<Record> SENT_RECORDS = new ArrayList<>();
   static final ConsumerRecordDeserializer RECORD_DESERIALIZER = createRecordsDeserializer();
+  static final Predicate<TopicMessageDTO> NOOP_FILTER = m -> true;
 
   @BeforeAll
   static void generateMsgs() throws Exception {
@@ -93,6 +93,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
   static void cleanup() {
     deleteTopic(TOPIC);
     deleteTopic(EMPTY_TOPIC);
+    SENT_RECORDS.clear();
   }
 
   private static ConsumerRecordDeserializer createRecordsDeserializer() {
@@ -105,28 +106,28 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         s.deserializer(null, Serde.Target.VALUE),
         StringSerde.name(),
         s.deserializer(null, Serde.Target.KEY),
-        s.deserializer(null, Serde.Target.VALUE)
+        s.deserializer(null, Serde.Target.VALUE),
+        msg -> msg
     );
   }
 
-  private MessagesProcessing createMessagesProcessing() {
-    return new MessagesProcessing(RECORD_DESERIALIZER, msg -> true, null);
-  }
-
   @Test
   void pollNothingOnEmptyTopic() {
-    var forwardEmitter = new ForwardRecordEmitter(
+    var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
         new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
-        createMessagesProcessing(),
+        100,
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
         100,
-        createMessagesProcessing(),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
@@ -145,18 +146,21 @@ class RecordEmitterTest extends AbstractIntegrationTest {
 
   @Test
   void pollFullTopicFromBeginning() {
-    var forwardEmitter = new ForwardRecordEmitter(
+    var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
         new ConsumerPosition(BEGINNING, TOPIC, null),
-        createMessagesProcessing(),
+        PARTITIONS * MSGS_PER_PARTITION,
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(LATEST, TOPIC, null),
         PARTITIONS * MSGS_PER_PARTITION,
-        createMessagesProcessing(),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
@@ -174,18 +178,21 @@ class RecordEmitterTest extends AbstractIntegrationTest {
       targetOffsets.put(new TopicPartition(TOPIC, i), offset);
     }
 
-    var forwardEmitter = new ForwardRecordEmitter(
+    var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
-        createMessagesProcessing(),
+        PARTITIONS * MSGS_PER_PARTITION,
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
         PARTITIONS * MSGS_PER_PARTITION,
-        createMessagesProcessing(),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
@@ -219,18 +226,21 @@ class RecordEmitterTest extends AbstractIntegrationTest {
       );
     }
 
-    var forwardEmitter = new ForwardRecordEmitter(
+    var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
         new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
-        createMessagesProcessing(),
+        PARTITIONS * MSGS_PER_PARTITION,
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
         PARTITIONS * MSGS_PER_PARTITION,
-        createMessagesProcessing(),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
@@ -257,11 +267,12 @@ class RecordEmitterTest extends AbstractIntegrationTest {
       targetOffsets.put(new TopicPartition(TOPIC, i), (long) MSGS_PER_PARTITION);
     }
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
         numMessages,
-        createMessagesProcessing(),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
@@ -283,11 +294,12 @@ class RecordEmitterTest extends AbstractIntegrationTest {
       offsets.put(new TopicPartition(TOPIC, i), 0L);
     }
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, offsets),
         100,
-        createMessagesProcessing(),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 

+ 0 - 4
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -3868,10 +3868,6 @@ components:
                   properties:
                     pollTimeoutMs:
                       type: integer
-                    partitionPollTimeout:
-                      type: integer
-                    noDataEmptyPolls:
-                      type: integer
                     maxPageSize:
                       type: integer
                     defaultPageSize: