diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java index 088366c238..8bf2bab547 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java @@ -64,6 +64,13 @@ public class BackwardRecordEmitter log.debug("{} records polled", records.count()); log.debug("{} records sent", partitionRecords.size()); + + // This is workaround for case when partition begin offset is less than + // real minimal offset, usually appear in compcated topics + if (records.count() > 0 && partitionRecords.isEmpty()) { + waitingOffsets.markPolled(entry.getKey().partition()); + } + for (ConsumerRecord msg : partitionRecords) { if (!sink.isCancelled() && !waitingOffsets.beginReached()) { sink.next(msg); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java index 17a496d53b..6cc275c4c8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java @@ -110,6 +110,11 @@ public abstract class OffsetsSeek { .collect(Collectors.toList()); } + public void markPolled(int partition) { + endOffsets.remove(partition); + beginOffsets.remove(partition); + } + public void markPolled(ConsumerRecord rec) { Long endWaiting = endOffsets.get(rec.partition()); if (endWaiting != null && endWaiting <= rec.offset()) {