Emitters logic refactoring (#2729)

* Emitters logic refactoring:
1. consumers seeking moved to SeekOperations class
2. offsets info gathering moved to OffsetsInfo class

* wip

* checkstyle fix

* checkstyle fix

* minor improvements

Co-authored-by: iliax <ikuramshin@provectus.com>
This commit is contained in:
Ilya Kuramshin 2022-10-23 19:47:21 +04:00 committed by GitHub
parent ae219de36c
commit 4558466ff6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 462 additions and 674 deletions

View file

@ -5,6 +5,7 @@ import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE;
import static java.util.stream.Collectors.toMap;
import com.provectus.kafka.ui.api.MessagesApi;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
@ -18,6 +19,7 @@ import com.provectus.kafka.ui.service.MessagesService;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -63,18 +65,22 @@ public class MessagesController extends AbstractController implements MessagesAp
String keySerde,
String valueSerde,
ServerWebExchange exchange) {
seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
int recordsLimit =
Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
var positions = new ConsumerPosition(
seekType != null ? seekType : SeekTypeDTO.BEGINNING,
parseSeekTo(topicName, seekTo),
seekDirection
seekType,
topicName,
parseSeekTo(topicName, seekType, seekTo)
);
int recordsLimit = Optional.ofNullable(limit)
.map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT))
.orElse(DEFAULT_LOAD_RECORD_LIMIT);
return Mono.just(
ResponseEntity.ok(
messagesService.loadMessages(
getCluster(clusterName), topicName, positions, q, filterQueryType, recordsLimit, keySerde, valueSerde)
getCluster(clusterName), topicName, positions, q, filterQueryType,
recordsLimit, seekDirection, keySerde, valueSerde)
)
);
}
@ -92,9 +98,13 @@ public class MessagesController extends AbstractController implements MessagesAp
* The format is [partition]::[offset] for specifying offsets
* or [partition]::[timestamp in millis] for specifying timestamps.
*/
private Map<TopicPartition, Long> parseSeekTo(String topic, List<String> seekTo) {
@Nullable
private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
if (seekTo == null || seekTo.isEmpty()) {
return Map.of();
if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
return null;
}
throw new ValidationException("seekTo should be set if seekType is " + seekType);
}
return seekTo.stream()
.map(p -> {

View file

@ -1,21 +1,18 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import com.provectus.kafka.ui.util.OffsetsSeekBackward;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
@ -29,65 +26,55 @@ public class BackwardRecordEmitter
private static final Duration POLL_TIMEOUT = Duration.ofMillis(200);
private final Function<Map<String, Object>, KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final OffsetsSeekBackward offsetsSeek;
private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final ConsumerPosition consumerPosition;
private final int messagesPerPage;
public BackwardRecordEmitter(
Function<Map<String, Object>, KafkaConsumer<Bytes, Bytes>> consumerSupplier,
OffsetsSeekBackward offsetsSeek,
Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
ConsumerRecordDeserializer recordDeserializer) {
super(recordDeserializer);
this.offsetsSeek = offsetsSeek;
this.consumerPosition = consumerPosition;
this.messagesPerPage = messagesPerPage;
this.consumerSupplier = consumerSupplier;
}
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
try (KafkaConsumer<Bytes, Bytes> configConsumer = consumerSupplier.apply(Map.of())) {
final List<TopicPartition> requestedPartitions =
offsetsSeek.getRequestedPartitions(configConsumer);
sendPhase(sink, "Request partitions");
final int msgsPerPartition = offsetsSeek.msgsPerPartition(requestedPartitions.size());
try (KafkaConsumer<Bytes, Bytes> consumer =
consumerSupplier.apply(
Map.of(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, msgsPerPartition)
)
) {
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
sendPhase(sink, "Created consumer");
SortedMap<TopicPartition, Long> readUntilOffsets =
new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
readUntilOffsets.putAll(offsetsSeek.getPartitionsOffsets(consumer));
var seekOperations = SeekOperations.create(consumer, consumerPosition);
var readUntilOffsets = new TreeMap<TopicPartition, Long>(Comparator.comparingInt(TopicPartition::partition));
readUntilOffsets.putAll(seekOperations.getOffsetsForSeek());
sendPhase(sink, "Requested partitions offsets");
log.debug("partition offsets: {}", readUntilOffsets);
var waitingOffsets =
offsetsSeek.waitingOffsets(consumer, readUntilOffsets.keySet());
log.debug("waiting offsets {} {}",
waitingOffsets.getBeginOffsets(),
waitingOffsets.getEndOffsets()
);
int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readUntilOffsets.size());
log.debug("'Until' offsets for polling: {}", readUntilOffsets);
while (!sink.isCancelled() && !waitingOffsets.beginReached()) {
while (!sink.isCancelled() && !readUntilOffsets.isEmpty()) {
new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> {
long lowestOffset = waitingOffsets.getBeginOffsets().get(tp.partition());
long readFromOffset = Math.max(lowestOffset, readToOffset - msgsPerPartition);
if (sink.isCancelled()) {
return; //fast return in case of sink cancellation
}
long beginOffset = seekOperations.getBeginOffsets().get(tp);
long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition);
partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink)
.stream()
.filter(r -> !sink.isCancelled())
.forEach(r -> sendMessage(sink, r));
waitingOffsets.markPolled(tp.partition(), readFromOffset);
if (waitingOffsets.getBeginOffsets().get(tp.partition()) == null) {
if (beginOffset == readFromOffset) {
// we fully read this partition -> removing it from polling iterations
readUntilOffsets.remove(tp);
} else {
// updating 'to' offset for next polling iteration
readUntilOffsets.put(tp, readFromOffset);
}
});
if (waitingOffsets.beginReached()) {
if (readUntilOffsets.isEmpty()) {
log.debug("begin reached after partitions poll iteration");
} else if (sink.isCancelled()) {
log.debug("sink is cancelled after partitions poll iteration");
@ -95,14 +82,12 @@ public class BackwardRecordEmitter
}
sink.complete();
log.debug("Polling finished");
}
} catch (Exception e) {
log.error("Error occurred while consuming records", e);
sink.error(e);
}
}
private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(
TopicPartition tp,
long fromOffset,

View file

@ -1,8 +1,8 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import com.provectus.kafka.ui.util.OffsetsSeek;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -17,34 +17,38 @@ public class ForwardRecordEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final OffsetsSeek offsetsSeek;
private final ConsumerPosition position;
public ForwardRecordEmitter(
Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
OffsetsSeek offsetsSeek,
ConsumerPosition position,
ConsumerRecordDeserializer recordDeserializer) {
super(recordDeserializer);
this.position = position;
this.consumerSupplier = consumerSupplier;
this.offsetsSeek = offsetsSeek;
}
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
sendPhase(sink, "Assigning partitions");
var waitingOffsets = offsetsSeek.assignAndSeek(consumer);
var seekOperations = SeekOperations.create(consumer, position);
seekOperations.assignAndSeekNonEmptyPartitions();
// we use empty polls counting to verify that topic was fully read
int emptyPolls = 0;
while (!sink.isCancelled() && !waitingOffsets.endReached() && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT) {
while (!sink.isCancelled()
&& !seekOperations.assignedPartitionsFullyPolled()
&& emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT) {
sendPhase(sink, "Polling");
ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
log.info("{} records polled", records.count());
emptyPolls = records.isEmpty() ? emptyPolls + 1 : 0;
for (ConsumerRecord<Bytes, Bytes> msg : records) {
if (!sink.isCancelled() && !waitingOffsets.endReached()) {
if (!sink.isCancelled()) {
sendMessage(sink, msg);
waitingOffsets.markPolled(msg);
} else {
break;
}

View file

@ -0,0 +1,59 @@
package com.provectus.kafka.ui.emitter;
import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
@Slf4j
@Getter
public class OffsetsInfo {
private final Consumer<?, ?> consumer;
private final Map<TopicPartition, Long> beginOffsets;
private final Map<TopicPartition, Long> endOffsets;
private final Set<TopicPartition> nonEmptyPartitions = new HashSet<>();
private final Set<TopicPartition> emptyPartitions = new HashSet<>();
public OffsetsInfo(Consumer<?, ?> consumer, String topic) {
this(consumer,
consumer.partitionsFor(topic).stream()
.map(pi -> new TopicPartition(topic, pi.partition()))
.collect(Collectors.toList())
);
}
public OffsetsInfo(Consumer<?, ?> consumer,
Collection<TopicPartition> targetPartitions) {
this.consumer = consumer;
this.beginOffsets = consumer.beginningOffsets(targetPartitions);
this.endOffsets = consumer.endOffsets(targetPartitions);
endOffsets.forEach((tp, endOffset) -> {
var beginningOffset = beginOffsets.get(tp);
if (endOffset > beginningOffset) {
nonEmptyPartitions.add(tp);
} else {
emptyPartitions.add(tp);
}
});
}
public boolean assignedPartitionsFullyPolled() {
for (var tp: consumer.assignment()) {
Preconditions.checkArgument(endOffsets.containsKey(tp));
if (endOffsets.get(tp) > consumer.position(tp)) {
return false;
}
}
return true;
}
}

View file

@ -0,0 +1,111 @@
package com.provectus.kafka.ui.emitter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.SeekTypeDTO;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
class SeekOperations {
private final Consumer<?, ?> consumer;
private final OffsetsInfo offsetsInfo;
private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
OffsetsInfo offsetsInfo;
if (consumerPosition.getSeekTo() == null) {
offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getTopic());
} else {
offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getSeekTo().keySet());
}
return new SeekOperations(
consumer,
offsetsInfo,
getOffsetsForSeek(consumer, offsetsInfo, consumerPosition.getSeekType(), consumerPosition.getSeekTo())
);
}
void assignAndSeekNonEmptyPartitions() {
consumer.assign(offsetsForSeek.keySet());
offsetsForSeek.forEach(consumer::seek);
}
Map<TopicPartition, Long> getBeginOffsets() {
return offsetsInfo.getBeginOffsets();
}
Map<TopicPartition, Long> getEndOffsets() {
return offsetsInfo.getEndOffsets();
}
boolean assignedPartitionsFullyPolled() {
return offsetsInfo.assignedPartitionsFullyPolled();
}
// Get offsets to seek to. NOTE: offsets do not contain empty partitions offsets
Map<TopicPartition, Long> getOffsetsForSeek() {
return offsetsForSeek;
}
/**
* Finds offsets for ConsumerPosition. Note: will return empty map if no offsets found for desired criteria.
*/
@VisibleForTesting
static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
OffsetsInfo offsetsInfo,
SeekTypeDTO seekType,
@Nullable Map<TopicPartition, Long> seekTo) {
switch (seekType) {
case LATEST:
return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
case BEGINNING:
return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
case OFFSET:
Preconditions.checkNotNull(offsetsInfo);
return fixOffsets(offsetsInfo, seekTo);
case TIMESTAMP:
Preconditions.checkNotNull(offsetsInfo);
return offsetsForTimestamp(consumer, offsetsInfo, seekTo);
default:
throw new IllegalStateException();
}
}
private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo, Map<TopicPartition, Long> offsets) {
offsets = new HashMap<>(offsets);
offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
Map<TopicPartition, Long> result = new HashMap<>();
offsets.forEach((tp, targetOffset) -> {
long endOffset = offsetsInfo.getEndOffsets().get(tp);
long beginningOffset = offsetsInfo.getBeginOffsets().get(tp);
// fixing offsets with min - max bounds
if (targetOffset > endOffset) {
targetOffset = endOffset;
} else if (targetOffset < beginningOffset) {
targetOffset = beginningOffset;
}
result.put(tp, targetOffset);
});
return result;
}
private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer, OffsetsInfo offsetsInfo,
Map<TopicPartition, Long> timestamps) {
timestamps = new HashMap<>(timestamps);
timestamps.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
return consumer.offsetsForTimes(timestamps).entrySet().stream()
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
}
}

