|
@@ -33,10 +33,13 @@ public class ForwardRecordEmitter
|
|
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
|
|
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
|
|
sendPhase(sink, "Assigning partitions");
|
|
sendPhase(sink, "Assigning partitions");
|
|
var waitingOffsets = offsetsSeek.assignAndSeek(consumer);
|
|
var waitingOffsets = offsetsSeek.assignAndSeek(consumer);
|
|
- while (!sink.isCancelled() && !waitingOffsets.endReached()) {
|
|
|
|
|
|
+ // we use empty polls counting to verify that topic was fully read
|
|
|
|
+ int emptyPolls = 0;
|
|
|
|
+ while (!sink.isCancelled() && !waitingOffsets.endReached() && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT) {
|
|
sendPhase(sink, "Polling");
|
|
sendPhase(sink, "Polling");
|
|
ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
|
|
ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
|
|
log.info("{} records polled", records.count());
|
|
log.info("{} records polled", records.count());
|
|
|
|
+ emptyPolls = records.isEmpty() ? emptyPolls + 1 : 0;
|
|
|
|
|
|
for (ConsumerRecord<Bytes, Bytes> msg : records) {
|
|
for (ConsumerRecord<Bytes, Bytes> msg : records) {
|
|
if (!sink.isCancelled() && !waitingOffsets.endReached()) {
|
|
if (!sink.isCancelled() && !waitingOffsets.endReached()) {
|