diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java index 84228c4134..9889a68910 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java @@ -16,6 +16,13 @@ import reactor.core.publisher.FluxSink; public abstract class AbstractEmitter { private static final Duration DEFAULT_POLL_TIMEOUT_MS = Duration.ofMillis(1000L); + // In some situations it is hard to say whether records range (between two offsets) was fully polled. + // This happens when we have holes in records sequences that is usual case for compact topics or + // topics with transactional writes. In such cases if you want to poll all records between offsets X and Y + // there is no guarantee that you will ever see record with offset Y. + // To workaround this we can assume that after N consecutive empty polls all target messages were read. + public static final int NO_MORE_DATA_EMPTY_POLLS_COUNT = 3; + private final RecordSerDe recordDeserializer; private final ConsumingStats consumingStats = new ConsumingStats(); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java index ff29110c97..b84f4cbf6f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java @@ -118,7 +118,7 @@ public class BackwardRecordEmitter var recordsToSend = new ArrayList>(); // we use empty polls counting to verify that partition was fully read - for (int emptyPolls = 0; recordsToSend.size() < desiredMsgsToPoll && emptyPolls < 3; ) { + for (int emptyPolls = 0; recordsToSend.size() < desiredMsgsToPoll && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT;) { var polledRecords = poll(sink, consumer, POLL_TIMEOUT); log.debug("{} records polled from {}", polledRecords.count(), tp); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java index 27a6bbec8c..90561cde94 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java @@ -33,10 +33,13 @@ public class ForwardRecordEmitter try (KafkaConsumer consumer = consumerSupplier.get()) { sendPhase(sink, "Assigning partitions"); var waitingOffsets = offsetsSeek.assignAndSeek(consumer); - while (!sink.isCancelled() && !waitingOffsets.endReached()) { + // we use empty polls counting to verify that topic was fully read + int emptyPolls = 0; + while (!sink.isCancelled() && !waitingOffsets.endReached() && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT) { sendPhase(sink, "Polling"); ConsumerRecords records = poll(sink, consumer); log.info("{} records polled", records.count()); + emptyPolls = records.isEmpty() ? emptyPolls + 1 : 0; for (ConsumerRecord msg : records) { if (!sink.isCancelled() && !waitingOffsets.endReached()) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java index 757b469a6b..3e72e8a07e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java @@ -1,5 +1,7 @@ package com.provectus.kafka.ui.service.analyze; +import static com.provectus.kafka.ui.emitter.AbstractEmitter.NO_MORE_DATA_EMPTY_POLLS_COUNT; + import com.provectus.kafka.ui.exception.TopicAnalysisException; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.TopicAnalysisDTO; @@ -118,7 +120,7 @@ public class TopicAnalysisService { consumer.seekToBeginning(topicPartitions); var waitingOffsets = new WaitingOffsets(topicId.topicName, consumer, topicPartitions); - for (int emptyPolls = 0; !waitingOffsets.endReached() && emptyPolls < 3; ) { + for (int emptyPolls = 0; !waitingOffsets.endReached() && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT;) { var polled = consumer.poll(Duration.ofSeconds(3)); emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0; polled.forEach(r -> {