|
@@ -107,6 +107,9 @@ public class ConsumingService {
|
|
|
while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) {
|
|
|
ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
|
|
|
log.info("{} records polled", records.count());
|
|
|
+ if (records.count() == 0) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
records.iterator()
|
|
|
.forEachRemaining(sink::next);
|
|
|
}
|