iliax 1 year ago
parent
commit
fa316407b2

+ 2 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -8,7 +8,7 @@ import reactor.core.publisher.FluxSink;
 
 public abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private final MessagesProcessing messagesProcessing;
+  protected final MessagesProcessing messagesProcessing;
   protected final PollingSettings pollingSettings;
 
   protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
@@ -16,8 +16,7 @@ public abstract class AbstractEmitter implements java.util.function.Consumer<Flu
     this.pollingSettings = pollingSettings;
   }
 
-  protected PolledRecords poll(
-      FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
+  protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
     return poll(sink, consumer, pollingSettings.getPollTimeout());
   }
 
@@ -31,11 +30,6 @@ public abstract class AbstractEmitter implements java.util.function.Consumer<Flu
     return messagesProcessing.limitReached();
   }
 
-  protected void sendMessage(FluxSink<TopicMessageEventDTO> sink,
-                             ConsumerRecord<Bytes, Bytes> msg) {
-    messagesProcessing.sendMsg(sink, msg);
-  }
-
   protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
     messagesProcessing.sendPhase(sink, name);
   }

+ 111 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractPartitionsEmitter.java

@@ -0,0 +1,111 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.utils.Bytes;
+import reactor.core.publisher.FluxSink;
+
+@Slf4j
+public abstract class AbstractPartitionsEmitter extends AbstractEmitter {
+
+  private final Supplier<EnhancedConsumer> consumerSupplier;
+  protected final ConsumerPosition consumerPosition;
+  protected final int messagesPerPage;
+
+  public AbstractPartitionsEmitter(Supplier<EnhancedConsumer> consumerSupplier,
+                                   ConsumerPosition consumerPosition,
+                                   int messagesPerPage,
+                                   MessagesProcessing messagesProcessing,
+                                   PollingSettings pollingSettings) {
+    super(messagesProcessing, pollingSettings);
+    this.consumerPosition = consumerPosition;
+    this.messagesPerPage = messagesPerPage;
+    this.consumerSupplier = consumerSupplier;
+  }
+
+  // from inclusive, to exclusive
+  protected record FromToOffset(long from, long to) {
+  }
+
+  //should return empty map if polling should be stopped
+  protected abstract TreeMap<TopicPartition, FromToOffset> nexPollingRange(
+      EnhancedConsumer consumer,
+      TreeMap<TopicPartition, FromToOffset> prevRange, //empty on start
+      SeekOperations seekOperations
+  );
+
+  protected abstract Comparator<TopicMessageDTO> sortBeforeSend();
+
+  private void logReadRange(TreeMap<TopicPartition, FromToOffset> range) {
+    log.debug("Polling offsets range {}", range);
+  }
+
+  @Override
+  public void accept(FluxSink<TopicMessageEventDTO> sink) {
+    log.debug("Starting polling for {}", consumerPosition);
+    try (EnhancedConsumer consumer = consumerSupplier.get()) {
+      sendPhase(sink, "Consumer created");
+
+      var seekOperations = SeekOperations.create(consumer, consumerPosition);
+      TreeMap<TopicPartition, FromToOffset> readRange = nexPollingRange(consumer, new TreeMap<>(), seekOperations);
+      logReadRange(readRange);
+      while (!sink.isCancelled() && !readRange.isEmpty() && !sendLimitReached()) {
+        readRange.forEach((tp, fromTo) -> {
+          if (sink.isCancelled()) {
+            return; //fast return in case of sink cancellation
+          }
+          partitionPollIteration(tp, fromTo.from, fromTo.to, consumer, sink)
+              .forEach(messagesProcessing::buffer);
+        });
+        messagesProcessing.flush(sink, sortBeforeSend());
+        readRange = nexPollingRange(consumer, readRange, seekOperations);
+      }
+      if (sink.isCancelled()) {
+        log.debug("Polling finished due to sink cancellation");
+      }
+      sendFinishStatsAndCompleteSink(sink);
+      log.debug("Polling finished");
+    } catch (InterruptException kafkaInterruptException) {
+      log.debug("Polling finished due to thread interruption");
+      sink.complete();
+    } catch (Exception e) {
+      log.error("Error occurred while consuming records", e);
+      sink.error(e);
+    }
+  }
+
+  private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(TopicPartition tp,
+                                                                    long fromOffset, //inclusive
+                                                                    long toOffset, //exclusive
+                                                                    EnhancedConsumer consumer,
+                                                                    FluxSink<TopicMessageEventDTO> sink) {
+    consumer.assign(List.of(tp));
+    consumer.seek(tp, fromOffset);
+    sendPhase(sink, String.format("Polling partition: %s from offset %s", tp, fromOffset));
+
+    var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
+    while (!sink.isCancelled()
+        && !sendLimitReached()
+        && consumer.position(tp) < toOffset) {
+
+      var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
+      log.debug("{} records polled from {}", polledRecords.count(), tp);
+
+      polledRecords.records(tp).stream()
+          .filter(r -> r.offset() < toOffset)
+          .forEach(recordsToSend::add);
+    }
+    return recordsToSend;
+  }
+}

