#122 Fixed npe in backward emitter (#601)

This commit is contained in:
German Osin 2021-06-29 19:00:31 +03:00 committed by GitHub
parent 97ec512b00
commit 3fa2b995c6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 26 deletions

View file

@ -40,42 +40,44 @@ public class BackwardRecordEmitter
) {
final Map<TopicPartition, Long> partitionsOffsets =
offsetsSeek.getPartitionsOffsets(consumer);
log.info("partition offsets: {}", partitionsOffsets);
log.debug("partition offsets: {}", partitionsOffsets);
var waitingOffsets =
offsetsSeek.waitingOffsets(consumer, partitionsOffsets.keySet());
log.info("waittin offsets {} {}",
log.debug("waittin offsets {} {}",
waitingOffsets.getBeginOffsets(),
waitingOffsets.getEndOffsets()
);
while (!sink.isCancelled() && !waitingOffsets.beginReached()) {
for (Map.Entry<TopicPartition, Long> entry : partitionsOffsets.entrySet()) {
final Long lowest = waitingOffsets.getBeginOffsets().get(entry.getKey().partition());
consumer.assign(Collections.singleton(entry.getKey()));
final long offset = Math.max(lowest, entry.getValue() - msgsPerPartition);
log.info("Polling {} from {}", entry.getKey(), offset);
consumer.seek(entry.getKey(), offset);
ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
final List<ConsumerRecord<Bytes, Bytes>> partitionRecords =
records.records(entry.getKey()).stream()
.filter(r -> r.offset() < partitionsOffsets.get(entry.getKey()))
.collect(Collectors.toList());
Collections.reverse(partitionRecords);
if (lowest != null) {
consumer.assign(Collections.singleton(entry.getKey()));
final long offset = Math.max(lowest, entry.getValue() - msgsPerPartition);
log.debug("Polling {} from {}", entry.getKey(), offset);
consumer.seek(entry.getKey(), offset);
ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
final List<ConsumerRecord<Bytes, Bytes>> partitionRecords =
records.records(entry.getKey()).stream()
.filter(r -> r.offset() < partitionsOffsets.get(entry.getKey()))
.collect(Collectors.toList());
Collections.reverse(partitionRecords);
log.info("{} records polled", records.count());
log.info("{} records sent", partitionRecords.size());
for (ConsumerRecord<Bytes, Bytes> msg : partitionRecords) {
if (!sink.isCancelled() && !waitingOffsets.beginReached()) {
sink.next(msg);
waitingOffsets.markPolled(msg);
} else {
log.info("Begin reached");
break;
log.debug("{} records polled", records.count());
log.debug("{} records sent", partitionRecords.size());
for (ConsumerRecord<Bytes, Bytes> msg : partitionRecords) {
if (!sink.isCancelled() && !waitingOffsets.beginReached()) {
sink.next(msg);
waitingOffsets.markPolled(msg);
} else {
log.info("Begin reached");
break;
}
}
partitionsOffsets.put(
entry.getKey(),
Math.max(offset, entry.getValue() - msgsPerPartition)
);
}
partitionsOffsets.put(
entry.getKey(),
Math.max(offset, entry.getValue() - msgsPerPartition)
);
}
if (waitingOffsets.beginReached()) {
log.info("begin reached after partitions");

View file

@ -3,7 +3,7 @@ kafka:
- name: local
bootstrapServers: b-1.kad-msk.uxahxx.c6.kafka.eu-west-1.amazonaws.com:9092
# zookeeper: localhost:2181
# schemaRegistry: http://localhost:8083
schemaRegistry: http://kad-ecs-application-lb-857515197.eu-west-1.elb.amazonaws.com:9000/api/schema-registry
# -
# name: secondLocal
# zookeeper: zookeeper1:2181