From ca476d337337de4721552d962d9056f69b7ab113 Mon Sep 17 00:00:00 2001 From: German Osin Date: Fri, 2 Jul 2021 14:52:40 +0300 Subject: [PATCH] #122 Fixed compacted topics backward reading (#607) --- .../provectus/kafka/ui/emitter/BackwardRecordEmitter.java | 7 +++++++ .../main/java/com/provectus/kafka/ui/util/OffsetsSeek.java | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java index 088366c238..8bf2bab547 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java @@ -64,6 +64,13 @@ public class BackwardRecordEmitter log.debug("{} records polled", records.count()); log.debug("{} records sent", partitionRecords.size()); + + // This is workaround for case when partition begin offset is less than + // real minimal offset, usually appear in compcated topics + if (records.count() > 0 && partitionRecords.isEmpty()) { + waitingOffsets.markPolled(entry.getKey().partition()); + } + for (ConsumerRecord msg : partitionRecords) { if (!sink.isCancelled() && !waitingOffsets.beginReached()) { sink.next(msg); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java index 17a496d53b..6cc275c4c8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java @@ -110,6 +110,11 @@ public abstract class OffsetsSeek { .collect(Collectors.toList()); } + public void markPolled(int partition) { + endOffsets.remove(partition); + beginOffsets.remove(partition); + } + public void markPolled(ConsumerRecord rec) { Long endWaiting = endOffsets.get(rec.partition()); if (endWaiting != null && endWaiting <= rec.offset()) {