+ 54 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardPartitionsEmitterImpl.java

@@ -0,0 +1,54 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.TopicPartition;
+
+public class BackwardPartitionsEmitterImpl extends AbstractPartitionsEmitter {
+
+
+  public BackwardPartitionsEmitterImpl(Supplier<EnhancedConsumer> consumerSupplier,
+                                       ConsumerPosition consumerPosition,
+                                       int messagesPerPage,
+                                       MessagesProcessing messagesProcessing,
+                                       PollingSettings pollingSettings) {
+    super(consumerSupplier, consumerPosition, messagesPerPage, messagesProcessing, pollingSettings);
+  }
+
+  @Override
+  protected Comparator<TopicMessageDTO> sortBeforeSend() {
+    return (m1, m2) -> 0;
+  }
+
+  @Override
+  protected TreeMap<TopicPartition, FromToOffset> nexPollingRange(EnhancedConsumer consumer,
+                                                                  TreeMap<TopicPartition, FromToOffset> prevRange,
+                                                                  SeekOperations seekOperations) {
+
+    TreeMap<TopicPartition, Long> readToOffsets = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
+    if (prevRange.isEmpty()) {
+      readToOffsets.putAll(seekOperations.getOffsetsForSeek());
+    } else {
+      readToOffsets.putAll(
+          prevRange.entrySet()
+              .stream()
+              .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().from()))
+      );
+    }
+
+    int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readToOffsets.size());
+    TreeMap<TopicPartition, FromToOffset> result = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
+    readToOffsets.forEach((tp, toOffset) -> {
+      long tpStartOffset = seekOperations.getOffsetsForSeek().get(tp);
+      if (toOffset > tpStartOffset) {
+        result.put(tp, new FromToOffset(toOffset, Math.min(tpStartOffset, toOffset - msgsToPollPerPartition)));
+      }
+    });
+    return result;
+  }
+}

+ 48 - 48
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

