diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java index d60d99d76c..94b6ce236b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java @@ -35,7 +35,7 @@ public class ForwardRecordEmitter try (KafkaConsumer consumer = consumerSupplier.get()) { sendPhase(sink, "Assigning partitions"); var seekOperations = SeekOperations.create(consumer, position); - seekOperations.assignAndSeekNonEmptyPartitions(); + seekOperations.assignAndSeek(); EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter(); while (!sink.isCancelled() diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java index 8580272417..b8c31c0af5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java @@ -1,19 +1,20 @@ package com.provectus.kafka.ui.emitter; import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; import java.util.Collection; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.mutable.MutableLong; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; @Slf4j @Getter -class OffsetsInfo { +public class OffsetsInfo { private final Consumer consumer; @@ -23,15 +24,16 @@ class OffsetsInfo { private final Set nonEmptyPartitions = new HashSet<>(); private final Set emptyPartitions = new HashSet<>(); - OffsetsInfo(Consumer consumer, String topic) { + public OffsetsInfo(Consumer consumer, String topic) { this(consumer, consumer.partitionsFor(topic).stream() .map(pi -> new TopicPartition(topic, pi.partition())) - .toList() + .collect(Collectors.toList()) ); } - OffsetsInfo(Consumer consumer, Collection targetPartitions) { + public OffsetsInfo(Consumer consumer, + Collection targetPartitions) { this.consumer = consumer; this.beginOffsets = consumer.beginningOffsets(targetPartitions); this.endOffsets = consumer.endOffsets(targetPartitions); @@ -45,8 +47,8 @@ class OffsetsInfo { }); } - boolean assignedPartitionsFullyPolled() { - for (var tp : consumer.assignment()) { + public boolean assignedPartitionsFullyPolled() { + for (var tp: consumer.assignment()) { Preconditions.checkArgument(endOffsets.containsKey(tp)); if (endOffsets.get(tp) > consumer.position(tp)) { return false; @@ -55,10 +57,8 @@ class OffsetsInfo { return true; } - long summaryOffsetsRange() { - MutableLong cnt = new MutableLong(); - nonEmptyPartitions.forEach(tp -> cnt.add(endOffsets.get(tp) - beginOffsets.get(tp))); - return cnt.getValue(); + public Set allTargetPartitions() { + return Sets.union(nonEmptyPartitions, emptyPartitions); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java index 3d1d02345a..44f727b20e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java @@ -21,20 +21,14 @@ class SeekOperations { private final Map offsetsForSeek; //only contains non-empty partitions! static SeekOperations create(Consumer consumer, ConsumerPosition consumerPosition) { - OffsetsInfo offsetsInfo; - if (consumerPosition.partitions().isEmpty()) { - offsetsInfo = new OffsetsInfo(consumer, consumerPosition.topic()); - } else { - offsetsInfo = new OffsetsInfo(consumer, consumerPosition.partitions()); - } - return new SeekOperations( - consumer, - offsetsInfo, - getOffsetsForSeek(consumer, offsetsInfo, consumerPosition) - ); + OffsetsInfo offsetsInfo = consumerPosition.partitions().isEmpty() + ? new OffsetsInfo(consumer, consumerPosition.topic()) + : new OffsetsInfo(consumer, consumerPosition.partitions()); + var offsetsToSeek = getOffsetsForSeek(consumer, offsetsInfo, consumerPosition); + return new SeekOperations(consumer, offsetsInfo, offsetsToSeek); } - void assignAndSeekNonEmptyPartitions() { + void assignAndSeek() { consumer.assign(offsetsForSeek.keySet()); offsetsForSeek.forEach(consumer::seek); } @@ -43,10 +37,6 @@ class SeekOperations { return offsetsInfo.getBeginOffsets(); } - Map getEndOffsets() { - return offsetsInfo.getEndOffsets(); - } - boolean assignedPartitionsFullyPolled() { return offsetsInfo.assignedPartitionsFullyPolled(); } @@ -64,7 +54,9 @@ class SeekOperations { OffsetsInfo offsetsInfo, ConsumerPosition position) { switch (position.pollingMode()) { - case LATEST, TAILING: + case TAILING: + return consumer.endOffsets(offsetsInfo.allTargetPartitions()); + case LATEST: return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions()); case EARLIEST: return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions()); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java index c3f04fe8cc..dee522b01e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java @@ -1,28 +1,26 @@ package com.provectus.kafka.ui.emitter; import com.provectus.kafka.ui.model.ConsumerPosition; -import com.provectus.kafka.ui.model.TopicMessageDTO; import com.provectus.kafka.ui.model.TopicMessageEventDTO; import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; -import java.util.HashMap; -import java.util.function.Predicate; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.utils.Bytes; import reactor.core.publisher.FluxSink; @Slf4j public class TailingEmitter extends AbstractEmitter { - private final Supplier consumerSupplier; + private final Supplier> consumerSupplier; private final ConsumerPosition consumerPosition; - public TailingEmitter(Supplier consumerSupplier, + public TailingEmitter(Supplier> consumerSupplier, ConsumerPosition consumerPosition, - ConsumerRecordDeserializer deserializer, - Predicate filter, + ConsumerRecordDeserializer recordDeserializer, PollingSettings pollingSettings) { - super(new MessagesProcessing(deserializer, filter, false, null), pollingSettings); + super(recordDeserializer, pollingSettings); this.consumerSupplier = consumerSupplier; this.consumerPosition = consumerPosition; } @@ -30,12 +28,13 @@ public class TailingEmitter extends AbstractEmitter { @Override public void accept(FluxSink sink) { log.debug("Starting tailing polling for {}", consumerPosition); - try (EnhancedConsumer consumer = consumerSupplier.get()) { - assignAndSeek(consumer); + try (KafkaConsumer consumer = consumerSupplier.get()) { + SeekOperations.create(consumer, consumerPosition) + .assignAndSeek(); while (!sink.isCancelled()) { sendPhase(sink, "Polling"); var polled = poll(sink, consumer); - send(sink, polled); + polled.forEach(r -> sendMessage(sink, r)); } sink.complete(); log.debug("Tailing finished"); @@ -48,12 +47,4 @@ public class TailingEmitter extends AbstractEmitter { } } - private void assignAndSeek(EnhancedConsumer consumer) { - var seekOperations = SeekOperations.create(consumer, consumerPosition); - var seekOffsets = new HashMap<>(seekOperations.getEndOffsets()); // defaulting offsets to topic end - seekOffsets.putAll(seekOperations.getOffsetsForSeek()); // this will only set non-empty partitions - consumer.assign(seekOffsets.keySet()); - seekOffsets.forEach(consumer::seek); - } - } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/SeekOperationsTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/SeekOperationsTest.java index 79bda50174..e288e77a11 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/SeekOperationsTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/SeekOperationsTest.java @@ -1,5 +1,8 @@ package com.provectus.kafka.ui.emitter; +import static com.provectus.kafka.ui.model.PollingModeDTO.EARLIEST; +import static com.provectus.kafka.ui.model.PollingModeDTO.LATEST; +import static com.provectus.kafka.ui.model.PollingModeDTO.TAILING; import static org.assertj.core.api.Assertions.assertThat; import com.provectus.kafka.ui.model.ConsumerPosition; @@ -44,12 +47,22 @@ class SeekOperationsTest { @Nested class GetOffsetsForSeek { + @Test + void tailing() { + var offsets = SeekOperations.getOffsetsForSeek( + consumer, + new OffsetsInfo(consumer, topic), + new ConsumerPosition(TAILING, topic, List.of(), null, null) + ); + assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp0, 0L, tp1, 10L, tp2, 20L, tp3, 30L)); + } + @Test void latest() { var offsets = SeekOperations.getOffsetsForSeek( consumer, new OffsetsInfo(consumer, topic), - new ConsumerPosition(PollingModeDTO.LATEST, topic, null, null, null) + new ConsumerPosition(LATEST, topic, List.of(), null, null) ); assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 30L)); } @@ -59,7 +72,7 @@ class SeekOperationsTest { var offsets = SeekOperations.getOffsetsForSeek( consumer, new OffsetsInfo(consumer, topic), - new ConsumerPosition(PollingModeDTO.EARLIEST, topic, null, null, null) + new ConsumerPosition(EARLIEST, topic, List.of(), null, null) ); assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 0L, tp3, 25L)); }