View file

@ -1,8 +1,9 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import com.provectus.kafka.ui.util.OffsetsSeek;
import java.util.HashMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@ -15,21 +16,21 @@ public class TailingEmitter extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final OffsetsSeek offsetsSeek;
private final ConsumerPosition consumerPosition;
public TailingEmitter(ConsumerRecordDeserializer recordDeserializer,
Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
OffsetsSeek offsetsSeek) {
public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
ConsumerPosition consumerPosition,
ConsumerRecordDeserializer recordDeserializer) {
super(recordDeserializer);
this.consumerSupplier = consumerSupplier;
this.offsetsSeek = offsetsSeek;
this.consumerPosition = consumerPosition;
}
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
log.debug("Starting topic tailing");
offsetsSeek.assignAndSeek(consumer);
assignAndSeek(consumer);
while (!sink.isCancelled()) {
sendPhase(sink, "Polling");
var polled = poll(sink, consumer);
@ -40,9 +41,17 @@ public class TailingEmitter extends AbstractEmitter
} catch (InterruptException kafkaInterruptException) {
sink.complete();
} catch (Exception e) {
log.error("Error consuming {}", offsetsSeek.getConsumerPosition(), e);
log.error("Error consuming {}", consumerPosition, e);
sink.error(e);
}
}
private void assignAndSeek(KafkaConsumer<Bytes, Bytes> consumer) {
var seekOperations = SeekOperations.create(consumer, consumerPosition);
var seekOffsets = new HashMap<>(seekOperations.getEndOffsets()); // defaulting offsets to topic end
seekOffsets.putAll(seekOperations.getOffsetsForSeek()); // this will only set non-empty partitions
consumer.assign(seekOffsets.keySet());
seekOffsets.forEach(consumer::seek);
}
}