@@ -22,13 +22,13 @@ public class BackwardRecordEmitter extends AbstractEmitter {
   private final ConsumerPosition consumerPosition;
   private final int messagesPerPage;
 
-  private final TimestampsSortedMessageProcessing messagesProcessing;
+  private final MessagesProcessing messagesProcessing;
 
   public BackwardRecordEmitter(
       Supplier<EnhancedConsumer> consumerSupplier,
       ConsumerPosition consumerPosition,
       int messagesPerPage,
-      TimestampsSortedMessageProcessing messagesProcessing,
+      MessagesProcessing messagesProcessing,
       PollingSettings pollingSettings) {
     super(messagesProcessing, pollingSettings);
     this.messagesProcessing = messagesProcessing;
@@ -39,52 +39,52 @@ public class BackwardRecordEmitter extends AbstractEmitter {
 
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
-    log.debug("Starting backward polling for {}", consumerPosition);
-    try (EnhancedConsumer consumer = consumerSupplier.get()) {
-      sendPhase(sink, "Created consumer");
-
-      var seekOperations = SeekOperations.create(consumer, consumerPosition);
-      var readUntilOffsets = new TreeMap<TopicPartition, Long>(Comparator.comparingInt(TopicPartition::partition));
-      readUntilOffsets.putAll(seekOperations.getOffsetsForSeek());
-
-      int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readUntilOffsets.size());
-      log.debug("'Until' offsets for polling: {}", readUntilOffsets);
-
-      while (!sink.isCancelled() && !readUntilOffsets.isEmpty() && !sendLimitReached()) {
-        new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> {
-          if (sink.isCancelled()) {
-            return; //fast return in case of sink cancellation
-          }
-          long beginOffset = seekOperations.getBeginOffsets().get(tp);
-          long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition);
-
-          partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink)
-              .forEach(r -> sendMessage(sink, r));
-
-          if (beginOffset == readFromOffset) {
-            // we fully read this partition -> removing it from polling iterations
-            readUntilOffsets.remove(tp);
-          } else {
-            // updating 'to' offset for next polling iteration
-            readUntilOffsets.put(tp, readFromOffset);
-          }
-        });
-        if (readUntilOffsets.isEmpty()) {
-          log.debug("begin reached after partitions poll iteration");
-        } else if (sink.isCancelled()) {
-          log.debug("sink is cancelled after partitions poll iteration");
-        }
-        messagesProcessing.flush(sink);
-      }
-      sendFinishStatsAndCompleteSink(sink);
-      log.debug("Polling finished");
-    } catch (InterruptException kafkaInterruptException) {
-      log.debug("Polling finished due to thread interruption");
-      sink.complete();
-    } catch (Exception e) {
-      log.error("Error occurred while consuming records", e);
-      sink.error(e);
-    }
+//    log.debug("Starting backward polling for {}", consumerPosition);
+//    try (EnhancedConsumer consumer = consumerSupplier.get()) {
+//      sendPhase(sink, "Created consumer");
+//
+//      var seekOperations = SeekOperations.create(consumer, consumerPosition);
+//      var readUntilOffsets = new TreeMap<TopicPartition, Long>(Comparator.comparingInt(TopicPartition::partition));
+//      readUntilOffsets.putAll(seekOperations.getOffsetsForSeek());
+//
+//      int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readUntilOffsets.size());
+//      log.debug("'Until' offsets for polling: {}", readUntilOffsets);
+//
+//      while (!sink.isCancelled() && !readUntilOffsets.isEmpty() && !sendLimitReached()) {
+//        new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> {
+//          if (sink.isCancelled()) {
+//            return; //fast return in case of sink cancellation
+//          }
+//          long beginOffset = seekOperations.getBeginOffsets().get(tp);
+//          long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition);
+//
+//          partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink)
+//              .forEach(r -> sendMessage(sink, r));
+//
+//          if (beginOffset == readFromOffset) {
+//            // we fully read this partition -> removing it from polling iterations
+//            readUntilOffsets.remove(tp);
+//          } else {
+//            // updating 'to' offset for next polling iteration
+//            readUntilOffsets.put(tp, readFromOffset);
+//          }
+//        });
+//        if (readUntilOffsets.isEmpty()) {
+//          log.debug("begin reached after partitions poll iteration");
+//        } else if (sink.isCancelled()) {
+//          log.debug("sink is cancelled after partitions poll iteration");
+//        }
+//        messagesProcessing.flush(sink);
+//      }
+//      sendFinishStatsAndCompleteSink(sink);
+//      log.debug("Polling finished");
+//    } catch (InterruptException kafkaInterruptException) {
+//      log.debug("Polling finished due to thread interruption");
+//      sink.complete();
+//    } catch (Exception e) {
+//      log.error("Error occurred while consuming records", e);
+//      sink.error(e);
+//    }
   }
 
   private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(

+ 38 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BufferingMessagesProcessing.java

@@ -0,0 +1,38 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.utils.Bytes;
+import reactor.core.publisher.FluxSink;
+
+
+public class BufferingMessagesProcessing extends MessagesProcessing {
+
+  private final List<TopicMessageDTO> buffer = new ArrayList<>();
+
+  private final Comparator<TopicMessageDTO> comparator;
+
+  public BufferingMessagesProcessing(ConsumerRecordDeserializer deserializer,
+                                     Predicate<TopicMessageDTO> filter,
+                                     Comparator<TopicMessageDTO> comparator,
+                                     @Nullable Integer limit) {
+    super(deserializer, filter, limit);
+    this.comparator = comparator;
+  }
+
+  void buffer(ConsumerRecord<Bytes, Bytes> rec) {
+    //TODO
+  }
+
+  void flush(FluxSink<TopicMessageEventDTO> sink) {
+    //TODO
+  }
+
+}

+ 58 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardPartitionsEmitterImpl.java

@@ -0,0 +1,58 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import lombok.Lombok;
+import org.apache.kafka.common.TopicPartition;
+
+public class ForwardPartitionsEmitterImpl extends AbstractPartitionsEmitter {
+
+  public ForwardPartitionsEmitterImpl(
+      Supplier<EnhancedConsumer> consumerSupplier,
+      ConsumerPosition consumerPosition,
+      int messagesPerPage,
+      MessagesProcessing messagesProcessing,
+      PollingSettings pollingSettings
+  ) {
+    super(consumerSupplier, consumerPosition, messagesPerPage, messagesProcessing, pollingSettings);
+  }
+
+  @Override
+  protected Comparator<TopicMessageDTO> sortBeforeSend() {
+    return (m1, m2) -> 0;
+  }
+
+  @Override
+  protected TreeMap<TopicPartition, FromToOffset> nexPollingRange(EnhancedConsumer consumer,
+                                                                  TreeMap<TopicPartition, FromToOffset> prevRange,
+                                                                  SeekOperations seekOperations) {
+    TreeMap<TopicPartition, Long> readFromOffsets = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
+    if (prevRange.isEmpty()) {
+      readFromOffsets.putAll(seekOperations.getOffsetsForSeek());
+    } else {
+      readFromOffsets.putAll(
+          prevRange.entrySet()
+              .stream()
+              .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().to()))
+      );
+    }
+
+    int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readFromOffsets.size());
+    TreeMap<TopicPartition, FromToOffset> result = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
+    readFromOffsets.forEach((tp, fromOffset) -> {
+      long tpEndOffset = seekOperations.getEndOffsets().get(tp);
+      if (fromOffset < tpEndOffset) {
+        result.put(tp, new FromToOffset(fromOffset, Math.min(tpEndOffset, fromOffset + msgsToPollPerPartition)));
+      }
+    });
+    return result;
+  }
+
+}

