parent
09ebd03e71
commit
ca476d3373
2 changed files with 12 additions and 0 deletions
|
@ -64,6 +64,13 @@ public class BackwardRecordEmitter
|
||||||
|
|
||||||
log.debug("{} records polled", records.count());
|
log.debug("{} records polled", records.count());
|
||||||
log.debug("{} records sent", partitionRecords.size());
|
log.debug("{} records sent", partitionRecords.size());
|
||||||
|
|
||||||
|
// This is workaround for case when partition begin offset is less than
|
||||||
|
// real minimal offset, usually appear in compcated topics
|
||||||
|
if (records.count() > 0 && partitionRecords.isEmpty()) {
|
||||||
|
waitingOffsets.markPolled(entry.getKey().partition());
|
||||||
|
}
|
||||||
|
|
||||||
for (ConsumerRecord<Bytes, Bytes> msg : partitionRecords) {
|
for (ConsumerRecord<Bytes, Bytes> msg : partitionRecords) {
|
||||||
if (!sink.isCancelled() && !waitingOffsets.beginReached()) {
|
if (!sink.isCancelled() && !waitingOffsets.beginReached()) {
|
||||||
sink.next(msg);
|
sink.next(msg);
|
||||||
|
|
|
@ -110,6 +110,11 @@ public abstract class OffsetsSeek {
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void markPolled(int partition) {
|
||||||
|
endOffsets.remove(partition);
|
||||||
|
beginOffsets.remove(partition);
|
||||||
|
}
|
||||||
|
|
||||||
public void markPolled(ConsumerRecord<?, ?> rec) {
|
public void markPolled(ConsumerRecord<?, ?> rec) {
|
||||||
Long endWaiting = endOffsets.get(rec.partition());
|
Long endWaiting = endOffsets.get(rec.partition());
|
||||||
if (endWaiting != null && endWaiting <= rec.offset()) {
|
if (endWaiting != null && endWaiting <= rec.offset()) {
|
||||||
|
|
Loading…
Add table
Reference in a new issue