From 5a0b23ed59dae90d04b9ddd73d462bb01c3e0563 Mon Sep 17 00:00:00 2001 From: Anton Petrov Date: Mon, 15 Jun 2020 13:12:56 +0300 Subject: [PATCH] Fix messages waiting for empty topic (#60) --- .../provectus/kafka/ui/cluster/service/ConsumingService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java index 2dbf18ab34..a85b628a04 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java @@ -68,12 +68,13 @@ public class ConsumingService { assignPartitions(consumer); seekOffsets(consumer); int pollsCount = 0; - while (!sink.isCancelled() || ++pollsCount > MAX_POLLS_COUNT) { + while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) { ConsumerRecords records = consumer.poll(POLL_TIMEOUT_MS); log.info("{} records polled", records.count()); records.iterator() .forEachRemaining(sink::next); } + sink.complete(); } catch (Exception e) { log.error("Error occurred while consuming records", e); throw new RuntimeException(e);