|
@@ -104,7 +104,7 @@ public class ConsumingService {
|
|
|
while (!sink.isCancelled()) {
|
|
|
ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
|
|
|
log.info("{} records polled", records.count());
|
|
|
- if (records.count() == 0 && emptyPollsCount < MAX_EMPTY_POLLS_COUNT) {
|
|
|
+ if (records.count() == 0 && emptyPollsCount > MAX_EMPTY_POLLS_COUNT) {
|
|
|
break;
|
|
|
} else {
|
|
|
emptyPollsCount++;
|