diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java
index 30a5f566d3..84228c4134 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java
@@ -14,7 +14,7 @@ import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 public abstract class AbstractEmitter {
-  private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
+  private static final Duration DEFAULT_POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
 
   private final RecordSerDe recordDeserializer;
   private final ConsumingStats consumingStats = new ConsumingStats();
@@ -25,8 +25,13 @@ public abstract class AbstractEmitter {
 
   protected ConsumerRecords<Bytes, Bytes> poll(
       FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
+    return poll(sink, consumer, DEFAULT_POLL_TIMEOUT_MS);
+  }
+
+  protected ConsumerRecords<Bytes, Bytes> poll(
+      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer, Duration timeout) {
     Instant start = Instant.now();
-    ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
+    ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
     Instant finish = Instant.now();
     sendConsuming(sink, records, Duration.between(start, finish).toMillis());
     return records;
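
Note: the overload above keeps the 1-second DEFAULT_POLL_TIMEOUT_MS for existing callers and only adds a way to pass a shorter timeout per call. A minimal sketch of how a subclass might use it; the class name, constructor, and timeout value are illustrative and not part of the change, while poll(...) and sendMessage(...) are the AbstractEmitter methods seen in these diffs:

public class SampleEmitter extends AbstractEmitter {

  private static final Duration SHORT_POLL_TIMEOUT = Duration.ofMillis(200);

  public SampleEmitter(RecordSerDe recordDeserializer) {
    super(recordDeserializer); // assumes AbstractEmitter exposes such a constructor
  }

  public void pollOnce(FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
    // new overload: a short timeout for tight per-partition iterations...
    ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer, SHORT_POLL_TIMEOUT);
    // ...while poll(sink, consumer) would keep the 1s default
    records.forEach(r -> sendMessage(sink, r));
  }
}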
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java
index 302cb9879b..ff29110c97 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java
@@ -3,6 +3,8 @@ package com.provectus.kafka.ui.emitter;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serde.RecordSerDe;
 import com.provectus.kafka.ui.util.OffsetsSeekBackward;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -12,9 +14,9 @@ import java.util.TreeMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
@@ -25,6 +27,8 @@ public class BackwardRecordEmitter
     extends AbstractEmitter
     implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
+  private static final Duration POLL_TIMEOUT = Duration.ofMillis(200);
+
   private final Function<Map<String, Object>, KafkaConsumer<Bytes, Bytes>> consumerSupplier;
   private final OffsetsSeekBackward offsetsSeek;
 
@@ -51,73 +55,88 @@ public class BackwardRecordEmitter
       ) {
         sendPhase(sink, "Created consumer");
 
-        SortedMap<TopicPartition, Long> partitionsOffsets =
+        SortedMap<TopicPartition, Long> readUntilOffsets =
             new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
-        partitionsOffsets.putAll(offsetsSeek.getPartitionsOffsets(consumer));
+        readUntilOffsets.putAll(offsetsSeek.getPartitionsOffsets(consumer));
         sendPhase(sink, "Requested partitions offsets");
-        log.debug("partition offsets: {}", partitionsOffsets);
+        log.debug("partition offsets: {}", readUntilOffsets);
         var waitingOffsets =
-            offsetsSeek.waitingOffsets(consumer, partitionsOffsets.keySet());
+            offsetsSeek.waitingOffsets(consumer, readUntilOffsets.keySet());
         log.debug("waiting offsets {} {}",
             waitingOffsets.getBeginOffsets(),
             waitingOffsets.getEndOffsets()
         );
         while (!sink.isCancelled() && !waitingOffsets.beginReached()) {
-          for (Map.Entry<TopicPartition, Long> entry : partitionsOffsets.entrySet()) {
-            final Long lowest = waitingOffsets.getBeginOffsets().get(entry.getKey().partition());
-            if (lowest != null) {
-              consumer.assign(Collections.singleton(entry.getKey()));
-              final long offset = Math.max(lowest, entry.getValue() - msgsPerPartition);
-              log.debug("Polling {} from {}", entry.getKey(), offset);
-              consumer.seek(entry.getKey(), offset);
-              sendPhase(sink,
-                  String.format("Consuming partition: %s from %s", entry.getKey(), offset)
-              );
-              final ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
-              final List<ConsumerRecord<Bytes, Bytes>> partitionRecords =
-                  records.records(entry.getKey()).stream()
-                      .filter(r -> r.offset() < partitionsOffsets.get(entry.getKey()))
-                      .collect(Collectors.toList());
-              Collections.reverse(partitionRecords);
+          new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> {
+            long lowestOffset = waitingOffsets.getBeginOffsets().get(tp.partition());
+            long readFromOffset = Math.max(lowestOffset, readToOffset - msgsPerPartition);
 
-              log.debug("{} records polled", records.count());
-              log.debug("{} records sent", partitionRecords.size());
+            partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink)
+                .stream()
+                .filter(r -> !sink.isCancelled())
+                .forEach(r -> sendMessage(sink, r));
 
-              // This is workaround for case when partition begin offset is less than
-              // real minimal offset, usually appear in compcated topics
-              if (records.count() > 0 && partitionRecords.isEmpty()) {
-                waitingOffsets.markPolled(entry.getKey().partition());
-              }
-
-              for (ConsumerRecord<Bytes, Bytes> msg : partitionRecords) {
-                if (!sink.isCancelled() && !waitingOffsets.beginReached()) {
-                  sendMessage(sink, msg);
-                  waitingOffsets.markPolled(msg);
-                } else {
-                  log.info("Begin reached");
-                  break;
-                }
-              }
-              partitionsOffsets.put(
-                  entry.getKey(),
-                  Math.max(offset, entry.getValue() - msgsPerPartition)
-              );
+            waitingOffsets.markPolled(tp.partition(), readFromOffset);
+            if (waitingOffsets.getBeginOffsets().get(tp.partition()) == null) {
+              // we fully read this partition -> removing it from polling iterations
+              readUntilOffsets.remove(tp);
+            } else {
+              readUntilOffsets.put(tp, readFromOffset);
             }
-          }
+          });
+
           if (waitingOffsets.beginReached()) {
-            log.info("begin reached after partitions");
+            log.debug("begin reached after partitions poll iteration");
           } else if (sink.isCancelled()) {
-            log.info("sink is cancelled after partitions");
+            log.debug("sink is cancelled after partitions poll iteration");
           }
         }
         sink.complete();
-        log.info("Polling finished");
+        log.debug("Polling finished");
       }
     } catch (Exception e) {
      log.error("Error occurred while consuming records", e);
      sink.error(e);
    }
  }
+
+
+  private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(
+      TopicPartition tp,
+      long fromOffset,
+      long toOffset,
+      Consumer<Bytes, Bytes> consumer,
+      FluxSink<TopicMessageEventDTO> sink
+  ) {
+    consumer.assign(Collections.singleton(tp));
+    consumer.seek(tp, fromOffset);
+    sendPhase(sink, String.format("Polling partition: %s from offset %s", tp, fromOffset));
+    int desiredMsgsToPoll = (int) (toOffset - fromOffset);
+
+    var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
+
+    // we use empty polls counting to verify that partition was fully read
+    for (int emptyPolls = 0; recordsToSend.size() < desiredMsgsToPoll && emptyPolls < 3; ) {
+      var polledRecords = poll(sink, consumer, POLL_TIMEOUT);
+      log.debug("{} records polled from {}", polledRecords.count(), tp);
+
+      // counting sequential empty polls
+      emptyPolls = polledRecords.isEmpty() ? emptyPolls + 1 : 0;
+
+      var filteredRecords = polledRecords.records(tp).stream()
+          .filter(r -> r.offset() < toOffset)
+          .collect(Collectors.toList());
+
+      if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) {
+        // we already read all messages in target offsets interval
+        break;
+      }
+      recordsToSend.addAll(filteredRecords);
+    }
+    log.debug("{} records to send", recordsToSend.size());
+    Collections.reverse(recordsToSend);
+    return recordsToSend;
+  }
 }
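
Note: partitionPollIteration has three exits: enough records collected (desiredMsgsToPoll), three sequential empty polls, or a non-empty poll whose records all lie at or beyond toOffset (the compacted-topic case the old code special-cased via markPolled). A standalone sketch of the same rule against the plain consumer API, reusing the imports of the class above; the method name, variable names, and 200 ms timeout are illustrative:

static List<ConsumerRecord<Bytes, Bytes>> pollRange(
    Consumer<Bytes, Bytes> consumer, TopicPartition tp, long fromOffset, long toOffset) {
  consumer.assign(Collections.singleton(tp));
  consumer.seek(tp, fromOffset);
  int desired = (int) (toOffset - fromOffset);
  var collected = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
  for (int emptyPolls = 0; collected.size() < desired && emptyPolls < 3; ) {
    var polled = consumer.poll(Duration.ofMillis(200));
    // any non-empty poll resets the counter; three empty polls in a row
    // mean the partition has nothing more to give
    emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
    var inRange = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
    for (ConsumerRecord<Bytes, Bytes> r : polled.records(tp)) {
      if (r.offset() < toOffset) {
        inRange.add(r);
      }
    }
    if (!polled.isEmpty() && inRange.isEmpty()) {
      break; // data was polled, but all of it is past toOffset -> range fully read
    }
    collected.addAll(inRange);
  }
  Collections.reverse(collected); // newest-first, as the backward emitter emits
  return collected;
}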
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java
index fa3ff78e02..0fd830b323 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java
@@ -111,27 +111,19 @@ public abstract class OffsetsSeek {
           .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
     }
 
-    public List<TopicPartition> topicPartitions() {
-      return this.endOffsets.keySet().stream()
-          .map(p -> new TopicPartition(topic, p))
-          .collect(Collectors.toList());
-    }
-
-    public void markPolled(int partition) {
-      endOffsets.remove(partition);
-      beginOffsets.remove(partition);
-    }
-
     public void markPolled(ConsumerRecord<Bytes, Bytes> rec) {
-      Long endWaiting = endOffsets.get(rec.partition());
-      if (endWaiting != null && endWaiting <= rec.offset()) {
-        endOffsets.remove(rec.partition());
-      }
-      Long beginWaiting = beginOffsets.get(rec.partition());
-      if (beginWaiting != null && beginWaiting >= rec.offset()) {
-        beginOffsets.remove(rec.partition());
-      }
+      markPolled(rec.partition(), rec.offset());
+    }
+
+    public void markPolled(int partition, long offset) {
+      Long endWaiting = endOffsets.get(partition);
+      if (endWaiting != null && endWaiting <= offset) {
+        endOffsets.remove(partition);
+      }
+      Long beginWaiting = beginOffsets.get(partition);
+      if (beginWaiting != null && beginWaiting >= offset) {
+        beginOffsets.remove(partition);
+      }
     }
 
     public boolean endReached() {
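
Note: the markPolled(int, long) overload lets BackwardRecordEmitter mark progress by offset without a concrete record in hand. A worked example of the semantics, with invented offsets and a waitingOffsets obtained as in BackwardRecordEmitter above; say partition 0 is waiting on beginOffset = 5 and endOffset = 20:

waitingOffsets.markPolled(0, 7L);  // 5 >= 7 false, 20 <= 7 false -> both ends still waiting
waitingOffsets.markPolled(0, 20L); // 20 <= 20 -> endOffsets cleared: forward reading is done
waitingOffsets.markPolled(0, 5L);  // 5 >= 5  -> beginOffsets cleared: backward reading is done
// getBeginOffsets().get(0) is now null, which is exactly the check
// BackwardRecordEmitter uses to drop the partition from readUntilOffsets.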