+ 3 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -52,9 +52,9 @@ public class ForwardRecordEmitter extends AbstractEmitter {
 
         log.debug("{} records polled", records.count());
 
-        for (ConsumerRecord<Bytes, Bytes> msg : records) {
-          sendMessage(sink, msg);
-        }
+//        for (ConsumerRecord<Bytes, Bytes> msg : records) {
+//          sendMessage(sink, msg);
+//        }
       }
       sendFinishStatsAndCompleteSink(sink);
       log.debug("Polling finished");

+ 32 - 13
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java

@@ -4,6 +4,9 @@ import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
 import java.util.function.Predicate;
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
@@ -14,13 +17,14 @@ import reactor.core.publisher.FluxSink;
 @Slf4j
 public class MessagesProcessing {
 
-  protected final ConsumingStats consumingStats = new ConsumingStats();
-  protected long sentMessages = 0;
-  protected int filterApplyErrors = 0;
+  private final ConsumingStats consumingStats = new ConsumingStats();
+  private final List<TopicMessageDTO> buffer = new ArrayList<>();
 
-  protected final ConsumerRecordDeserializer deserializer;
-  protected final Predicate<TopicMessageDTO> filter;
-  protected final @Nullable Integer limit;
+  private long sentMessages = 0;
+  private int filterApplyErrors = 0;
+  private final ConsumerRecordDeserializer deserializer;
+  private final Predicate<TopicMessageDTO> filter;
+  private final @Nullable Integer limit;
 
   public MessagesProcessing(ConsumerRecordDeserializer deserializer,
                             Predicate<TopicMessageDTO> filter,
@@ -34,16 +38,12 @@ public class MessagesProcessing {
     return limit != null && sentMessages >= limit;
   }
 
-  void sendMsg(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
-    if (!sink.isCancelled() && !limitReached()) {
+  void buffer(ConsumerRecord<Bytes, Bytes> rec) {
+    if (!limitReached()) {
       TopicMessageDTO topicMessage = deserializer.deserialize(rec);
       try {
         if (filter.test(topicMessage)) {
-          sink.next(
-              new TopicMessageEventDTO()
-                  .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
-                  .message(topicMessage)
-          );
+          buffer.add(topicMessage);
           sentMessages++;
         }
       } catch (Exception e) {
@@ -53,6 +53,25 @@ public class MessagesProcessing {
     }
   }
 
+  void flush(FluxSink<TopicMessageEventDTO> sink, Comparator<TopicMessageDTO> sortBeforeSend) {
+    while (!sink.isCancelled()) {
+      buffer.sort(sortBeforeSend);
+      for (TopicMessageDTO topicMessage : buffer) {
+        sink.next(
+            new TopicMessageEventDTO()
+                .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
+                .message(topicMessage)
+        );
+      }
+    }
+    buffer.clear();
+  }
+
+  void sendWithoutBuffer(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
+    buffer(rec);
+    flush(sink, (m1, m2) -> 0);
+  }
+
   void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
     if (!sink.isCancelled()) {
       consumingStats.sendConsumingEvt(sink, polledRecords, filterApplyErrors);

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -31,7 +31,7 @@ public class TailingEmitter extends AbstractEmitter {
       while (!sink.isCancelled()) {
         sendPhase(sink, "Polling");
         var polled = poll(sink, consumer);
-        polled.forEach(r -> sendMessage(sink, r));
+        polled.forEach(r -> messagesProcessing.sendWithoutBuffer(sink, r));
       }
       sink.complete();
       log.debug("Tailing finished");

+ 0 - 71
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TimestampsSortedMessageProcessing.java

@@ -1,71 +0,0 @@
-package com.provectus.kafka.ui.emitter;
-
-import com.provectus.kafka.ui.model.TopicMessageDTO;
-import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.function.Predicate;
-import java.util.stream.Stream;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.utils.Bytes;
-import org.jetbrains.annotations.Nullable;
-import reactor.core.publisher.FluxSink;
-
-@Slf4j
-public class TimestampsSortedMessageProcessing extends MessagesProcessing {
-
-  private final List<TopicMessageDTO> buffer = new ArrayList<>();
-
-  public TimestampsSortedMessageProcessing(ConsumerRecordDeserializer deserializer,
-                                           Predicate<TopicMessageDTO> filter,
-                                           @Nullable Integer limit) {
-    super(deserializer, filter, limit);
-  }
-
-  @Override
-  void sendMsg(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
-    if (!sink.isCancelled() && !limitReached()) {
-      TopicMessageDTO topicMessage = deserializer.deserialize(rec);
-      try {
-        if (filter.test(topicMessage)) {
-          buffer.add(topicMessage);
-          sentMessages++;
-        }
-      } catch (Exception e) {
-        filterApplyErrors++;
-        log.trace("Error applying filter for message {}", topicMessage);
-      }
-    }
-  }
-
-  @Override
-  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
-    flush(sink);
-    super.sendFinishEvent(sink);
-  }
-
-  void flush(FluxSink<TopicMessageEventDTO> sink) {
-    sorted(buffer)
-        .forEach(topicMessage ->
-            sink.next(
-                new TopicMessageEventDTO()
-                    .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
-                    .message(topicMessage)));
-    buffer.clear();
-  }
-
-  static Stream<TopicMessageDTO> sorted(List<TopicMessageDTO> messages) {
-    return messages.stream()
-        .sorted(Comparator.comparingLong(TopicMessageDTO::getOffset).reversed())
-        .sorted(Comparator.comparingInt(TopicMessageDTO::getPartition))
-        .sorted((m1, m2) -> {
-          if (m1.getPartition().equals(m2.getPartition())) {
-            return 0; //sorting is stable, so it will just keep messages in same order
-          }
-          return -m1.getTimestamp().compareTo(m2.getTimestamp());
-        });
-  }
-}

+ 6 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -3,12 +3,11 @@ package com.provectus.kafka.ui.service;
 import com.google.common.util.concurrent.RateLimiter;
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.emitter.AbstractEmitter;
-import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
-import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.BackwardPartitionsEmitterImpl;
+import com.provectus.kafka.ui.emitter.ForwardPartitionsEmitterImpl;
 import com.provectus.kafka.ui.emitter.MessageFilters;
 import com.provectus.kafka.ui.emitter.MessagesProcessing;
 import com.provectus.kafka.ui.emitter.TailingEmitter;
-import com.provectus.kafka.ui.emitter.TimestampsSortedMessageProcessing;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
@@ -47,7 +46,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -236,17 +234,18 @@ public class MessagesService {
     var deserializer = deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
     var filter = getMsgFilter(query, filterQueryType);
     AbstractEmitter emitter = switch (seekDirection) {
-      case FORWARD -> new ForwardRecordEmitter(
+      case FORWARD -> new ForwardPartitionsEmitterImpl(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
+          limit,
           new MessagesProcessing(deserializer, filter, limit),
           cluster.getPollingSettings()
       );
-      case BACKWARD -> new BackwardRecordEmitter(
+      case BACKWARD -> new BackwardPartitionsEmitterImpl(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
           limit,
-          new TimestampsSortedMessageProcessing(deserializer, filter, limit),
+          new MessagesProcessing(deserializer, filter, limit),
           cluster.getPollingSettings()
       );
       case TAILING -> new TailingEmitter(