diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java index 2dbf18ab34..a85b628a04 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java @@ -68,12 +68,13 @@ public class ConsumingService { assignPartitions(consumer); seekOffsets(consumer); int pollsCount = 0; - while (!sink.isCancelled() || ++pollsCount > MAX_POLLS_COUNT) { + while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) { ConsumerRecords records = consumer.poll(POLL_TIMEOUT_MS); log.info("{} records polled", records.count()); records.iterator() .forEachRemaining(sink::next); } + sink.complete(); } catch (Exception e) { log.error("Error occurred while consuming records", e); throw new RuntimeException(e);