View file

@ -1,12 +1,14 @@
package com.provectus.kafka.ui.model;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.Value;
import org.apache.kafka.common.TopicPartition;
@Value
public class ConsumerPosition {
SeekTypeDTO seekType;
Map<TopicPartition, Long> seekTo;
SeekDirectionDTO seekDirection;
String topic;
@Nullable
Map<TopicPartition, Long> seekTo; // null if positioning should apply to all tps
}

View file

@ -14,12 +14,9 @@ import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
import com.provectus.kafka.ui.util.OffsetsSeekBackward;
import com.provectus.kafka.ui.util.OffsetsSeekForward;
import com.provectus.kafka.ui.util.ResultSizeLimiter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
@ -129,58 +126,62 @@ public class MessagesService {
}
public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic,
ConsumerPosition consumerPosition, String query,
ConsumerPosition consumerPosition,
@Nullable String query,
MessageFilterTypeDTO filterQueryType,
int limit,
SeekDirectionDTO seekDirection,
@Nullable String keySerde,
@Nullable String valueSerde) {
return withExistingTopic(cluster, topic)
.flux()
.flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
filterQueryType, limit, keySerde, valueSerde));
filterQueryType, limit, seekDirection, keySerde, valueSerde));
}
private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
String topic,
ConsumerPosition consumerPosition,
String query,
@Nullable String query,
MessageFilterTypeDTO filterQueryType,
int limit,
SeekDirectionDTO seekDirection,
@Nullable String keySerde,
@Nullable String valueSerde) {
java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
ConsumerRecordDeserializer recordDeserializer =
deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
if (consumerPosition.getSeekDirection().equals(SeekDirectionDTO.FORWARD)) {
if (seekDirection.equals(SeekDirectionDTO.FORWARD)) {
emitter = new ForwardRecordEmitter(
() -> consumerGroupService.createConsumer(cluster),
new OffsetsSeekForward(topic, consumerPosition),
consumerPosition,
recordDeserializer
);
} else if (consumerPosition.getSeekDirection().equals(SeekDirectionDTO.BACKWARD)) {
} else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
emitter = new BackwardRecordEmitter(
(Map<String, Object> props) -> consumerGroupService.createConsumer(cluster, props),
new OffsetsSeekBackward(topic, consumerPosition, limit),
() -> consumerGroupService.createConsumer(cluster),
consumerPosition,
limit,
recordDeserializer
);
} else {
emitter = new TailingEmitter(
recordDeserializer,
() -> consumerGroupService.createConsumer(cluster),
new OffsetsSeekForward(topic, consumerPosition)
consumerPosition,
recordDeserializer
);
}
return Flux.create(emitter)
.filter(getMsgFilter(query, filterQueryType))
.takeWhile(createTakeWhilePredicate(consumerPosition, limit))
.takeWhile(createTakeWhilePredicate(seekDirection, limit))
.subscribeOn(Schedulers.boundedElastic())
.share();
}
private Predicate<TopicMessageEventDTO> createTakeWhilePredicate(
ConsumerPosition consumerPosition, int limit) {
return consumerPosition.getSeekDirection() == SeekDirectionDTO.TAILING
SeekDirectionDTO seekDirection, int limit) {
return seekDirection == SeekDirectionDTO.TAILING
? evt -> true // no limit for tailing
: new ResultSizeLimiter(limit);
}
@ -189,8 +190,6 @@ public class MessagesService {
if (StringUtils.isEmpty(query)) {
return evt -> true;
}
filterQueryType = Optional.ofNullable(filterQueryType)
.orElse(MessageFilterTypeDTO.STRING_CONTAINS);
var messageFilter = MessageFilters.createMsgFilter(query, filterQueryType);
return evt -> {
// we only apply filter for message events

View file

@ -2,12 +2,12 @@ package com.provectus.kafka.ui.service.analyze;
import static com.provectus.kafka.ui.emitter.AbstractEmitter.NO_MORE_DATA_EMPTY_POLLS_COUNT;
import com.provectus.kafka.ui.emitter.OffsetsInfo;
import com.provectus.kafka.ui.exception.TopicAnalysisException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.TopicAnalysisDTO;
import com.provectus.kafka.ui.service.ConsumerGroupService;
import com.provectus.kafka.ui.service.TopicsService;
import com.provectus.kafka.ui.util.OffsetsSeek.WaitingOffsets;
import java.io.Closeable;
import java.time.Duration;
import java.time.Instant;
@ -119,14 +119,14 @@ public class TopicAnalysisService {
consumer.assign(topicPartitions);
consumer.seekToBeginning(topicPartitions);
var waitingOffsets = new WaitingOffsets(topicId.topicName, consumer, topicPartitions);
for (int emptyPolls = 0; !waitingOffsets.endReached() && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT;) {
var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName);
for (int emptyPolls = 0; !offsetsInfo.assignedPartitionsFullyPolled()
&& emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT;) {
var polled = consumer.poll(Duration.ofSeconds(3));
emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
polled.forEach(r -> {
totalStats.apply(r);
partitionStats.get(r.partition()).apply(r);
waitingOffsets.markPolled(r);
});
updateProgress();
}

View file

@ -1,143 +0,0 @@
package com.provectus.kafka.ui.util;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.SeekTypeDTO;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@Slf4j
public abstract class OffsetsSeek {
protected final String topic;
protected final ConsumerPosition consumerPosition;
protected OffsetsSeek(String topic, ConsumerPosition consumerPosition) {
this.topic = topic;
this.consumerPosition = consumerPosition;
}
public ConsumerPosition getConsumerPosition() {
return consumerPosition;
}
public Map<TopicPartition, Long> getPartitionsOffsets(Consumer<Bytes, Bytes> consumer) {
SeekTypeDTO seekType = consumerPosition.getSeekType();
List<TopicPartition> partitions = getRequestedPartitions(consumer);
log.info("Positioning consumer for topic {} with {}", topic, consumerPosition);
Map<TopicPartition, Long> offsets;
switch (seekType) {
case OFFSET:
offsets = offsetsFromPositions(consumer, partitions);
break;
case TIMESTAMP:
offsets = offsetsForTimestamp(consumer);
break;
case BEGINNING:
offsets = offsetsFromBeginning(consumer, partitions);
break;
case LATEST:
offsets = endOffsets(consumer, partitions);
break;
default:
throw new IllegalArgumentException("Unknown seekType: " + seekType);
}
return offsets;
}
public WaitingOffsets waitingOffsets(Consumer<Bytes, Bytes> consumer,
Collection<TopicPartition> partitions) {
return new WaitingOffsets(topic, consumer, partitions);
}
public WaitingOffsets assignAndSeek(Consumer<Bytes, Bytes> consumer) {
final Map<TopicPartition, Long> partitionsOffsets = getPartitionsOffsets(consumer);
consumer.assign(partitionsOffsets.keySet());
partitionsOffsets.forEach(consumer::seek);
log.info("Assignment: {}", consumer.assignment());
return waitingOffsets(consumer, partitionsOffsets.keySet());
}
public List<TopicPartition> getRequestedPartitions(Consumer<Bytes, Bytes> consumer) {
Map<TopicPartition, Long> partitionPositions = consumerPosition.getSeekTo();
return consumer.partitionsFor(topic).stream()
.filter(
p -> partitionPositions.isEmpty()
|| partitionPositions.containsKey(new TopicPartition(p.topic(), p.partition()))
).map(p -> new TopicPartition(p.topic(), p.partition()))
.collect(Collectors.toList());
}
protected Map<TopicPartition, Long> endOffsets(
Consumer<Bytes, Bytes> consumer, List<TopicPartition> partitions) {
return consumer.endOffsets(partitions);
}
protected abstract Map<TopicPartition, Long> offsetsFromBeginning(
Consumer<Bytes, Bytes> consumer, List<TopicPartition> partitions);
protected abstract Map<TopicPartition, Long> offsetsForTimestamp(
Consumer<Bytes, Bytes> consumer);
protected abstract Map<TopicPartition, Long> offsetsFromPositions(
Consumer<Bytes, Bytes> consumer, List<TopicPartition> partitions);
public static class WaitingOffsets {
private final Map<Integer, Long> endOffsets; // partition number -> offset
private final Map<Integer, Long> beginOffsets; // partition number -> offset
public WaitingOffsets(String topic, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
var allBeginningOffsets = consumer.beginningOffsets(partitions);
var allEndOffsets = consumer.endOffsets(partitions);
this.endOffsets = allEndOffsets.entrySet().stream()
.filter(entry -> !allBeginningOffsets.get(entry.getKey()).equals(entry.getValue()))
.map(e -> Tuples.of(e.getKey().partition(), e.getValue() - 1))
.collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
this.beginOffsets = this.endOffsets.keySet().stream()
.map(p -> Tuples.of(p, allBeginningOffsets.get(new TopicPartition(topic, p))))
.collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
}
public void markPolled(ConsumerRecord<?, ?> rec) {
markPolled(rec.partition(), rec.offset());
}
public void markPolled(int partition, long offset) {
Long endWaiting = endOffsets.get(partition);
if (endWaiting != null && endWaiting <= offset) {
endOffsets.remove(partition);
}
Long beginWaiting = beginOffsets.get(partition);
if (beginWaiting != null && beginWaiting >= offset) {
beginOffsets.remove(partition);
}
}
public boolean endReached() {
return endOffsets.isEmpty();
}
public boolean beginReached() {
return beginOffsets.isEmpty();
}
public Map<Integer, Long> getEndOffsets() {
return endOffsets;
}
public Map<Integer, Long> getBeginOffsets() {
return beginOffsets;
}
}
}

View file

@ -1,120 +0,0 @@
package com.provectus.kafka.ui.util;
import com.provectus.kafka.ui.model.ConsumerPosition;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@Slf4j
public class OffsetsSeekBackward extends OffsetsSeek {
private final int maxMessages;
public OffsetsSeekBackward(String topic,
ConsumerPosition consumerPosition, int maxMessages) {
super(topic, consumerPosition);
this.maxMessages = maxMessages;
}
public int msgsPerPartition(int partitionsSize) {
return msgsPerPartition(maxMessages, partitionsSize);
}
public int msgsPerPartition(long awaitingMessages, int partitionsSize) {
return (int) Math.ceil((double) awaitingMessages / partitionsSize);
}
protected Map<TopicPartition, Long> offsetsFromPositions(Consumer<Bytes, Bytes> consumer,
List<TopicPartition> partitions) {
return findOffsetsInt(consumer, consumerPosition.getSeekTo(), partitions);
}
protected Map<TopicPartition, Long> offsetsFromBeginning(Consumer<Bytes, Bytes> consumer,
List<TopicPartition> partitions) {
return findOffsets(consumer, Map.of(), partitions);
}
protected Map<TopicPartition, Long> offsetsForTimestamp(Consumer<Bytes, Bytes> consumer) {
Map<TopicPartition, Long> timestampsToSearch =
consumerPosition.getSeekTo().entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue
));
Map<TopicPartition, Long> offsetsForTimestamps = consumer.offsetsForTimes(timestampsToSearch)
.entrySet().stream()
.filter(e -> e.getValue() != null)
.map(v -> Tuples.of(v.getKey(), v.getValue().offset()))
.collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
if (offsetsForTimestamps.isEmpty()) {
throw new IllegalArgumentException("No offsets were found for requested timestamps");
}
log.info("Timestamps: {} to offsets: {}", timestampsToSearch, offsetsForTimestamps);
return findOffsets(consumer, offsetsForTimestamps, offsetsForTimestamps.keySet());
}
protected Map<TopicPartition, Long> findOffsetsInt(
Consumer<Bytes, Bytes> consumer, Map<TopicPartition, Long> seekTo,
List<TopicPartition> partitions) {
return findOffsets(consumer, seekTo, partitions);
}
protected Map<TopicPartition, Long> findOffsets(
Consumer<Bytes, Bytes> consumer, Map<TopicPartition, Long> seekTo,
Collection<TopicPartition> partitions) {
final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
final Map<TopicPartition, Long> seekMap = new HashMap<>();
final Set<TopicPartition> emptyPartitions = new HashSet<>();
for (Map.Entry<TopicPartition, Long> entry : seekTo.entrySet()) {
final Long endOffset = endOffsets.get(entry.getKey());
final Long beginningOffset = beginningOffsets.get(entry.getKey());
if (beginningOffset != null
&& endOffset != null
&& beginningOffset < endOffset
&& entry.getValue() > beginningOffset
) {
final Long value;
if (entry.getValue() > endOffset) {
value = endOffset;
} else {
value = entry.getValue();
}
seekMap.put(entry.getKey(), value);
} else {
emptyPartitions.add(entry.getKey());
}
}
Set<TopicPartition> waiting = new HashSet<>(partitions);
waiting.removeAll(emptyPartitions);
waiting.removeAll(seekMap.keySet());
for (TopicPartition topicPartition : waiting) {
seekMap.put(topicPartition, endOffsets.get(topicPartition));
}
return seekMap;
}
}

View file

@ -1,61 +0,0 @@
package com.provectus.kafka.ui.util;
import com.provectus.kafka.ui.model.ConsumerPosition;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
@Slf4j
public class OffsetsSeekForward extends OffsetsSeek {
public OffsetsSeekForward(String topic, ConsumerPosition consumerPosition) {
super(topic, consumerPosition);
}
protected Map<TopicPartition, Long> offsetsFromPositions(Consumer<Bytes, Bytes> consumer,
List<TopicPartition> partitions) {
final Map<TopicPartition, Long> offsets =
offsetsFromBeginning(consumer, partitions);
final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(offsets.keySet());
final Set<TopicPartition> set = new HashSet<>(consumerPosition.getSeekTo().keySet());
final Map<TopicPartition, Long> collect = consumerPosition.getSeekTo().entrySet().stream()
.filter(e -> e.getValue() < endOffsets.get(e.getKey()))
.filter(e -> endOffsets.get(e.getKey()) > offsets.get(e.getKey()))
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue
));
offsets.putAll(collect);
set.removeAll(collect.keySet());
set.forEach(offsets::remove);
return offsets;
}
protected Map<TopicPartition, Long> offsetsForTimestamp(Consumer<Bytes, Bytes> consumer) {
Map<TopicPartition, Long> offsetsForTimestamps =
consumer.offsetsForTimes(consumerPosition.getSeekTo())
.entrySet().stream()
.filter(e -> e.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
if (offsetsForTimestamps.isEmpty()) {
throw new IllegalArgumentException("No offsets were found for requested timestamps");
}
return offsetsForTimestamps;
}
protected Map<TopicPartition, Long> offsetsFromBeginning(Consumer<Bytes, Bytes> consumer,
List<TopicPartition> partitions) {
return consumer.beginningOffsets(partitions);
}
}

