iliax před 1 rokem
rodič
revize
f985f0e360

+ 21 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -1,15 +1,17 @@
 package com.provectus.kafka.ui.emitter;
 
+import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.time.Duration;
+import java.util.Comparator;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 public abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  protected final MessagesProcessing messagesProcessing;
-  protected final PollingSettings pollingSettings;
+  private final MessagesProcessing messagesProcessing;
+  private final PollingSettings pollingSettings;
 
   protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
     this.messagesProcessing = messagesProcessing;
@@ -20,12 +22,28 @@ public abstract class AbstractEmitter implements java.util.function.Consumer<Flu
     return poll(sink, consumer, pollingSettings.getPollTimeout());
   }
 
-  protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
+  protected PolledRecords pollSinglePartition(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
+    return poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
+  }
+
+  private PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
     var records = consumer.pollEnhanced(timeout);
     sendConsuming(sink, records);
     return records;
   }
 
+  protected void buffer(ConsumerRecord<Bytes, Bytes> rec) {
+    messagesProcessing.buffer(rec);
+  }
+
+  protected void flushBuffer(FluxSink<TopicMessageEventDTO> sink) {
+    messagesProcessing.flush(sink);
+  }
+
+  protected void sendWithoutBuffer(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
+    messagesProcessing.sendWithoutBuffer(sink, rec);
+  }
+
   protected boolean sendLimitReached() {
     return messagesProcessing.limitReached();
   }

+ 26 - 20
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardPartitionsEmitterImpl.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardEmitter.java

@@ -2,34 +2,40 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.kafka.common.TopicPartition;
 
