ISSUE-1024: BackwardRecordEmitter fix (#1934)

BackwardRecordEmitter fix

Co-authored-by: iliax <ikuramshin@provectus.com>
Ilya Kuramshin 2022-05-19 19:34:04 +04:00 committed by GitHub
parent d1c59dd74b
commit 6891138f15
3 changed files with 83 additions and 67 deletions

AbstractEmitter.java

@@ -14,7 +14,7 @@ import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
public abstract class AbstractEmitter {
private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
private static final Duration DEFAULT_POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
private final RecordSerDe recordDeserializer;
private final ConsumingStats consumingStats = new ConsumingStats();
@@ -25,8 +25,13 @@ public abstract class AbstractEmitter {
protected ConsumerRecords<Bytes, Bytes> poll(
FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
return poll(sink, consumer, DEFAULT_POLL_TIMEOUT_MS);
}
protected ConsumerRecords<Bytes, Bytes> poll(
FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer, Duration timeout) {
Instant start = Instant.now();
ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
Instant finish = Instant.now();
sendConsuming(sink, records, Duration.between(start, finish).toMillis());
return records;
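
The hunk above replaces the single hard-coded POLL_TIMEOUT_MS with DEFAULT_POLL_TIMEOUT_MS and adds a poll overload that accepts an explicit Duration, so subclasses can choose a shorter timeout (BackwardRecordEmitter below uses 200 ms). A minimal standalone sketch of the same overload-delegation pattern; the class and method names here are illustrative stand-ins, not the project's actual API:

import java.time.Duration;
import java.time.Instant;

// Simplified stand-in for AbstractEmitter: the no-argument poll() keeps the old
// default behaviour by delegating to the Duration-based overload.
abstract class PollingEmitter<T> {

  private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(1000L);

  protected T poll() {
    return poll(DEFAULT_POLL_TIMEOUT);
  }

  protected T poll(Duration timeout) {
    Instant start = Instant.now();
    T records = doPoll(timeout);              // e.g. consumer.poll(timeout) in the real emitter
    Instant finish = Instant.now();
    reportConsuming(records, Duration.between(start, finish).toMillis());
    return records;
  }

  protected abstract T doPoll(Duration timeout);

  protected abstract void reportConsuming(T records, long elapsedMs);
}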

BackwardRecordEmitter.java

@@ -3,6 +3,8 @@ package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serde.RecordSerDe;
import com.provectus.kafka.ui.util.OffsetsSeekBackward;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -12,9 +14,9 @@ import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
@@ -25,6 +27,8 @@ public class BackwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private static final Duration POLL_TIMEOUT = Duration.ofMillis(200);
private final Function<Map<String, Object>, KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final OffsetsSeekBackward offsetsSeek;
@@ -51,73 +55,88 @@ public class BackwardRecordEmitter
) {
sendPhase(sink, "Created consumer");
SortedMap<TopicPartition, Long> partitionsOffsets =
SortedMap<TopicPartition, Long> readUntilOffsets =
new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
partitionsOffsets.putAll(offsetsSeek.getPartitionsOffsets(consumer));
readUntilOffsets.putAll(offsetsSeek.getPartitionsOffsets(consumer));
sendPhase(sink, "Requested partitions offsets");
log.debug("partition offsets: {}", partitionsOffsets);
log.debug("partition offsets: {}", readUntilOffsets);
var waitingOffsets =
offsetsSeek.waitingOffsets(consumer, partitionsOffsets.keySet());
offsetsSeek.waitingOffsets(consumer, readUntilOffsets.keySet());
log.debug("waiting offsets {} {}",
waitingOffsets.getBeginOffsets(),
waitingOffsets.getEndOffsets()
);
while (!sink.isCancelled() && !waitingOffsets.beginReached()) {
for (Map.Entry<TopicPartition, Long> entry : partitionsOffsets.entrySet()) {
final Long lowest = waitingOffsets.getBeginOffsets().get(entry.getKey().partition());
if (lowest != null) {
consumer.assign(Collections.singleton(entry.getKey()));
final long offset = Math.max(lowest, entry.getValue() - msgsPerPartition);
log.debug("Polling {} from {}", entry.getKey(), offset);
consumer.seek(entry.getKey(), offset);
sendPhase(sink,
String.format("Consuming partition: %s from %s", entry.getKey(), offset)
);
final ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
final List<ConsumerRecord<Bytes, Bytes>> partitionRecords =
records.records(entry.getKey()).stream()
.filter(r -> r.offset() < partitionsOffsets.get(entry.getKey()))
.collect(Collectors.toList());
Collections.reverse(partitionRecords);
new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> {
long lowestOffset = waitingOffsets.getBeginOffsets().get(tp.partition());
long readFromOffset = Math.max(lowestOffset, readToOffset - msgsPerPartition);
log.debug("{} records polled", records.count());
log.debug("{} records sent", partitionRecords.size());
partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink)
.stream()
.filter(r -> !sink.isCancelled())
.forEach(r -> sendMessage(sink, r));
// This is a workaround for the case when the partition begin offset is less than
// the real minimal offset, which usually appears in compacted topics
if (records.count() > 0 && partitionRecords.isEmpty()) {
waitingOffsets.markPolled(entry.getKey().partition());
}
for (ConsumerRecord<Bytes, Bytes> msg : partitionRecords) {
if (!sink.isCancelled() && !waitingOffsets.beginReached()) {
sendMessage(sink, msg);
waitingOffsets.markPolled(msg);
waitingOffsets.markPolled(tp.partition(), readFromOffset);
if (waitingOffsets.getBeginOffsets().get(tp.partition()) == null) {
// we fully read this partition -> removing it from polling iterations
readUntilOffsets.remove(tp);
} else {
log.info("Begin reached");
break;
}
}
partitionsOffsets.put(
entry.getKey(),
Math.max(offset, entry.getValue() - msgsPerPartition)
);
}
readUntilOffsets.put(tp, readFromOffset);
}
});
if (waitingOffsets.beginReached()) {
log.info("begin reached after partitions");
log.debug("begin reached after partitions poll iteration");
} else if (sink.isCancelled()) {
log.info("sink is cancelled after partitions");
log.debug("sink is cancelled after partitions poll iteration");
}
}
sink.complete();
log.info("Polling finished");
log.debug("Polling finished");
}
} catch (Exception e) {
log.error("Error occurred while consuming records", e);
sink.error(e);
}
}
private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(
TopicPartition tp,
long fromOffset,
long toOffset,
Consumer<Bytes, Bytes> consumer,
FluxSink<TopicMessageEventDTO> sink
) {
consumer.assign(Collections.singleton(tp));
consumer.seek(tp, fromOffset);
sendPhase(sink, String.format("Polling partition: %s from offset %s", tp, fromOffset));
int desiredMsgsToPoll = (int) (toOffset - fromOffset);
var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
// we use empty-polls counting to verify that the partition was fully read
for (int emptyPolls = 0; recordsToSend.size() < desiredMsgsToPoll && emptyPolls < 3; ) {
var polledRecords = poll(sink, consumer, POLL_TIMEOUT);
log.debug("{} records polled from {}", polledRecords.count(), tp);
// counting sequential empty polls
emptyPolls = polledRecords.isEmpty() ? emptyPolls + 1 : 0;
var filteredRecords = polledRecords.records(tp).stream()
.filter(r -> r.offset() < toOffset)
.collect(Collectors.toList());
if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) {
// we have already read all messages in the target offsets interval
break;
}
recordsToSend.addAll(filteredRecords);
}
log.debug("{} records to send", recordsToSend.size());
Collections.reverse(recordsToSend);
return recordsToSend;
}
}
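
After this change BackwardRecordEmitter no longer polls each partition exactly once per loop pass: the outer loop keeps a readUntilOffsets map, computes readFromOffset = max(partition begin offset, readToOffset - msgsPerPartition), delegates to partitionPollIteration, and then either removes the fully-read partition from the map or moves its readToOffset down for the next pass. The core of partitionPollIteration is a bounded loop that keeps polling until enough records are collected or several consecutive polls come back empty. A minimal sketch of that loop, using plain Long offsets and a Supplier in place of the Kafka consumer (the readChunk name and the supplier abstraction are illustrative, not part of the project):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Collects record offsets in [fromOffset, toOffset), newest first, polling until
// either enough records were gathered or several consecutive polls returned nothing.
final class BackwardChunkReader {

  static List<Long> readChunk(long fromOffset, long toOffset, Supplier<List<Long>> pollOnce) {
    int desired = (int) (toOffset - fromOffset);
    List<Long> collected = new ArrayList<>();

    // empty-poll counting: three consecutive empty polls mean the partition has
    // nothing more to give us in this range
    for (int emptyPolls = 0; collected.size() < desired && emptyPolls < 3; ) {
      List<Long> polled = pollOnce.get();    // stands in for poll(sink, consumer, POLL_TIMEOUT)
      emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;

      List<Long> inRange = new ArrayList<>();
      for (long offset : polled) {
        if (offset < toOffset) {             // drop records beyond the requested interval
          inRange.add(offset);
        }
      }
      // a non-empty poll with nothing below toOffset means the interval is exhausted
      if (!polled.isEmpty() && inRange.isEmpty()) {
        break;
      }
      collected.addAll(inRange);
    }
    Collections.reverse(collected);          // emit newest-first, as the backward emitter does
    return collected;
  }
}

For example, readChunk(100, 105, poller) returns at most five offsets from [100, 105), newest first, and stops early after three consecutive empty polls.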

OffsetsSeek.java

@@ -111,27 +111,19 @@ public abstract class OffsetsSeek {
.collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
}
public List<TopicPartition> topicPartitions() {
return this.endOffsets.keySet().stream()
.map(p -> new TopicPartition(topic, p))
.collect(Collectors.toList());
public void markPolled(ConsumerRecord<?, ?> rec) {
markPolled(rec.partition(), rec.offset());
}
public void markPolled(int partition) {
public void markPolled(int partition, long offset) {
Long endWaiting = endOffsets.get(partition);
if (endWaiting != null && endWaiting <= offset) {
endOffsets.remove(partition);
}
Long beginWaiting = beginOffsets.get(partition);
if (beginWaiting != null && beginWaiting >= offset) {
beginOffsets.remove(partition);
}
public void markPolled(ConsumerRecord<?, ?> rec) {
Long endWaiting = endOffsets.get(rec.partition());
if (endWaiting != null && endWaiting <= rec.offset()) {
endOffsets.remove(rec.partition());
}
Long beginWaiting = beginOffsets.get(rec.partition());
if (beginWaiting != null && beginWaiting >= rec.offset()) {
beginOffsets.remove(rec.partition());
}
}
public boolean endReached() {
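
The reworked markPolled(int partition, long offset) lets the emitter report an arbitrary offset (the readFromOffset of an iteration) rather than only a consumed record, while markPolled(ConsumerRecord) now simply delegates to it. A compact standalone illustration of that bookkeeping, assuming a simplified tracker class in place of the diff's waiting-offsets object (the class name is made up; only the markPolled logic mirrors the diff):

import java.util.HashMap;
import java.util.Map;

// Tracks which partitions are still being waited on in each direction.
class WaitingOffsetsTracker {

  private final Map<Integer, Long> beginOffsets = new HashMap<>();
  private final Map<Integer, Long> endOffsets = new HashMap<>();

  WaitingOffsetsTracker(Map<Integer, Long> begin, Map<Integer, Long> end) {
    beginOffsets.putAll(begin);
    endOffsets.putAll(end);
  }

  // Same boundary checks as the diff: once the reported offset crosses a partition's
  // begin or end boundary, that partition stops being "waited on" in that direction.
  void markPolled(int partition, long offset) {
    Long endWaiting = endOffsets.get(partition);
    if (endWaiting != null && endWaiting <= offset) {
      endOffsets.remove(partition);
    }
    Long beginWaiting = beginOffsets.get(partition);
    if (beginWaiting != null && beginWaiting >= offset) {
      beginOffsets.remove(partition);
    }
  }

  boolean beginReached() {
    return beginOffsets.isEmpty();
  }

  boolean endReached() {
    return endOffsets.isEmpty();
  }
}

With begin offset 5 and end offset 20 registered for partition 0, markPolled(0, 5) clears the begin entry (so beginReached() becomes true once every partition has done the same), while markPolled(0, 20) clears the end entry.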