View file

@ -0,0 +1,53 @@
package com.provectus.kafka.ui.emitter;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class OffsetsInfoTest {
final String topic = "test";
final TopicPartition tp0 = new TopicPartition(topic, 0); //offsets: start 0, end 0
final TopicPartition tp1 = new TopicPartition(topic, 1); //offsets: start 10, end 10
final TopicPartition tp2 = new TopicPartition(topic, 2); //offsets: start 0, end 20
final TopicPartition tp3 = new TopicPartition(topic, 3); //offsets: start 25, end 30
MockConsumer<Bytes, Bytes> consumer;
@BeforeEach
void initMockConsumer() {
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
consumer.updatePartitions(
topic,
Stream.of(tp0, tp1, tp2, tp3)
.map(tp -> new PartitionInfo(topic, tp.partition(), null, null, null, null))
.collect(Collectors.toList()));
consumer.updateBeginningOffsets(Map.of(tp0, 0L, tp1, 10L, tp2, 0L, tp3, 25L));
consumer.updateEndOffsets(Map.of(tp0, 0L, tp1, 10L, tp2, 20L, tp3, 30L));
}
@Test
void fillsInnerFieldsAccordingToTopicState() {
var offsets = new OffsetsInfo(consumer, List.of(tp0, tp1, tp2, tp3));
assertThat(offsets.getBeginOffsets()).containsEntry(tp0, 0L).containsEntry(tp1, 10L).containsEntry(tp2, 0L)
.containsEntry(tp3, 25L);
assertThat(offsets.getEndOffsets()).containsEntry(tp0, 0L).containsEntry(tp1, 10L).containsEntry(tp2, 20L)
.containsEntry(tp3, 30L);
assertThat(offsets.getEmptyPartitions()).contains(tp0, tp1);
assertThat(offsets.getNonEmptyPartitions()).contains(tp2, tp3);
}
}

