iliax 1 gadu atpakaļ
vecāks
revīzija
a11dda8a5f

+ 2 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -1,8 +1,6 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import java.time.Duration;
-import java.util.List;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
@@ -17,16 +15,12 @@ public abstract class AbstractEmitter implements java.util.function.Consumer<Flu
     this.pollingSettings = pollingSettings;
   }
 
-  private PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
-    var records = consumer.pollEnhanced(timeout);
+  protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
+    var records = consumer.pollEnhanced(pollingSettings.getPollTimeout());
     sendConsuming(sink, records);
     return records;
   }
 
-  protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
-    return poll(sink, consumer, pollingSettings.getPollTimeout());
-  }
-
   protected boolean sendLimitReached() {
     return messagesProcessing.limitReached();
   }

+ 15 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java

@@ -13,9 +13,9 @@ import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.TreeMap;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -40,30 +40,34 @@ class MessagesProcessing {
     return limit != null && sentMessages >= limit;
   }
 
+  /*
+   * Sorting by timestamps, BUT requesting that records within same partitions should be ordered by offsets.
+   */
   @VisibleForTesting
-  static Iterable<ConsumerRecord<Bytes, Bytes>> sorted(Iterable<ConsumerRecord<Bytes, Bytes>> records, boolean asc) {
+  static Iterable<ConsumerRecord<Bytes, Bytes>> sortForSending(Iterable<ConsumerRecord<Bytes, Bytes>> records,
+                                                               boolean asc) {
     Comparator<ConsumerRecord> offsetComparator = asc
         ? Comparator.comparingLong(ConsumerRecord::offset)
         : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::offset).reversed();
 
-    Comparator<ConsumerRecord> tsComparator = asc
-        ? Comparator.comparing(ConsumerRecord::timestamp)
-        : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::timestamp).reversed();
-
-    TreeMap<Integer, List<ConsumerRecord<Bytes, Bytes>>> perPartition = Streams.stream(records)
+    // partition -> sorted by offsets records
+    Map<Integer, List<ConsumerRecord<Bytes, Bytes>>> perPartition = Streams.stream(records)
         .collect(
             groupingBy(
                 ConsumerRecord::partition,
                 TreeMap::new,
-                collectingAndThen(
-                    toList(),
-                    lst -> lst.stream().sorted(offsetComparator).toList())));
+                collectingAndThen(toList(), lst -> lst.stream().sorted(offsetComparator).toList())));
+
+    Comparator<ConsumerRecord> tsComparator = asc
+        ? Comparator.comparing(ConsumerRecord::timestamp)
+        : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::timestamp).reversed();
 
+    // merge-sorting records from partitions one by one using timestamp comparator
     return Iterables.mergeSorted(perPartition.values(), tsComparator);
   }
 
   void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> polled) {
-    sorted(polled, ascendingSortBeforeSend)
+    sortForSending(polled, ascendingSortBeforeSend)
         .forEach(rec -> {
           if (!limitReached() && !sink.isCancelled()) {
             TopicMessageDTO topicMessage = deserializer.deserialize(rec);

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/RangePollingEmitter.java

@@ -85,6 +85,7 @@ public abstract class RangePollingEmitter extends AbstractEmitter {
             .filter(r -> r.offset() < fromTo.to)
             .forEach(result::add);
 
+        //next position is out of target range -> pausing partition
         if (consumer.position(tp) >= fromTo.to) {
           consumer.pause(List.of(tp));
         }

+ 2 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessagesProcessingTest.java

@@ -33,7 +33,7 @@ class MessagesProcessingTest {
     var shuffled = new ArrayList<>(messagesInOrder);
     Collections.shuffle(shuffled);
 
-    var sortedList = MessagesProcessing.sorted(shuffled, true);
+    var sortedList = MessagesProcessing.sortForSending(shuffled, true);
     assertThat(sortedList).containsExactlyElementsOf(messagesInOrder);
   }
 
@@ -54,7 +54,7 @@ class MessagesProcessingTest {
     var shuffled = new ArrayList<>(messagesInOrder);
     Collections.shuffle(shuffled);
 
-    var sortedList = MessagesProcessing.sorted(shuffled, false);
+    var sortedList = MessagesProcessing.sortForSending(shuffled, false);
     assertThat(sortedList).containsExactlyElementsOf(messagesInOrder);
   }