diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java index fceb80aa9e..a6351aae01 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java @@ -14,12 +14,14 @@ import com.provectus.kafka.ui.util.OffsetsSeekBackward; import com.provectus.kafka.ui.util.OffsetsSeekForward; import java.time.Duration; import java.util.Collection; +import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; @@ -136,6 +138,15 @@ public class ConsumingService { private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L); + private static final Comparator> PARTITION_COMPARING = + Comparator.comparing( + ConsumerRecord::partition, + Comparator.nullsFirst(Comparator.naturalOrder()) + ); + private static final Comparator> REVERED_COMPARING = + PARTITION_COMPARING.thenComparing(ConsumerRecord::offset).reversed(); + + private final Supplier> consumerSupplier; private final OffsetsSeek offsetsSeek; @@ -146,7 +157,16 @@ public class ConsumingService { while (!sink.isCancelled() && !waitingOffsets.endReached()) { ConsumerRecords records = consumer.poll(POLL_TIMEOUT_MS); log.info("{} records polled", records.count()); - for (ConsumerRecord record : records) { + + final Iterable> iterable; + if (offsetsSeek.getConsumerPosition().getSeekDirection().equals(SeekDirection.FORWARD)) { + iterable = records; + } else { + iterable = StreamSupport.stream(records.spliterator(), false) + .sorted(REVERED_COMPARING).collect(Collectors.toList()); + } + + for (ConsumerRecord record : iterable) { if (!sink.isCancelled() && !waitingOffsets.endReached()) { sink.next(record); waitingOffsets.markPolled(record); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java index 9d082c67bc..6383a83121 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java @@ -23,6 +23,10 @@ public abstract class OffsetsSeek { this.consumerPosition = consumerPosition; } + public ConsumerPosition getConsumerPosition() { + return consumerPosition; + } + public WaitingOffsets assignAndSeek(Consumer consumer) { SeekType seekType = consumerPosition.getSeekType(); log.info("Positioning consumer for topic {} with {}", topic, consumerPosition);