View file

@ -0,0 +1,88 @@
package com.provectus.kafka.ui.emitter;
import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.model.SeekTypeDTO;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
class SeekOperationsTest {
final String topic = "test";
final TopicPartition tp0 = new TopicPartition(topic, 0); //offsets: start 0, end 0
final TopicPartition tp1 = new TopicPartition(topic, 1); //offsets: start 10, end 10
final TopicPartition tp2 = new TopicPartition(topic, 2); //offsets: start 0, end 20
final TopicPartition tp3 = new TopicPartition(topic, 3); //offsets: start 25, end 30
MockConsumer<Bytes, Bytes> consumer;
@BeforeEach
void initMockConsumer() {
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
consumer.updatePartitions(
topic,
Stream.of(tp0, tp1, tp2, tp3)
.map(tp -> new PartitionInfo(topic, tp.partition(), null, null, null, null))
.collect(Collectors.toList()));
consumer.updateBeginningOffsets(Map.of(tp0, 0L, tp1, 10L, tp2, 0L, tp3, 25L));
consumer.updateEndOffsets(Map.of(tp0, 0L, tp1, 10L, tp2, 20L, tp3, 30L));
}
@Nested
class GetOffsetsForSeek {
@Test
void latest() {
var offsets = SeekOperations.getOffsetsForSeek(
consumer,
new OffsetsInfo(consumer, topic),
SeekTypeDTO.LATEST,
null
);
assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 30L));
}
@Test
void beginning() {
var offsets = SeekOperations.getOffsetsForSeek(
consumer,
new OffsetsInfo(consumer, topic),
SeekTypeDTO.BEGINNING,
null
);
assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 0L, tp3, 25L));
}
@Test
void offsets() {
var offsets = SeekOperations.getOffsetsForSeek(
consumer,
new OffsetsInfo(consumer, topic),
SeekTypeDTO.OFFSET,
Map.of(tp1, 10L, tp2, 10L, tp3, 26L)
);
assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 10L, tp3, 26L));
}
@Test
void offsetsWithBoundsFixing() {
var offsets = SeekOperations.getOffsetsForSeek(
consumer,
new OffsetsInfo(consumer, topic),
SeekTypeDTO.OFFSET,
Map.of(tp1, 10L, tp2, 21L, tp3, 24L)
);
assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 25L));
}
}
}