-public class BackwardPartitionsEmitterImpl extends AbstractPartitionsEmitter {
-
-
-  public BackwardPartitionsEmitterImpl(Supplier<EnhancedConsumer> consumerSupplier,
-                                       ConsumerPosition consumerPosition,
-                                       int messagesPerPage,
-                                       MessagesProcessing messagesProcessing,
-                                       PollingSettings pollingSettings) {
-    super(consumerSupplier, consumerPosition, messagesPerPage, messagesProcessing, pollingSettings);
-  }
-
-  @Override
-  protected Comparator<TopicMessageDTO> sortBeforeSend() {
-    return (m1, m2) -> 0;
+public class BackwardEmitter extends PerPartitionEmitter {
+
+  public BackwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
+                         ConsumerPosition consumerPosition,
+                         int messagesPerPage,
+                         ConsumerRecordDeserializer deserializer,
+                         Predicate<TopicMessageDTO> filter,
+                         PollingSettings pollingSettings) {
+    super(
+        consumerSupplier,
+        consumerPosition,
+        messagesPerPage,
+        new MessagesProcessing(
+            deserializer,
+            filter,
+            false,
+            messagesPerPage
+        ),
+        pollingSettings
+    );
   }
 
   @Override
-  protected TreeMap<TopicPartition, FromToOffset> nexPollingRange(EnhancedConsumer consumer,
-                                                                  TreeMap<TopicPartition, FromToOffset> prevRange,
-                                                                  SeekOperations seekOperations) {
-
+  protected TreeMap<TopicPartition, FromToOffset> nextPollingRange(TreeMap<TopicPartition, FromToOffset> prevRange,
+                                                                   SeekOperations seekOperations) {
     TreeMap<TopicPartition, Long> readToOffsets = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
     if (prevRange.isEmpty()) {
       readToOffsets.putAll(seekOperations.getOffsetsForSeek());
@@ -44,9 +50,9 @@ public class BackwardPartitionsEmitterImpl extends AbstractPartitionsEmitter {
     int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readToOffsets.size());
     TreeMap<TopicPartition, FromToOffset> result = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
     readToOffsets.forEach((tp, toOffset) -> {
-      long tpStartOffset = seekOperations.getOffsetsForSeek().get(tp);
+      long tpStartOffset = seekOperations.getBeginOffsets().get(tp);
       if (toOffset > tpStartOffset) {
-        result.put(tp, new FromToOffset(toOffset, Math.min(tpStartOffset, toOffset - msgsToPollPerPartition)));
+        result.put(tp, new FromToOffset(Math.max(tpStartOffset, toOffset - msgsToPollPerPartition), toOffset));
       }
     });
     return result;

+ 0 - 118
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

@@ -1,118 +0,0 @@
-package com.provectus.kafka.ui.emitter;
-
-import com.provectus.kafka.ui.model.ConsumerPosition;
-import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.TreeMap;
-import java.util.function.Supplier;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.utils.Bytes;
-import reactor.core.publisher.FluxSink;
-
-@Slf4j
-public class BackwardRecordEmitter extends AbstractEmitter {
-
-  private final Supplier<EnhancedConsumer> consumerSupplier;
-  private final ConsumerPosition consumerPosition;
-  private final int messagesPerPage;
-
-  private final MessagesProcessing messagesProcessing;
-
-  public BackwardRecordEmitter(
-      Supplier<EnhancedConsumer> consumerSupplier,
-      ConsumerPosition consumerPosition,
-      int messagesPerPage,
-      MessagesProcessing messagesProcessing,
-      PollingSettings pollingSettings) {
-    super(messagesProcessing, pollingSettings);
-    this.messagesProcessing = messagesProcessing;
-    this.consumerPosition = consumerPosition;
-    this.messagesPerPage = messagesPerPage;
-    this.consumerSupplier = consumerSupplier;
-  }
-
-  @Override
-  public void accept(FluxSink<TopicMessageEventDTO> sink) {
-//    log.debug("Starting backward polling for {}", consumerPosition);
-//    try (EnhancedConsumer consumer = consumerSupplier.get()) {
-//      sendPhase(sink, "Created consumer");
-//
-//      var seekOperations = SeekOperations.create(consumer, consumerPosition);
-//      var readUntilOffsets = new TreeMap<TopicPartition, Long>(Comparator.comparingInt(TopicPartition::partition));
-//      readUntilOffsets.putAll(seekOperations.getOffsetsForSeek());
-//
-//      int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readUntilOffsets.size());
-//      log.debug("'Until' offsets for polling: {}", readUntilOffsets);
-//
-//      while (!sink.isCancelled() && !readUntilOffsets.isEmpty() && !sendLimitReached()) {
-//        new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> {
-//          if (sink.isCancelled()) {
-//            return; //fast return in case of sink cancellation
-//          }
-//          long beginOffset = seekOperations.getBeginOffsets().get(tp);
-//          long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition);
-//
-//          partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink)
-//              .forEach(r -> sendMessage(sink, r));
-//
-//          if (beginOffset == readFromOffset) {
-//            // we fully read this partition -> removing it from polling iterations
-//            readUntilOffsets.remove(tp);
-//          } else {
-//            // updating 'to' offset for next polling iteration
-//            readUntilOffsets.put(tp, readFromOffset);
-//          }
-//        });
-//        if (readUntilOffsets.isEmpty()) {
-//          log.debug("begin reached after partitions poll iteration");
-//        } else if (sink.isCancelled()) {
-//          log.debug("sink is cancelled after partitions poll iteration");
-//        }
-//        messagesProcessing.flush(sink);
-//      }
-//      sendFinishStatsAndCompleteSink(sink);
-//      log.debug("Polling finished");
-//    } catch (InterruptException kafkaInterruptException) {
-//      log.debug("Polling finished due to thread interruption");
-//      sink.complete();
-//    } catch (Exception e) {
-//      log.error("Error occurred while consuming records", e);
-//      sink.error(e);
-//    }
-  }
-
-  private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(
-      TopicPartition tp,
-      long fromOffset, //inclusive
-      long toOffset, //exclusive
-      EnhancedConsumer consumer,
-      FluxSink<TopicMessageEventDTO> sink
-  ) {
-    consumer.assign(Collections.singleton(tp));
-    consumer.seek(tp, fromOffset);
-    sendPhase(sink, String.format("Polling partition: %s from offset %s", tp, fromOffset));
-
-    var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
-    while (!sink.isCancelled()
-        && !sendLimitReached()
-        && consumer.position(tp) < toOffset) {
-
-      var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
-      log.debug("{} records polled from {}", polledRecords.count(), tp);
-
-      var filteredRecords = polledRecords.records(tp).stream()
-          .filter(r -> r.offset() < toOffset)
-          .toList();
-
-      recordsToSend.addAll(filteredRecords);
-    }
-    log.debug("{} records to send", recordsToSend.size());
-    return recordsToSend;
-  }
-}

+ 0 - 38
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BufferingMessagesProcessing.java

@@ -1,38 +0,0 @@
-package com.provectus.kafka.ui.emitter;
-
-import com.provectus.kafka.ui.model.TopicMessageDTO;
-import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.function.Predicate;
-import javax.annotation.Nullable;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.utils.Bytes;
-import reactor.core.publisher.FluxSink;
-
-
-public class BufferingMessagesProcessing extends MessagesProcessing {
-
-  private final List<TopicMessageDTO> buffer = new ArrayList<>();
-
-  private final Comparator<TopicMessageDTO> comparator;
-
-  public BufferingMessagesProcessing(ConsumerRecordDeserializer deserializer,
-                                     Predicate<TopicMessageDTO> filter,
-                                     Comparator<TopicMessageDTO> comparator,
-                                     @Nullable Integer limit) {
-    super(deserializer, filter, limit);
-    this.comparator = comparator;
-  }
-
-  void buffer(ConsumerRecord<Bytes, Bytes> rec) {
-    //TODO
-  }
-
-  void flush(FluxSink<TopicMessageEventDTO> sink) {
-    //TODO
-  }
-
-}

+ 23 - 20
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardPartitionsEmitterImpl.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardEmitter.java

@@ -2,37 +2,40 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
-import java.util.ArrayList;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.util.Comparator;
-import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import lombok.Lombok;
 import org.apache.kafka.common.TopicPartition;
 
-public class ForwardPartitionsEmitterImpl extends AbstractPartitionsEmitter {
+public class ForwardEmitter extends PerPartitionEmitter {
 
-  public ForwardPartitionsEmitterImpl(
-      Supplier<EnhancedConsumer> consumerSupplier,
-      ConsumerPosition consumerPosition,
-      int messagesPerPage,
-      MessagesProcessing messagesProcessing,
-      PollingSettings pollingSettings
-  ) {
-    super(consumerSupplier, consumerPosition, messagesPerPage, messagesProcessing, pollingSettings);
+  public ForwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
+                        ConsumerPosition consumerPosition,
+                        int messagesPerPage,
+                        ConsumerRecordDeserializer deserializer,
+                        Predicate<TopicMessageDTO> filter,
+                        PollingSettings pollingSettings) {
+    super(
+        consumerSupplier,
+        consumerPosition,
+        messagesPerPage,
+        new MessagesProcessing(
+            deserializer,
+            filter,
+            true,
+            messagesPerPage
+        ),
+        pollingSettings
+    );
   }
 
   @Override
-  protected Comparator<TopicMessageDTO> sortBeforeSend() {
-    return (m1, m2) -> 0;
-  }
-
-  @Override
-  protected TreeMap<TopicPartition, FromToOffset> nexPollingRange(EnhancedConsumer consumer,
-                                                                  TreeMap<TopicPartition, FromToOffset> prevRange,
-                                                                  SeekOperations seekOperations) {
+  protected TreeMap<TopicPartition, FromToOffset> nextPollingRange(TreeMap<TopicPartition, FromToOffset> prevRange,
+                                                                   SeekOperations seekOperations) {
     TreeMap<TopicPartition, Long> readFromOffsets = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
     if (prevRange.isEmpty()) {
       readFromOffsets.putAll(seekOperations.getOffsetsForSeek());

+ 0 - 107
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -1,107 +0,0 @@
-package com.provectus.kafka.ui.emitter;
-
-import com.provectus.kafka.ui.model.ConsumerPosition;
-import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.util.SslPropertiesUtil;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-import java.util.function.Supplier;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
-import reactor.core.publisher.FluxSink;
-
-@Slf4j
-public class ForwardRecordEmitter extends AbstractEmitter {
-
-  private final Supplier<EnhancedConsumer> consumerSupplier;
-  private final ConsumerPosition position;
-
-  public ForwardRecordEmitter(
-      Supplier<EnhancedConsumer> consumerSupplier,
-      ConsumerPosition position,
-      MessagesProcessing messagesProcessing,
-      PollingSettings pollingSettings) {
-    super(messagesProcessing, pollingSettings);
-    this.position = position;
-    this.consumerSupplier = consumerSupplier;
-  }
-
-  @Override
-  public void accept(FluxSink<TopicMessageEventDTO> sink) {
-    log.debug("Starting forward polling for {}", position);
-    try (EnhancedConsumer consumer = consumerSupplier.get()) {
-      sendPhase(sink, "Assigning partitions");
-      var seekOperations = SeekOperations.create(consumer, position);
-      seekOperations.assignAndSeekNonEmptyPartitions();
-
-      while (!sink.isCancelled()
-          && !sendLimitReached()
-          && !seekOperations.assignedPartitionsFullyPolled()) {
-        sendPhase(sink, "Polling");
-        var records = poll(sink, consumer);
-
-        log.debug("{} records polled", records.count());
-
-//        for (ConsumerRecord<Bytes, Bytes> msg : records) {
-//          sendMessage(sink, msg);
-//        }
-      }
-      sendFinishStatsAndCompleteSink(sink);
-      log.debug("Polling finished");
-    } catch (InterruptException kafkaInterruptException) {
-      log.debug("Polling finished due to thread interruption");
-      sink.complete();
-    } catch (Exception e) {
-      log.error("Error occurred while consuming records", e);
-      sink.error(e);
-    }
-  }
-
-  //  @SneakyThrows
-  //  public static void main(String[] args) {
-  //    String topic = "test2tx";
-  //
-  //    Properties properties = new Properties();
-  //    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-  //    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-  //    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-  //    properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
-  //
-  //    try (var producer = new KafkaProducer<>(properties)) {
-  //      producer.initTransactions();
-  //
-  //      for (int i = 0; i < 5; i++) {
-  //        producer.beginTransaction();
-  //        for (int j = 0; j < 300; j++) {
-  //          producer.send(new ProducerRecord<>(topic, (i + 1) + "", "j=" + j + "-" + RandomStringUtils.random(5)))
-  //              .get();
-  //        }
-  //        producer.abortTransaction();
-  //
-  //        producer.beginTransaction();
-  //        producer.send(new ProducerRecord<>(topic, (i + 1) + "", "VISIBLE" + "-" + RandomStringUtils.random(5)))
-  //            .get();
-  //        producer.commitTransaction();
-  //
-  //        producer.beginTransaction();
-  //        for (int j = 0; j < 300; j++) {
-  //          producer.send(
-  //          new ProducerRecord<>(topic, ((i * 10) + 1) + "", "j=" + j + "-" + RandomStringUtils.random(5)))
-  //              .get();
-  //        }
-  //        producer.abortTransaction();
-  //      }
-  //    }
-  //  }
-
-}

+ 52 - 22
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java

@@ -1,39 +1,42 @@
 package com.provectus.kafka.ui.emitter;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import javax.annotation.Nullable;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
-public class MessagesProcessing {
+@RequiredArgsConstructor
+class MessagesProcessing {
 
   private final ConsumingStats consumingStats = new ConsumingStats();
   private final List<TopicMessageDTO> buffer = new ArrayList<>();
-
   private long sentMessages = 0;
   private int filterApplyErrors = 0;
+
   private final ConsumerRecordDeserializer deserializer;
   private final Predicate<TopicMessageDTO> filter;
+  private final boolean ascendingSortBeforeSend;
   private final @Nullable Integer limit;
 
-  public MessagesProcessing(ConsumerRecordDeserializer deserializer,
-                            Predicate<TopicMessageDTO> filter,
-                            @Nullable Integer limit) {
-    this.deserializer = deserializer;
-    this.filter = filter;
-    this.limit = limit;
-  }
-
   boolean limitReached() {
     return limit != null && sentMessages >= limit;
   }
@@ -53,23 +56,50 @@ public class MessagesProcessing {
     }
   }
 
-  void flush(FluxSink<TopicMessageEventDTO> sink, Comparator<TopicMessageDTO> sortBeforeSend) {
-    while (!sink.isCancelled()) {
-      buffer.sort(sortBeforeSend);
-      for (TopicMessageDTO topicMessage : buffer) {
-        sink.next(
-            new TopicMessageEventDTO()
-                .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
-                .message(topicMessage)
-        );
-      }
-    }
+  @VisibleForTesting
+  static Stream<TopicMessageDTO> sorted(List<TopicMessageDTO> buffer, boolean asc) {
+    Comparator<TopicMessageDTO> offsetComparator = asc
+        ? Comparator.comparingLong(TopicMessageDTO::getOffset)
+        : Comparator.comparingLong(TopicMessageDTO::getOffset).reversed();
+
+    Comparator<TopicMessageDTO> tsComparator = asc
+        ? Comparator.comparing(TopicMessageDTO::getTimestamp)
+        : Comparator.comparing(TopicMessageDTO::getTimestamp).reversed();
+
+    TreeMap<Integer, List<TopicMessageDTO>> perPartition = buffer.stream()
+        .collect(
+            Collectors.groupingBy(
+                TopicMessageDTO::getPartition,
+                TreeMap::new,
+                Collectors.collectingAndThen(
+                    Collectors.toList(),
+                    lst -> lst.stream().sorted(offsetComparator).toList())));
+
+    return Streams.stream(
+        Iterables.mergeSorted(
+            perPartition.values(),
+            tsComparator
+        )
+    );
+  }
+
+  void flush(FluxSink<TopicMessageEventDTO> sink) {
+    sorted(buffer, ascendingSortBeforeSend)
+        .forEach(topicMessage -> {
+          if (!sink.isCancelled()) {
+            sink.next(
+                new TopicMessageEventDTO()
+                    .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
+                    .message(topicMessage)
+            );
+          }
+        });
     buffer.clear();
   }
 
   void sendWithoutBuffer(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
     buffer(rec);
-    flush(sink, (m1, m2) -> 0);
+    flush(sink);
   }
 
   void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {

+ 15 - 24
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractPartitionsEmitter.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PerPartitionEmitter.java

@@ -4,8 +4,6 @@ import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.TreeMap;
 import java.util.function.Supplier;
@@ -17,17 +15,17 @@ import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
-public abstract class AbstractPartitionsEmitter extends AbstractEmitter {
+public abstract class PerPartitionEmitter extends AbstractEmitter {
 
   private final Supplier<EnhancedConsumer> consumerSupplier;
   protected final ConsumerPosition consumerPosition;
   protected final int messagesPerPage;
 
-  public AbstractPartitionsEmitter(Supplier<EnhancedConsumer> consumerSupplier,
-                                   ConsumerPosition consumerPosition,
-                                   int messagesPerPage,
-                                   MessagesProcessing messagesProcessing,
-                                   PollingSettings pollingSettings) {
+  protected PerPartitionEmitter(Supplier<EnhancedConsumer> consumerSupplier,
+                                ConsumerPosition consumerPosition,
+                                int messagesPerPage,
+                                MessagesProcessing messagesProcessing,
+                                PollingSettings pollingSettings) {
     super(messagesProcessing, pollingSettings);
     this.consumerPosition = consumerPosition;
     this.messagesPerPage = messagesPerPage;
@@ -35,41 +33,34 @@ public abstract class AbstractPartitionsEmitter extends AbstractEmitter {
   }
 
   // from inclusive, to exclusive
-  protected record FromToOffset(long from, long to) {
+  protected record FromToOffset(/*inclusive*/ long from, /*exclusive*/ long to) {
   }
 
   //should return empty map if polling should be stopped
-  protected abstract TreeMap<TopicPartition, FromToOffset> nexPollingRange(
-      EnhancedConsumer consumer,
+  protected abstract TreeMap<TopicPartition, FromToOffset> nextPollingRange(
       TreeMap<TopicPartition, FromToOffset> prevRange, //empty on start
       SeekOperations seekOperations
   );
 
-  protected abstract Comparator<TopicMessageDTO> sortBeforeSend();
-
-  private void logReadRange(TreeMap<TopicPartition, FromToOffset> range) {
-    log.debug("Polling offsets range {}", range);
-  }
-
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
     log.debug("Starting polling for {}", consumerPosition);
     try (EnhancedConsumer consumer = consumerSupplier.get()) {
       sendPhase(sink, "Consumer created");
-
       var seekOperations = SeekOperations.create(consumer, consumerPosition);
-      TreeMap<TopicPartition, FromToOffset> readRange = nexPollingRange(consumer, new TreeMap<>(), seekOperations);
-      logReadRange(readRange);
+      TreeMap<TopicPartition, FromToOffset> readRange = nextPollingRange(new TreeMap<>(), seekOperations);
+      log.debug("Starting from offsets {}", readRange);
+
       while (!sink.isCancelled() && !readRange.isEmpty() && !sendLimitReached()) {
         readRange.forEach((tp, fromTo) -> {
           if (sink.isCancelled()) {
             return; //fast return in case of sink cancellation
           }
           partitionPollIteration(tp, fromTo.from, fromTo.to, consumer, sink)
-              .forEach(messagesProcessing::buffer);
+              .forEach(this::buffer);
         });
-        messagesProcessing.flush(sink, sortBeforeSend());
-        readRange = nexPollingRange(consumer, readRange, seekOperations);
+        flushBuffer(sink);
+        readRange = nextPollingRange(readRange, seekOperations);
       }
       if (sink.isCancelled()) {
         log.debug("Polling finished due to sink cancellation");
@@ -99,7 +90,7 @@ public abstract class AbstractPartitionsEmitter extends AbstractEmitter {
         && !sendLimitReached()
         && consumer.position(tp) < toOffset) {
 
-      var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
+      var polledRecords = pollSinglePartition(sink, consumer);
       log.debug("{} records polled from {}", polledRecords.count(), tp);
 
       polledRecords.records(tp).stream()

+ 7 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -1,8 +1,11 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.util.HashMap;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.errors.InterruptException;
@@ -16,9 +19,10 @@ public class TailingEmitter extends AbstractEmitter {
 
   public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
                         ConsumerPosition consumerPosition,
-                        MessagesProcessing messagesProcessing,
+                        ConsumerRecordDeserializer deserializer,
+                        Predicate<TopicMessageDTO> filter,
                         PollingSettings pollingSettings) {
-    super(messagesProcessing, pollingSettings);
+    super(new MessagesProcessing(deserializer, filter, true, null), pollingSettings);
     this.consumerSupplier = consumerSupplier;
     this.consumerPosition = consumerPosition;
   }
@@ -31,7 +35,7 @@ public class TailingEmitter extends AbstractEmitter {
       while (!sink.isCancelled()) {
         sendPhase(sink, "Polling");
         var polled = poll(sink, consumer);
-        polled.forEach(r -> messagesProcessing.sendWithoutBuffer(sink, r));
+        polled.forEach(r -> sendWithoutBuffer(sink, r));
       }
       sink.complete();
       log.debug("Tailing finished");

+ 7 - 16
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -3,10 +3,9 @@ package com.provectus.kafka.ui.service;
 import com.google.common.util.concurrent.RateLimiter;
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.emitter.AbstractEmitter;
-import com.provectus.kafka.ui.emitter.BackwardPartitionsEmitterImpl;
-import com.provectus.kafka.ui.emitter.ForwardPartitionsEmitterImpl;
+import com.provectus.kafka.ui.emitter.BackwardEmitter;
+import com.provectus.kafka.ui.emitter.ForwardEmitter;
 import com.provectus.kafka.ui.emitter.MessageFilters;
-import com.provectus.kafka.ui.emitter.MessagesProcessing;
 import com.provectus.kafka.ui.emitter.TailingEmitter;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
@@ -234,25 +233,17 @@ public class MessagesService {
     var deserializer = deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
     var filter = getMsgFilter(query, filterQueryType);
     AbstractEmitter emitter = switch (seekDirection) {
-      case FORWARD -> new ForwardPartitionsEmitterImpl(
+      case FORWARD -> new ForwardEmitter(
           () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition,
-          limit,
-          new MessagesProcessing(deserializer, filter, limit),
-          cluster.getPollingSettings()
+          consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()
       );
-      case BACKWARD -> new BackwardPartitionsEmitterImpl(
+      case BACKWARD -> new BackwardEmitter(
           () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition,
-          limit,
-          new MessagesProcessing(deserializer, filter, limit),
-          cluster.getPollingSettings()
+          consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()
       );
       case TAILING -> new TailingEmitter(
           () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition,
-          new MessagesProcessing(deserializer, filter, null),
-          cluster.getPollingSettings()
+          consumerPosition, deserializer, filter, cluster.getPollingSettings()
       );
     };
     return Flux.create(emitter)

+ 57 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessagesProcessingTest.java

@@ -0,0 +1,57 @@
+package com.provectus.kafka.ui.emitter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.junit.jupiter.api.RepeatedTest;
+
+class MessagesProcessingTest {
+
+  @RepeatedTest(5)
+  void testSortingAsc() {
+    var messagesInOrder = List.of(
+        new TopicMessageDTO(1, 100L, OffsetDateTime.parse("1999-01-01T00:00:00+00:00")),
+        new TopicMessageDTO(0, 0L, OffsetDateTime.parse("2000-01-01T00:00:00+00:00")),
+        new TopicMessageDTO(1, 200L, OffsetDateTime.parse("2000-01-05T00:00:00+00:00")),
+        new TopicMessageDTO(0, 10L, OffsetDateTime.parse("2000-01-10T00:00:00+00:00")),
+        new TopicMessageDTO(0, 20L, OffsetDateTime.parse("2000-01-20T00:00:00+00:00")),
+        new TopicMessageDTO(1, 300L, OffsetDateTime.parse("3000-01-01T00:00:00+00:00")),
+        new TopicMessageDTO(2, 1000L, OffsetDateTime.parse("4000-01-01T00:00:00+00:00")),
+        new TopicMessageDTO(2, 1001L, OffsetDateTime.parse("2000-01-01T00:00:00+00:00")),
+        new TopicMessageDTO(2, 1003L, OffsetDateTime.parse("3000-01-01T00:00:00+00:00"))
+    );
+
+    var shuffled = new ArrayList<>(messagesInOrder);
+    Collections.shuffle(shuffled);
+
+    var sortedList = MessagesProcessing.sorted(shuffled, true).toList();
+    assertThat(sortedList).containsExactlyElementsOf(messagesInOrder);
+  }
+
+
+  @RepeatedTest(5)
+  void testSortingDesc() {
+    var messagesInOrder = List.of(
+        new TopicMessageDTO(1, 300L, OffsetDateTime.parse("3000-01-01T00:00:00+00:00")),
+        new TopicMessageDTO(2, 1003L, OffsetDateTime.parse("3000-01-01T00:00:00+00:00")),
+        new TopicMessageDTO(0, 20L, OffsetDateTime.parse("2000-01-20T00:00:00+00:00")),
+        new TopicMessageDTO(0, 10L, OffsetDateTime.parse("2000-01-10T00:00:00+00:00")),
+        new TopicMessageDTO(1, 200L, OffsetDateTime.parse("2000-01-05T00:00:00+00:00")),
+        new TopicMessageDTO(0, 0L, OffsetDateTime.parse("2000-01-01T00:00:00+00:00")),
+        new TopicMessageDTO(2, 1001L, OffsetDateTime.parse("2000-01-01T00:00:00+00:00")),
+        new TopicMessageDTO(2, 1000L, OffsetDateTime.parse("4000-01-01T00:00:00+00:00")),
+        new TopicMessageDTO(1, 100L, OffsetDateTime.parse("1999-01-01T00:00:00+00:00"))
+    );
+
+    var shuffled = new ArrayList<>(messagesInOrder);
+    Collections.shuffle(shuffled);
+
+    var sortedList = MessagesProcessing.sorted(shuffled, false).toList();
+    assertThat(sortedList).containsExactlyElementsOf(messagesInOrder);
+  }
+
+}

+ 39 - 30
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -7,14 +7,13 @@ import static com.provectus.kafka.ui.model.SeekTypeDTO.TIMESTAMP;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
-import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.BackwardEmitter;
 import com.provectus.kafka.ui.emitter.EnhancedConsumer;
-import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
-import com.provectus.kafka.ui.emitter.MessagesProcessing;
+import com.provectus.kafka.ui.emitter.ForwardEmitter;
 import com.provectus.kafka.ui.emitter.PollingSettings;
 import com.provectus.kafka.ui.emitter.PollingThrottler;
-import com.provectus.kafka.ui.emitter.TimestampsSortedMessageProcessing;
 import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
 import com.provectus.kafka.ui.serde.api.Serde;
@@ -32,16 +31,15 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.internals.RecordHeader;
-import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -59,6 +57,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
   static final String EMPTY_TOPIC = TOPIC + "_empty";
   static final List<Record> SENT_RECORDS = new ArrayList<>();
   static final ConsumerRecordDeserializer RECORD_DESERIALIZER = createRecordsDeserializer();
+  static final Predicate<TopicMessageDTO> NOOP_FILTER = m -> true;
 
   @BeforeAll
   static void generateMsgs() throws Exception {
@@ -110,24 +109,23 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     );
   }
 
-  private MessagesProcessing createMessagesProcessing() {
-    return new MessagesProcessing(RECORD_DESERIALIZER, msg -> true, null);
-  }
-
   @Test
   void pollNothingOnEmptyTopic() {
-    var forwardEmitter = new ForwardRecordEmitter(
+    var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
         new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
-        createMessagesProcessing(),
+        100,
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
         100,
-        new TimestampsSortedMessageProcessing(RECORD_DESERIALIZER, msg -> true, null),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
@@ -146,18 +144,21 @@ class RecordEmitterTest extends AbstractIntegrationTest {
 
   @Test
   void pollFullTopicFromBeginning() {
-    var forwardEmitter = new ForwardRecordEmitter(
+    var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
         new ConsumerPosition(BEGINNING, TOPIC, null),
-        createMessagesProcessing(),
+        PARTITIONS * MSGS_PER_PARTITION,
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(LATEST, TOPIC, null),
         PARTITIONS * MSGS_PER_PARTITION,
-        new TimestampsSortedMessageProcessing(RECORD_DESERIALIZER, msg -> true, null),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
@@ -175,18 +176,21 @@ class RecordEmitterTest extends AbstractIntegrationTest {
       targetOffsets.put(new TopicPartition(TOPIC, i), offset);
     }
 
-    var forwardEmitter = new ForwardRecordEmitter(
+    var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
-        createMessagesProcessing(),
+        PARTITIONS * MSGS_PER_PARTITION,
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
         PARTITIONS * MSGS_PER_PARTITION,
-        new TimestampsSortedMessageProcessing(RECORD_DESERIALIZER, msg -> true, null),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
@@ -220,18 +224,21 @@ class RecordEmitterTest extends AbstractIntegrationTest {
       );
     }
 
-    var forwardEmitter = new ForwardRecordEmitter(
+    var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
         new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
-        createMessagesProcessing(),
+        PARTITIONS * MSGS_PER_PARTITION,
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
         PARTITIONS * MSGS_PER_PARTITION,
-        new TimestampsSortedMessageProcessing(RECORD_DESERIALIZER, msg -> true, null),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
@@ -258,11 +265,12 @@ class RecordEmitterTest extends AbstractIntegrationTest {
       targetOffsets.put(new TopicPartition(TOPIC, i), (long) MSGS_PER_PARTITION);
     }
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
         numMessages,
-        new TimestampsSortedMessageProcessing(RECORD_DESERIALIZER, msg -> true, null),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
@@ -284,11 +292,12 @@ class RecordEmitterTest extends AbstractIntegrationTest {
       offsets.put(new TopicPartition(TOPIC, i), 0L);
     }
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, offsets),
         100,
-        new TimestampsSortedMessageProcessing(RECORD_DESERIALIZER, msg -> true, null),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault()
     );