This commit is contained in:
iliax 2023-03-22 23:52:50 +04:00 committed by gokhanimral
parent 1037e41889
commit 65f60b9edb
5 changed files with 46 additions and 50 deletions

View file

@ -35,7 +35,7 @@ public class ForwardRecordEmitter
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) { try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
sendPhase(sink, "Assigning partitions"); sendPhase(sink, "Assigning partitions");
var seekOperations = SeekOperations.create(consumer, position); var seekOperations = SeekOperations.create(consumer, position);
seekOperations.assignAndSeekNonEmptyPartitions(); seekOperations.assignAndSeek();
EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter(); EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
while (!sink.isCancelled() while (!sink.isCancelled()

View file

@ -1,19 +1,20 @@
package com.provectus.kafka.ui.emitter; package com.provectus.kafka.ui.emitter;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
@Slf4j @Slf4j
@Getter @Getter
class OffsetsInfo { public class OffsetsInfo {
private final Consumer<?, ?> consumer; private final Consumer<?, ?> consumer;
@ -23,15 +24,16 @@ class OffsetsInfo {
private final Set<TopicPartition> nonEmptyPartitions = new HashSet<>(); private final Set<TopicPartition> nonEmptyPartitions = new HashSet<>();
private final Set<TopicPartition> emptyPartitions = new HashSet<>(); private final Set<TopicPartition> emptyPartitions = new HashSet<>();
OffsetsInfo(Consumer<?, ?> consumer, String topic) { public OffsetsInfo(Consumer<?, ?> consumer, String topic) {
this(consumer, this(consumer,
consumer.partitionsFor(topic).stream() consumer.partitionsFor(topic).stream()
.map(pi -> new TopicPartition(topic, pi.partition())) .map(pi -> new TopicPartition(topic, pi.partition()))
.toList() .collect(Collectors.toList())
); );
} }
OffsetsInfo(Consumer<?, ?> consumer, Collection<TopicPartition> targetPartitions) { public OffsetsInfo(Consumer<?, ?> consumer,
Collection<TopicPartition> targetPartitions) {
this.consumer = consumer; this.consumer = consumer;
this.beginOffsets = consumer.beginningOffsets(targetPartitions); this.beginOffsets = consumer.beginningOffsets(targetPartitions);
this.endOffsets = consumer.endOffsets(targetPartitions); this.endOffsets = consumer.endOffsets(targetPartitions);
@ -45,7 +47,7 @@ class OffsetsInfo {
}); });
} }
boolean assignedPartitionsFullyPolled() { public boolean assignedPartitionsFullyPolled() {
for (var tp: consumer.assignment()) { for (var tp: consumer.assignment()) {
Preconditions.checkArgument(endOffsets.containsKey(tp)); Preconditions.checkArgument(endOffsets.containsKey(tp));
if (endOffsets.get(tp) > consumer.position(tp)) { if (endOffsets.get(tp) > consumer.position(tp)) {
@ -55,10 +57,8 @@ class OffsetsInfo {
return true; return true;
} }
long summaryOffsetsRange() { public Set<TopicPartition> allTargetPartitions() {
MutableLong cnt = new MutableLong(); return Sets.union(nonEmptyPartitions, emptyPartitions);
nonEmptyPartitions.forEach(tp -> cnt.add(endOffsets.get(tp) - beginOffsets.get(tp)));
return cnt.getValue();
} }
} }

View file

@ -21,20 +21,14 @@ class SeekOperations {
private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions! private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) { static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
OffsetsInfo offsetsInfo; OffsetsInfo offsetsInfo = consumerPosition.partitions().isEmpty()
if (consumerPosition.partitions().isEmpty()) { ? new OffsetsInfo(consumer, consumerPosition.topic())
offsetsInfo = new OffsetsInfo(consumer, consumerPosition.topic()); : new OffsetsInfo(consumer, consumerPosition.partitions());
} else { var offsetsToSeek = getOffsetsForSeek(consumer, offsetsInfo, consumerPosition);
offsetsInfo = new OffsetsInfo(consumer, consumerPosition.partitions()); return new SeekOperations(consumer, offsetsInfo, offsetsToSeek);
}
return new SeekOperations(
consumer,
offsetsInfo,
getOffsetsForSeek(consumer, offsetsInfo, consumerPosition)
);
} }
void assignAndSeekNonEmptyPartitions() { void assignAndSeek() {
consumer.assign(offsetsForSeek.keySet()); consumer.assign(offsetsForSeek.keySet());
offsetsForSeek.forEach(consumer::seek); offsetsForSeek.forEach(consumer::seek);
} }
@ -43,10 +37,6 @@ class SeekOperations {
return offsetsInfo.getBeginOffsets(); return offsetsInfo.getBeginOffsets();
} }
Map<TopicPartition, Long> getEndOffsets() {
return offsetsInfo.getEndOffsets();
}
boolean assignedPartitionsFullyPolled() { boolean assignedPartitionsFullyPolled() {
return offsetsInfo.assignedPartitionsFullyPolled(); return offsetsInfo.assignedPartitionsFullyPolled();
} }
@ -64,7 +54,9 @@ class SeekOperations {
OffsetsInfo offsetsInfo, OffsetsInfo offsetsInfo,
ConsumerPosition position) { ConsumerPosition position) {
switch (position.pollingMode()) { switch (position.pollingMode()) {
case LATEST, TAILING: case TAILING:
return consumer.endOffsets(offsetsInfo.allTargetPartitions());
case LATEST:
return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions()); return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
case EARLIEST: case EARLIEST:
return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions()); return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());

View file

@ -1,28 +1,26 @@
package com.provectus.kafka.ui.emitter; package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO; import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.HashMap;
import java.util.function.Predicate;
import java.util.function.Supplier; import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink; import reactor.core.publisher.FluxSink;
@Slf4j @Slf4j
public class TailingEmitter extends AbstractEmitter { public class TailingEmitter extends AbstractEmitter {
private final Supplier<EnhancedConsumer> consumerSupplier; private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final ConsumerPosition consumerPosition; private final ConsumerPosition consumerPosition;
public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier, public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
ConsumerPosition consumerPosition, ConsumerPosition consumerPosition,
ConsumerRecordDeserializer deserializer, ConsumerRecordDeserializer recordDeserializer,
Predicate<TopicMessageDTO> filter,
PollingSettings pollingSettings) { PollingSettings pollingSettings) {
super(new MessagesProcessing(deserializer, filter, false, null), pollingSettings); super(recordDeserializer, pollingSettings);
this.consumerSupplier = consumerSupplier; this.consumerSupplier = consumerSupplier;
this.consumerPosition = consumerPosition; this.consumerPosition = consumerPosition;
} }
@ -30,12 +28,13 @@ public class TailingEmitter extends AbstractEmitter {
@Override @Override
public void accept(FluxSink<TopicMessageEventDTO> sink) { public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting tailing polling for {}", consumerPosition); log.debug("Starting tailing polling for {}", consumerPosition);
try (EnhancedConsumer consumer = consumerSupplier.get()) { try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
assignAndSeek(consumer); SeekOperations.create(consumer, consumerPosition)
.assignAndSeek();
while (!sink.isCancelled()) { while (!sink.isCancelled()) {
sendPhase(sink, "Polling"); sendPhase(sink, "Polling");
var polled = poll(sink, consumer); var polled = poll(sink, consumer);
send(sink, polled); polled.forEach(r -> sendMessage(sink, r));
} }
sink.complete(); sink.complete();
log.debug("Tailing finished"); log.debug("Tailing finished");
@ -48,12 +47,4 @@ public class TailingEmitter extends AbstractEmitter {
} }
} }
private void assignAndSeek(EnhancedConsumer consumer) {
var seekOperations = SeekOperations.create(consumer, consumerPosition);
var seekOffsets = new HashMap<>(seekOperations.getEndOffsets()); // defaulting offsets to topic end
seekOffsets.putAll(seekOperations.getOffsetsForSeek()); // this will only set non-empty partitions
consumer.assign(seekOffsets.keySet());
seekOffsets.forEach(consumer::seek);
}
} }

View file

@ -1,5 +1,8 @@
package com.provectus.kafka.ui.emitter; package com.provectus.kafka.ui.emitter;
import static com.provectus.kafka.ui.model.PollingModeDTO.EARLIEST;
import static com.provectus.kafka.ui.model.PollingModeDTO.LATEST;
import static com.provectus.kafka.ui.model.PollingModeDTO.TAILING;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.ConsumerPosition;
@ -44,12 +47,22 @@ class SeekOperationsTest {
@Nested @Nested
class GetOffsetsForSeek { class GetOffsetsForSeek {
@Test
void tailing() {
var offsets = SeekOperations.getOffsetsForSeek(
consumer,
new OffsetsInfo(consumer, topic),
new ConsumerPosition(TAILING, topic, List.of(), null, null)
);
assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp0, 0L, tp1, 10L, tp2, 20L, tp3, 30L));
}
@Test @Test
void latest() { void latest() {
var offsets = SeekOperations.getOffsetsForSeek( var offsets = SeekOperations.getOffsetsForSeek(
consumer, consumer,
new OffsetsInfo(consumer, topic), new OffsetsInfo(consumer, topic),
new ConsumerPosition(PollingModeDTO.LATEST, topic, null, null, null) new ConsumerPosition(LATEST, topic, List.of(), null, null)
); );
assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 30L)); assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 30L));
} }
@ -59,7 +72,7 @@ class SeekOperationsTest {
var offsets = SeekOperations.getOffsetsForSeek( var offsets = SeekOperations.getOffsetsForSeek(
consumer, consumer,
new OffsetsInfo(consumer, topic), new OffsetsInfo(consumer, topic),
new ConsumerPosition(PollingModeDTO.EARLIEST, topic, null, null, null) new ConsumerPosition(EARLIEST, topic, List.of(), null, null)
); );
assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 0L, tp3, 25L)); assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 0L, tp3, 25L));
} }