View file

@ -111,10 +111,11 @@ class TailingEmitterTest extends AbstractIntegrationTest {
return applicationContext.getBean(MessagesService.class)
.loadMessages(cluster, topicName,
new ConsumerPosition(SeekTypeDTO.LATEST, Map.of(), SeekDirectionDTO.TAILING),
new ConsumerPosition(SeekTypeDTO.LATEST, topic, null),
query,
MessageFilterTypeDTO.STRING_CONTAINS,
0,
SeekDirectionDTO.TAILING,
"String",
"String");
}
@ -137,7 +138,7 @@ class TailingEmitterTest extends AbstractIntegrationTest {
Awaitility.await()
.pollInSameThread()
.pollDelay(Duration.ofMillis(100))
.atMost(Duration.ofSeconds(10))
.atMost(Duration.ofSeconds(200))
.until(() -> fluxOutput.stream()
.anyMatch(msg -> msg.getType() == TopicMessageEventDTO.TypeEnum.CONSUMING));
}

View file

@ -45,7 +45,7 @@ class MessagesServiceTest extends AbstractIntegrationTest {
@Test
void loadMessagesReturnsExceptionWhenTopicNotFound() {
StepVerifier.create(messagesService
.loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1, "String", "String"))
.loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1, null, "String", "String"))
.expectError(TopicNotFoundException.class)
.verify();
}

View file

@ -1,8 +1,7 @@
package com.provectus.kafka.ui.service;
import static com.provectus.kafka.ui.model.SeekDirectionDTO.BACKWARD;
import static com.provectus.kafka.ui.model.SeekDirectionDTO.FORWARD;
import static com.provectus.kafka.ui.model.SeekTypeDTO.BEGINNING;
import static com.provectus.kafka.ui.model.SeekTypeDTO.LATEST;
import static com.provectus.kafka.ui.model.SeekTypeDTO.OFFSET;
import static com.provectus.kafka.ui.model.SeekTypeDTO.TIMESTAMP;
import static org.assertj.core.api.Assertions.assertThat;
@ -17,8 +16,6 @@ import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
import com.provectus.kafka.ui.serdes.builtin.StringSerde;
import com.provectus.kafka.ui.util.OffsetsSeekBackward;
import com.provectus.kafka.ui.util.OffsetsSeekForward;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
@ -112,18 +109,15 @@ class RecordEmitterTest extends AbstractIntegrationTest {
void pollNothingOnEmptyTopic() {
var forwardEmitter = new ForwardRecordEmitter(
this::createConsumer,
new OffsetsSeekForward(EMPTY_TOPIC,
new ConsumerPosition(BEGINNING, Map.of(), FORWARD)
), RECORD_DESERIALIZER
new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
RECORD_DESERIALIZER
);
var backwardEmitter = new BackwardRecordEmitter(
this::createConsumer,
new OffsetsSeekBackward(
EMPTY_TOPIC,
new ConsumerPosition(BEGINNING, Map.of(), BACKWARD),
100
), RECORD_DESERIALIZER
new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
100,
RECORD_DESERIALIZER
);
StepVerifier.create(
@ -143,17 +137,15 @@ class RecordEmitterTest extends AbstractIntegrationTest {
void pollFullTopicFromBeginning() {
var forwardEmitter = new ForwardRecordEmitter(
this::createConsumer,
new OffsetsSeekForward(TOPIC,
new ConsumerPosition(BEGINNING, Map.of(), FORWARD)
), RECORD_DESERIALIZER
new ConsumerPosition(BEGINNING, TOPIC, null),
RECORD_DESERIALIZER
);
var backwardEmitter = new BackwardRecordEmitter(
this::createConsumer,
new OffsetsSeekBackward(TOPIC,
new ConsumerPosition(BEGINNING, Map.of(), BACKWARD),
PARTITIONS * MSGS_PER_PARTITION
), RECORD_DESERIALIZER
new ConsumerPosition(LATEST, TOPIC, null),
PARTITIONS * MSGS_PER_PARTITION,
RECORD_DESERIALIZER
);
List<String> expectedValues = SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList());
@ -172,17 +164,15 @@ class RecordEmitterTest extends AbstractIntegrationTest {
var forwardEmitter = new ForwardRecordEmitter(
this::createConsumer,
new OffsetsSeekForward(TOPIC,
new ConsumerPosition(OFFSET, targetOffsets, FORWARD)
), RECORD_DESERIALIZER
new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
RECORD_DESERIALIZER
);
var backwardEmitter = new BackwardRecordEmitter(
this::createConsumer,
new OffsetsSeekBackward(TOPIC,
new ConsumerPosition(OFFSET, targetOffsets, BACKWARD),
PARTITIONS * MSGS_PER_PARTITION
), RECORD_DESERIALIZER
new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
PARTITIONS * MSGS_PER_PARTITION,
RECORD_DESERIALIZER
);
var expectedValues = SENT_RECORDS.stream()
@ -217,17 +207,15 @@ class RecordEmitterTest extends AbstractIntegrationTest {
var forwardEmitter = new ForwardRecordEmitter(
this::createConsumer,
new OffsetsSeekForward(TOPIC,
new ConsumerPosition(TIMESTAMP, targetTimestamps, FORWARD)
), RECORD_DESERIALIZER
new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
RECORD_DESERIALIZER
);
var backwardEmitter = new BackwardRecordEmitter(
this::createConsumer,
new OffsetsSeekBackward(TOPIC,
new ConsumerPosition(TIMESTAMP, targetTimestamps, BACKWARD),
PARTITIONS * MSGS_PER_PARTITION
), RECORD_DESERIALIZER
new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
PARTITIONS * MSGS_PER_PARTITION,
RECORD_DESERIALIZER
);
var expectedValues = SENT_RECORDS.stream()
@ -255,10 +243,9 @@ class RecordEmitterTest extends AbstractIntegrationTest {
var backwardEmitter = new BackwardRecordEmitter(
this::createConsumer,
new OffsetsSeekBackward(TOPIC,
new ConsumerPosition(OFFSET, targetOffsets, BACKWARD),
numMessages
), RECORD_DESERIALIZER
new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
numMessages,
RECORD_DESERIALIZER
);
var expectedValues = SENT_RECORDS.stream()
@ -281,10 +268,9 @@ class RecordEmitterTest extends AbstractIntegrationTest {
var backwardEmitter = new BackwardRecordEmitter(
this::createConsumer,
new OffsetsSeekBackward(TOPIC,
new ConsumerPosition(OFFSET, offsets, BACKWARD),
100
), RECORD_DESERIALIZER
new ConsumerPosition(OFFSET, TOPIC, offsets),
100,
RECORD_DESERIALIZER
);
expectEmitter(backwardEmitter,
@ -331,7 +317,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
final Map<String, ? extends Serializable> map = Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20, // to check multiple polls
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19, // to check multiple polls
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class
);

View file

@ -502,12 +502,13 @@ public class SendAndReadTests extends AbstractIntegrationTest {
topic,
new ConsumerPosition(
SeekTypeDTO.BEGINNING,
Map.of(new TopicPartition(topic, 0), 0L),
SeekDirectionDTO.FORWARD
topic,
Map.of(new TopicPartition(topic, 0), 0L)
),
null,
null,
1,
SeekDirectionDTO.FORWARD,
msgToSend.getKeySerde().get(),
msgToSend.getValueSerde().get()
).filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))

View file

@ -1,196 +0,0 @@
package com.provectus.kafka.ui.util;
import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.SeekTypeDTO;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
class OffsetsSeekTest {
final String topic = "test";
final TopicPartition tp0 = new TopicPartition(topic, 0); //offsets: start 0, end 0
final TopicPartition tp1 = new TopicPartition(topic, 1); //offsets: start 10, end 10
final TopicPartition tp2 = new TopicPartition(topic, 2); //offsets: start 0, end 20
final TopicPartition tp3 = new TopicPartition(topic, 3); //offsets: start 25, end 30
MockConsumer<Bytes, Bytes> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
@BeforeEach
void initConsumer() {
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
consumer.updatePartitions(
topic,
Stream.of(tp0, tp1, tp2, tp3)
.map(tp -> new PartitionInfo(topic, tp.partition(), null, null, null, null))
.collect(Collectors.toList()));
consumer.updateBeginningOffsets(Map.of(
tp0, 0L,
tp1, 10L,
tp2, 0L,
tp3, 25L
));
consumer.updateEndOffsets(Map.of(
tp0, 0L,
tp1, 10L,
tp2, 20L,
tp3, 30L
));
}
@Test
void forwardSeekToBeginningAllPartitions() {
var seek = new OffsetsSeekForward(
topic,
new ConsumerPosition(
SeekTypeDTO.BEGINNING,
Map.of(tp0, 0L, tp1, 0L),
SeekDirectionDTO.FORWARD
)
);
seek.assignAndSeek(consumer);
assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1);
assertThat(consumer.position(tp0)).isZero();
assertThat(consumer.position(tp1)).isEqualTo(10L);
}
@Test
void backwardSeekToBeginningAllPartitions() {
var seek = new OffsetsSeekBackward(
topic,
new ConsumerPosition(
SeekTypeDTO.BEGINNING,
Map.of(tp2, 0L, tp3, 0L),
SeekDirectionDTO.BACKWARD
),
10
);
seek.assignAndSeek(consumer);
assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2, tp3);
assertThat(consumer.position(tp2)).isEqualTo(20L);
assertThat(consumer.position(tp3)).isEqualTo(30L);
}
@Test
void forwardSeekToBeginningWithPartitionsList() {
var seek = new OffsetsSeekForward(
topic,
new ConsumerPosition(SeekTypeDTO.BEGINNING, Map.of(), SeekDirectionDTO.FORWARD));
seek.assignAndSeek(consumer);
assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2, tp3);
assertThat(consumer.position(tp0)).isZero();
assertThat(consumer.position(tp1)).isEqualTo(10L);
assertThat(consumer.position(tp2)).isZero();
assertThat(consumer.position(tp3)).isEqualTo(25L);
}
@Test
void backwardSeekToBeginningWithPartitionsList() {
var seek = new OffsetsSeekBackward(
topic,
new ConsumerPosition(SeekTypeDTO.BEGINNING, Map.of(), SeekDirectionDTO.BACKWARD),
10
);
seek.assignAndSeek(consumer);
assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2, tp3);
assertThat(consumer.position(tp0)).isZero();
assertThat(consumer.position(tp1)).isEqualTo(10L);
assertThat(consumer.position(tp2)).isEqualTo(20L);
assertThat(consumer.position(tp3)).isEqualTo(30L);
}
@Test
void forwardSeekToOffset() {
var seek = new OffsetsSeekForward(
topic,
new ConsumerPosition(
SeekTypeDTO.OFFSET,
Map.of(tp0, 0L, tp1, 1L, tp2, 2L),
SeekDirectionDTO.FORWARD
)
);
seek.assignAndSeek(consumer);
assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2);
assertThat(consumer.position(tp2)).isEqualTo(2L);
}
@Test
void backwardSeekToOffset() {
var seek = new OffsetsSeekBackward(
topic,
new ConsumerPosition(
SeekTypeDTO.OFFSET,
Map.of(tp0, 0L, tp1, 1L, tp2, 20L),
SeekDirectionDTO.BACKWARD
),
2
);
seek.assignAndSeek(consumer);
assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2);
assertThat(consumer.position(tp2)).isEqualTo(20L);
}
@Test
void backwardSeekToOffsetOnlyOnePartition() {
var seek = new OffsetsSeekBackward(
topic,
new ConsumerPosition(
SeekTypeDTO.OFFSET,
Map.of(tp2, 20L),
SeekDirectionDTO.BACKWARD
),
20
);
seek.assignAndSeek(consumer);
assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2);
assertThat(consumer.position(tp2)).isEqualTo(20L);
}
@Nested
class WaitingOffsetsTest {
OffsetsSeekForward.WaitingOffsets offsets;
@BeforeEach
void assignAndCreateOffsets() {
consumer.assign(List.of(tp0, tp1, tp2, tp3));
offsets = new OffsetsSeek.WaitingOffsets(topic, consumer, List.of(tp0, tp1, tp2, tp3));
}
@Test
void collectsSignificantOffsetsMinus1ForAssignedPartitions() {
// offsets for partition 0 & 1 should be skipped because they
// effectively contains no data (start offset = end offset)
assertThat(offsets.getEndOffsets()).containsExactlyInAnyOrderEntriesOf(
Map.of(2, 19L, 3, 29L)
);
}
@Test
void returnTrueWhenOffsetsReachedReached() {
assertThat(offsets.endReached()).isFalse();
offsets.markPolled(new ConsumerRecord<>(topic, 2, 19, null, null));
assertThat(offsets.endReached()).isFalse();
offsets.markPolled(new ConsumerRecord<>(topic, 3, 29, null, null));
assertThat(offsets.endReached()).isTrue();
}
}
}