From 4558466ff6aec1bd4f26ab4dc8b8b85a52345ba3 Mon Sep 17 00:00:00 2001 From: Ilya Kuramshin Date: Sun, 23 Oct 2022 19:47:21 +0400 Subject: [PATCH] Emitters logic refactoring (#2729) * Emitters logic refactoring: 1. consumers seeking moved to SeekOperations class 2. offsets info gathering moved to OffsetsInfo class * wip * checkstyle fix * checkstyle fix * minor improvements Co-authored-by: iliax --- .../ui/controller/MessagesController.java | 28 ++- .../ui/emitter/BackwardRecordEmitter.java | 101 ++++----- .../ui/emitter/ForwardRecordEmitter.java | 20 +- .../kafka/ui/emitter/OffsetsInfo.java | 59 ++++++ .../kafka/ui/emitter/SeekOperations.java | 111 ++++++++++ .../kafka/ui/emitter/TailingEmitter.java | 25 ++- .../kafka/ui/model/ConsumerPosition.java | 6 +- .../kafka/ui/service/MessagesService.java | 35 ++-- .../service/analyze/TopicAnalysisService.java | 8 +- .../provectus/kafka/ui/util/OffsetsSeek.java | 143 ------------- .../kafka/ui/util/OffsetsSeekBackward.java | 120 ----------- .../kafka/ui/util/OffsetsSeekForward.java | 61 ------ .../kafka/ui/emitter/OffsetsInfoTest.java | 53 +++++ .../kafka/ui/emitter/SeekOperationsTest.java | 88 ++++++++ .../kafka/ui/emitter/TailingEmitterTest.java | 5 +- .../kafka/ui/service/MessagesServiceTest.java | 2 +- .../kafka/ui/service/RecordEmitterTest.java | 70 +++---- .../kafka/ui/service/SendAndReadTests.java | 5 +- .../kafka/ui/util/OffsetsSeekTest.java | 196 ------------------ 19 files changed, 462 insertions(+), 674 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/OffsetsInfoTest.java create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/SeekOperationsTest.java delete mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/OffsetsSeekTest.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java index fab4bcfd5c..79ae59c3b3 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java @@ -5,6 +5,7 @@ import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE; import static java.util.stream.Collectors.toMap; import com.provectus.kafka.ui.api.MessagesApi; +import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.CreateTopicMessageDTO; import com.provectus.kafka.ui.model.MessageFilterTypeDTO; @@ -18,6 +19,7 @@ import com.provectus.kafka.ui.service.MessagesService; import java.util.List; import java.util.Map; import java.util.Optional; +import javax.annotation.Nullable; import javax.validation.Valid; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -63,18 +65,22 @@ public class MessagesController extends AbstractController implements MessagesAp String keySerde, String valueSerde, ServerWebExchange exchange) { + seekType = seekType 
!= null ? seekType : SeekTypeDTO.BEGINNING; + seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD; + filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS; + int recordsLimit = + Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT); + var positions = new ConsumerPosition( - seekType != null ? seekType : SeekTypeDTO.BEGINNING, - parseSeekTo(topicName, seekTo), - seekDirection + seekType, + topicName, + parseSeekTo(topicName, seekType, seekTo) ); - int recordsLimit = Optional.ofNullable(limit) - .map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)) - .orElse(DEFAULT_LOAD_RECORD_LIMIT); return Mono.just( ResponseEntity.ok( messagesService.loadMessages( - getCluster(clusterName), topicName, positions, q, filterQueryType, recordsLimit, keySerde, valueSerde) + getCluster(clusterName), topicName, positions, q, filterQueryType, + recordsLimit, seekDirection, keySerde, valueSerde) ) ); } @@ -92,9 +98,13 @@ public class MessagesController extends AbstractController implements MessagesAp * The format is [partition]::[offset] for specifying offsets * or [partition]::[timestamp in millis] for specifying timestamps. */ - private Map parseSeekTo(String topic, List seekTo) { + @Nullable + private Map parseSeekTo(String topic, SeekTypeDTO seekType, List seekTo) { if (seekTo == null || seekTo.isEmpty()) { - return Map.of(); + if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) { + return null; + } + throw new ValidationException("seekTo should be set if seekType is " + seekType); } return seekTo.stream() .map(p -> { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java index 59db425b33..d2012355db 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java @@ -1,21 +1,18 @@ package com.provectus.kafka.ui.emitter; +import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.TopicMessageEventDTO; import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; -import com.provectus.kafka.ui.util.OffsetsSeekBackward; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.Map; -import java.util.SortedMap; import java.util.TreeMap; -import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; @@ -29,80 +26,68 @@ public class BackwardRecordEmitter private static final Duration POLL_TIMEOUT = Duration.ofMillis(200); - private final Function, KafkaConsumer> consumerSupplier; - private final OffsetsSeekBackward offsetsSeek; + private final Supplier> consumerSupplier; + private final ConsumerPosition consumerPosition; + private final int messagesPerPage; public BackwardRecordEmitter( - Function, KafkaConsumer> consumerSupplier, - OffsetsSeekBackward offsetsSeek, + Supplier> consumerSupplier, + ConsumerPosition consumerPosition, + int messagesPerPage, 
ConsumerRecordDeserializer recordDeserializer) { super(recordDeserializer); - this.offsetsSeek = offsetsSeek; + this.consumerPosition = consumerPosition; + this.messagesPerPage = messagesPerPage; this.consumerSupplier = consumerSupplier; } @Override public void accept(FluxSink sink) { - try (KafkaConsumer configConsumer = consumerSupplier.apply(Map.of())) { - final List requestedPartitions = - offsetsSeek.getRequestedPartitions(configConsumer); - sendPhase(sink, "Request partitions"); - final int msgsPerPartition = offsetsSeek.msgsPerPartition(requestedPartitions.size()); - try (KafkaConsumer consumer = - consumerSupplier.apply( - Map.of(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, msgsPerPartition) - ) - ) { - sendPhase(sink, "Created consumer"); + try (KafkaConsumer consumer = consumerSupplier.get()) { + sendPhase(sink, "Created consumer"); - SortedMap readUntilOffsets = - new TreeMap<>(Comparator.comparingInt(TopicPartition::partition)); - readUntilOffsets.putAll(offsetsSeek.getPartitionsOffsets(consumer)); + var seekOperations = SeekOperations.create(consumer, consumerPosition); + var readUntilOffsets = new TreeMap(Comparator.comparingInt(TopicPartition::partition)); + readUntilOffsets.putAll(seekOperations.getOffsetsForSeek()); - sendPhase(sink, "Requested partitions offsets"); - log.debug("partition offsets: {}", readUntilOffsets); - var waitingOffsets = - offsetsSeek.waitingOffsets(consumer, readUntilOffsets.keySet()); - log.debug("waiting offsets {} {}", - waitingOffsets.getBeginOffsets(), - waitingOffsets.getEndOffsets() - ); + int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readUntilOffsets.size()); + log.debug("'Until' offsets for polling: {}", readUntilOffsets); - while (!sink.isCancelled() && !waitingOffsets.beginReached()) { - new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> { - long lowestOffset = waitingOffsets.getBeginOffsets().get(tp.partition()); - long readFromOffset = Math.max(lowestOffset, readToOffset - msgsPerPartition); - - partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink) - .stream() - .filter(r -> !sink.isCancelled()) - .forEach(r -> sendMessage(sink, r)); - - waitingOffsets.markPolled(tp.partition(), readFromOffset); - if (waitingOffsets.getBeginOffsets().get(tp.partition()) == null) { - // we fully read this partition -> removing it from polling iterations - readUntilOffsets.remove(tp); - } else { - readUntilOffsets.put(tp, readFromOffset); - } - }); - - if (waitingOffsets.beginReached()) { - log.debug("begin reached after partitions poll iteration"); - } else if (sink.isCancelled()) { - log.debug("sink is cancelled after partitions poll iteration"); + while (!sink.isCancelled() && !readUntilOffsets.isEmpty()) { + new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> { + if (sink.isCancelled()) { + return; //fast return in case of sink cancellation } + long beginOffset = seekOperations.getBeginOffsets().get(tp); + long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition); + + partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink) + .stream() + .filter(r -> !sink.isCancelled()) + .forEach(r -> sendMessage(sink, r)); + + if (beginOffset == readFromOffset) { + // we fully read this partition -> removing it from polling iterations + readUntilOffsets.remove(tp); + } else { + // updating 'to' offset for next polling iteration + readUntilOffsets.put(tp, readFromOffset); + } + }); + if (readUntilOffsets.isEmpty()) { + log.debug("begin reached after partitions poll 
iteration"); + } else if (sink.isCancelled()) { + log.debug("sink is cancelled after partitions poll iteration"); } - sink.complete(); - log.debug("Polling finished"); } + sink.complete(); + log.debug("Polling finished"); } catch (Exception e) { log.error("Error occurred while consuming records", e); sink.error(e); } } - private List> partitionPollIteration( TopicPartition tp, long fromOffset, diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java index bc636cab7b..69d9801b70 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java @@ -1,8 +1,8 @@ package com.provectus.kafka.ui.emitter; +import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.TopicMessageEventDTO; import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; -import com.provectus.kafka.ui.util.OffsetsSeek; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -17,34 +17,38 @@ public class ForwardRecordEmitter implements java.util.function.Consumer> { private final Supplier> consumerSupplier; - private final OffsetsSeek offsetsSeek; + private final ConsumerPosition position; public ForwardRecordEmitter( Supplier> consumerSupplier, - OffsetsSeek offsetsSeek, + ConsumerPosition position, ConsumerRecordDeserializer recordDeserializer) { super(recordDeserializer); + this.position = position; this.consumerSupplier = consumerSupplier; - this.offsetsSeek = offsetsSeek; } @Override public void accept(FluxSink sink) { try (KafkaConsumer consumer = consumerSupplier.get()) { sendPhase(sink, "Assigning partitions"); - var waitingOffsets = offsetsSeek.assignAndSeek(consumer); + var seekOperations = SeekOperations.create(consumer, position); + seekOperations.assignAndSeekNonEmptyPartitions(); + // we use empty polls counting to verify that topic was fully read int emptyPolls = 0; - while (!sink.isCancelled() && !waitingOffsets.endReached() && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT) { + while (!sink.isCancelled() + && !seekOperations.assignedPartitionsFullyPolled() + && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT) { + sendPhase(sink, "Polling"); ConsumerRecords records = poll(sink, consumer); log.info("{} records polled", records.count()); emptyPolls = records.isEmpty() ? 
emptyPolls + 1 : 0; for (ConsumerRecord msg : records) { - if (!sink.isCancelled() && !waitingOffsets.endReached()) { + if (!sink.isCancelled()) { sendMessage(sink, msg); - waitingOffsets.markPolled(msg); } else { break; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java new file mode 100644 index 0000000000..1b1381ea70 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java @@ -0,0 +1,59 @@ +package com.provectus.kafka.ui.emitter; + +import com.google.common.base.Preconditions; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; + +@Slf4j +@Getter +public class OffsetsInfo { + + private final Consumer consumer; + + private final Map beginOffsets; + private final Map endOffsets; + + private final Set nonEmptyPartitions = new HashSet<>(); + private final Set emptyPartitions = new HashSet<>(); + + public OffsetsInfo(Consumer consumer, String topic) { + this(consumer, + consumer.partitionsFor(topic).stream() + .map(pi -> new TopicPartition(topic, pi.partition())) + .collect(Collectors.toList()) + ); + } + + public OffsetsInfo(Consumer consumer, + Collection targetPartitions) { + this.consumer = consumer; + this.beginOffsets = consumer.beginningOffsets(targetPartitions); + this.endOffsets = consumer.endOffsets(targetPartitions); + endOffsets.forEach((tp, endOffset) -> { + var beginningOffset = beginOffsets.get(tp); + if (endOffset > beginningOffset) { + nonEmptyPartitions.add(tp); + } else { + emptyPartitions.add(tp); + } + }); + } + + public boolean assignedPartitionsFullyPolled() { + for (var tp: consumer.assignment()) { + Preconditions.checkArgument(endOffsets.containsKey(tp)); + if (endOffsets.get(tp) > consumer.position(tp)) { + return false; + } + } + return true; + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java new file mode 100644 index 0000000000..014b120757 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java @@ -0,0 +1,111 @@ +package com.provectus.kafka.ui.emitter; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.provectus.kafka.ui.model.ConsumerPosition; +import com.provectus.kafka.ui.model.SeekTypeDTO; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; + +@RequiredArgsConstructor(access = AccessLevel.PACKAGE) +class SeekOperations { + + private final Consumer consumer; + private final OffsetsInfo offsetsInfo; + private final Map offsetsForSeek; //only contains non-empty partitions! 
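+
+  /**
+   * Factory: snapshots begin/end offsets via OffsetsInfo (for all partitions of the topic, or only
+   * those listed in ConsumerPosition.seekTo) and pre-computes the per-partition offsets the
+   * consumer should be seeked to for the requested seek type.
+   */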
+ + static SeekOperations create(Consumer consumer, ConsumerPosition consumerPosition) { + OffsetsInfo offsetsInfo; + if (consumerPosition.getSeekTo() == null) { + offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getTopic()); + } else { + offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getSeekTo().keySet()); + } + return new SeekOperations( + consumer, + offsetsInfo, + getOffsetsForSeek(consumer, offsetsInfo, consumerPosition.getSeekType(), consumerPosition.getSeekTo()) + ); + } + + void assignAndSeekNonEmptyPartitions() { + consumer.assign(offsetsForSeek.keySet()); + offsetsForSeek.forEach(consumer::seek); + } + + Map getBeginOffsets() { + return offsetsInfo.getBeginOffsets(); + } + + Map getEndOffsets() { + return offsetsInfo.getEndOffsets(); + } + + boolean assignedPartitionsFullyPolled() { + return offsetsInfo.assignedPartitionsFullyPolled(); + } + + // Get offsets to seek to. NOTE: offsets do not contain empty partitions offsets + Map getOffsetsForSeek() { + return offsetsForSeek; + } + + /** + * Finds offsets for ConsumerPosition. Note: will return empty map if no offsets found for desired criteria. + */ + @VisibleForTesting + static Map getOffsetsForSeek(Consumer consumer, + OffsetsInfo offsetsInfo, + SeekTypeDTO seekType, + @Nullable Map seekTo) { + switch (seekType) { + case LATEST: + return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions()); + case BEGINNING: + return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions()); + case OFFSET: + Preconditions.checkNotNull(offsetsInfo); + return fixOffsets(offsetsInfo, seekTo); + case TIMESTAMP: + Preconditions.checkNotNull(offsetsInfo); + return offsetsForTimestamp(consumer, offsetsInfo, seekTo); + default: + throw new IllegalStateException(); + } + } + + private static Map fixOffsets(OffsetsInfo offsetsInfo, Map offsets) { + offsets = new HashMap<>(offsets); + offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions()); + + Map result = new HashMap<>(); + offsets.forEach((tp, targetOffset) -> { + long endOffset = offsetsInfo.getEndOffsets().get(tp); + long beginningOffset = offsetsInfo.getBeginOffsets().get(tp); + // fixing offsets with min - max bounds + if (targetOffset > endOffset) { + targetOffset = endOffset; + } else if (targetOffset < beginningOffset) { + targetOffset = beginningOffset; + } + result.put(tp, targetOffset); + }); + return result; + } + + private static Map offsetsForTimestamp(Consumer consumer, OffsetsInfo offsetsInfo, + Map timestamps) { + timestamps = new HashMap<>(timestamps); + timestamps.keySet().retainAll(offsetsInfo.getNonEmptyPartitions()); + + return consumer.offsetsForTimes(timestamps).entrySet().stream() + .filter(e -> e.getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java index 852c31038f..12a8ae183d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java @@ -1,8 +1,9 @@ package com.provectus.kafka.ui.emitter; +import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.TopicMessageEventDTO; import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; -import com.provectus.kafka.ui.util.OffsetsSeek; +import java.util.HashMap; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; 
import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -15,21 +16,21 @@ public class TailingEmitter extends AbstractEmitter implements java.util.function.Consumer> { private final Supplier> consumerSupplier; - private final OffsetsSeek offsetsSeek; + private final ConsumerPosition consumerPosition; - public TailingEmitter(ConsumerRecordDeserializer recordDeserializer, - Supplier> consumerSupplier, - OffsetsSeek offsetsSeek) { + public TailingEmitter(Supplier> consumerSupplier, + ConsumerPosition consumerPosition, + ConsumerRecordDeserializer recordDeserializer) { super(recordDeserializer); this.consumerSupplier = consumerSupplier; - this.offsetsSeek = offsetsSeek; + this.consumerPosition = consumerPosition; } @Override public void accept(FluxSink sink) { try (KafkaConsumer consumer = consumerSupplier.get()) { log.debug("Starting topic tailing"); - offsetsSeek.assignAndSeek(consumer); + assignAndSeek(consumer); while (!sink.isCancelled()) { sendPhase(sink, "Polling"); var polled = poll(sink, consumer); @@ -40,9 +41,17 @@ public class TailingEmitter extends AbstractEmitter } catch (InterruptException kafkaInterruptException) { sink.complete(); } catch (Exception e) { - log.error("Error consuming {}", offsetsSeek.getConsumerPosition(), e); + log.error("Error consuming {}", consumerPosition, e); sink.error(e); } } + private void assignAndSeek(KafkaConsumer consumer) { + var seekOperations = SeekOperations.create(consumer, consumerPosition); + var seekOffsets = new HashMap<>(seekOperations.getEndOffsets()); // defaulting offsets to topic end + seekOffsets.putAll(seekOperations.getOffsetsForSeek()); // this will only set non-empty partitions + consumer.assign(seekOffsets.keySet()); + seekOffsets.forEach(consumer::seek); + } + } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java index 7c3f5a6229..9d77923fbc 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java @@ -1,12 +1,14 @@ package com.provectus.kafka.ui.model; import java.util.Map; +import javax.annotation.Nullable; import lombok.Value; import org.apache.kafka.common.TopicPartition; @Value public class ConsumerPosition { SeekTypeDTO seekType; - Map seekTo; - SeekDirectionDTO seekDirection; + String topic; + @Nullable + Map seekTo; // null if positioning should apply to all tps } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java index 9191a3840e..1f217b1e40 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java @@ -14,12 +14,9 @@ import com.provectus.kafka.ui.model.SeekDirectionDTO; import com.provectus.kafka.ui.model.TopicMessageEventDTO; import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; import com.provectus.kafka.ui.serdes.ProducerRecordCreator; -import com.provectus.kafka.ui.util.OffsetsSeekBackward; -import com.provectus.kafka.ui.util.OffsetsSeekForward; import com.provectus.kafka.ui.util.ResultSizeLimiter; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.function.Predicate; @@ -129,58 +126,62 @@ public class MessagesService { } 
public Flux loadMessages(KafkaCluster cluster, String topic, - ConsumerPosition consumerPosition, String query, + ConsumerPosition consumerPosition, + @Nullable String query, MessageFilterTypeDTO filterQueryType, int limit, + SeekDirectionDTO seekDirection, @Nullable String keySerde, @Nullable String valueSerde) { return withExistingTopic(cluster, topic) .flux() .flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query, - filterQueryType, limit, keySerde, valueSerde)); + filterQueryType, limit, seekDirection, keySerde, valueSerde)); } private Flux loadMessagesImpl(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, - String query, + @Nullable String query, MessageFilterTypeDTO filterQueryType, int limit, + SeekDirectionDTO seekDirection, @Nullable String keySerde, @Nullable String valueSerde) { java.util.function.Consumer> emitter; ConsumerRecordDeserializer recordDeserializer = deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde); - if (consumerPosition.getSeekDirection().equals(SeekDirectionDTO.FORWARD)) { + if (seekDirection.equals(SeekDirectionDTO.FORWARD)) { emitter = new ForwardRecordEmitter( () -> consumerGroupService.createConsumer(cluster), - new OffsetsSeekForward(topic, consumerPosition), + consumerPosition, recordDeserializer ); - } else if (consumerPosition.getSeekDirection().equals(SeekDirectionDTO.BACKWARD)) { + } else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) { emitter = new BackwardRecordEmitter( - (Map props) -> consumerGroupService.createConsumer(cluster, props), - new OffsetsSeekBackward(topic, consumerPosition, limit), + () -> consumerGroupService.createConsumer(cluster), + consumerPosition, + limit, recordDeserializer ); } else { emitter = new TailingEmitter( - recordDeserializer, () -> consumerGroupService.createConsumer(cluster), - new OffsetsSeekForward(topic, consumerPosition) + consumerPosition, + recordDeserializer ); } return Flux.create(emitter) .filter(getMsgFilter(query, filterQueryType)) - .takeWhile(createTakeWhilePredicate(consumerPosition, limit)) + .takeWhile(createTakeWhilePredicate(seekDirection, limit)) .subscribeOn(Schedulers.boundedElastic()) .share(); } private Predicate createTakeWhilePredicate( - ConsumerPosition consumerPosition, int limit) { - return consumerPosition.getSeekDirection() == SeekDirectionDTO.TAILING + SeekDirectionDTO seekDirection, int limit) { + return seekDirection == SeekDirectionDTO.TAILING ? 
evt -> true // no limit for tailing : new ResultSizeLimiter(limit); } @@ -189,8 +190,6 @@ public class MessagesService { if (StringUtils.isEmpty(query)) { return evt -> true; } - filterQueryType = Optional.ofNullable(filterQueryType) - .orElse(MessageFilterTypeDTO.STRING_CONTAINS); var messageFilter = MessageFilters.createMsgFilter(query, filterQueryType); return evt -> { // we only apply filter for message events diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java index 3e72e8a07e..3eb61a56d2 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java @@ -2,12 +2,12 @@ package com.provectus.kafka.ui.service.analyze; import static com.provectus.kafka.ui.emitter.AbstractEmitter.NO_MORE_DATA_EMPTY_POLLS_COUNT; +import com.provectus.kafka.ui.emitter.OffsetsInfo; import com.provectus.kafka.ui.exception.TopicAnalysisException; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.TopicAnalysisDTO; import com.provectus.kafka.ui.service.ConsumerGroupService; import com.provectus.kafka.ui.service.TopicsService; -import com.provectus.kafka.ui.util.OffsetsSeek.WaitingOffsets; import java.io.Closeable; import java.time.Duration; import java.time.Instant; @@ -119,14 +119,14 @@ public class TopicAnalysisService { consumer.assign(topicPartitions); consumer.seekToBeginning(topicPartitions); - var waitingOffsets = new WaitingOffsets(topicId.topicName, consumer, topicPartitions); - for (int emptyPolls = 0; !waitingOffsets.endReached() && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT;) { + var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName); + for (int emptyPolls = 0; !offsetsInfo.assignedPartitionsFullyPolled() + && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT;) { var polled = consumer.poll(Duration.ofSeconds(3)); emptyPolls = polled.isEmpty() ? 
emptyPolls + 1 : 0; polled.forEach(r -> { totalStats.apply(r); partitionStats.get(r.partition()).apply(r); - waitingOffsets.markPolled(r); }); updateProgress(); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java deleted file mode 100644 index e8d475a65d..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java +++ /dev/null @@ -1,143 +0,0 @@ -package com.provectus.kafka.ui.util; - -import com.provectus.kafka.ui.model.ConsumerPosition; -import com.provectus.kafka.ui.model.SeekTypeDTO; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Bytes; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; - -@Slf4j -public abstract class OffsetsSeek { - protected final String topic; - protected final ConsumerPosition consumerPosition; - - protected OffsetsSeek(String topic, ConsumerPosition consumerPosition) { - this.topic = topic; - this.consumerPosition = consumerPosition; - } - - public ConsumerPosition getConsumerPosition() { - return consumerPosition; - } - - public Map getPartitionsOffsets(Consumer consumer) { - SeekTypeDTO seekType = consumerPosition.getSeekType(); - List partitions = getRequestedPartitions(consumer); - log.info("Positioning consumer for topic {} with {}", topic, consumerPosition); - Map offsets; - switch (seekType) { - case OFFSET: - offsets = offsetsFromPositions(consumer, partitions); - break; - case TIMESTAMP: - offsets = offsetsForTimestamp(consumer); - break; - case BEGINNING: - offsets = offsetsFromBeginning(consumer, partitions); - break; - case LATEST: - offsets = endOffsets(consumer, partitions); - break; - default: - throw new IllegalArgumentException("Unknown seekType: " + seekType); - } - return offsets; - } - - public WaitingOffsets waitingOffsets(Consumer consumer, - Collection partitions) { - return new WaitingOffsets(topic, consumer, partitions); - } - - public WaitingOffsets assignAndSeek(Consumer consumer) { - final Map partitionsOffsets = getPartitionsOffsets(consumer); - consumer.assign(partitionsOffsets.keySet()); - partitionsOffsets.forEach(consumer::seek); - log.info("Assignment: {}", consumer.assignment()); - return waitingOffsets(consumer, partitionsOffsets.keySet()); - } - - - public List getRequestedPartitions(Consumer consumer) { - Map partitionPositions = consumerPosition.getSeekTo(); - return consumer.partitionsFor(topic).stream() - .filter( - p -> partitionPositions.isEmpty() - || partitionPositions.containsKey(new TopicPartition(p.topic(), p.partition())) - ).map(p -> new TopicPartition(p.topic(), p.partition())) - .collect(Collectors.toList()); - } - - protected Map endOffsets( - Consumer consumer, List partitions) { - return consumer.endOffsets(partitions); - } - - protected abstract Map offsetsFromBeginning( - Consumer consumer, List partitions); - - protected abstract Map offsetsForTimestamp( - Consumer consumer); - - protected abstract Map offsetsFromPositions( - Consumer consumer, List partitions); - - public static class WaitingOffsets { - private final Map endOffsets; // partition number -> offset - private final Map beginOffsets; // partition number -> offset - - public 
WaitingOffsets(String topic, Consumer consumer, - Collection partitions) { - var allBeginningOffsets = consumer.beginningOffsets(partitions); - var allEndOffsets = consumer.endOffsets(partitions); - - this.endOffsets = allEndOffsets.entrySet().stream() - .filter(entry -> !allBeginningOffsets.get(entry.getKey()).equals(entry.getValue())) - .map(e -> Tuples.of(e.getKey().partition(), e.getValue() - 1)) - .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)); - - this.beginOffsets = this.endOffsets.keySet().stream() - .map(p -> Tuples.of(p, allBeginningOffsets.get(new TopicPartition(topic, p)))) - .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)); - } - - public void markPolled(ConsumerRecord rec) { - markPolled(rec.partition(), rec.offset()); - } - - public void markPolled(int partition, long offset) { - Long endWaiting = endOffsets.get(partition); - if (endWaiting != null && endWaiting <= offset) { - endOffsets.remove(partition); - } - Long beginWaiting = beginOffsets.get(partition); - if (beginWaiting != null && beginWaiting >= offset) { - beginOffsets.remove(partition); - } - } - - public boolean endReached() { - return endOffsets.isEmpty(); - } - - public boolean beginReached() { - return beginOffsets.isEmpty(); - } - - public Map getEndOffsets() { - return endOffsets; - } - - public Map getBeginOffsets() { - return beginOffsets; - } - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java deleted file mode 100644 index e3d8f1b5b8..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java +++ /dev/null @@ -1,120 +0,0 @@ -package com.provectus.kafka.ui.util; - -import com.provectus.kafka.ui.model.ConsumerPosition; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Bytes; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; - -@Slf4j -public class OffsetsSeekBackward extends OffsetsSeek { - - private final int maxMessages; - - public OffsetsSeekBackward(String topic, - ConsumerPosition consumerPosition, int maxMessages) { - super(topic, consumerPosition); - this.maxMessages = maxMessages; - } - - public int msgsPerPartition(int partitionsSize) { - return msgsPerPartition(maxMessages, partitionsSize); - } - - public int msgsPerPartition(long awaitingMessages, int partitionsSize) { - return (int) Math.ceil((double) awaitingMessages / partitionsSize); - } - - - protected Map offsetsFromPositions(Consumer consumer, - List partitions) { - - return findOffsetsInt(consumer, consumerPosition.getSeekTo(), partitions); - } - - protected Map offsetsFromBeginning(Consumer consumer, - List partitions) { - return findOffsets(consumer, Map.of(), partitions); - } - - protected Map offsetsForTimestamp(Consumer consumer) { - Map timestampsToSearch = - consumerPosition.getSeekTo().entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - Map.Entry::getValue - )); - Map offsetsForTimestamps = consumer.offsetsForTimes(timestampsToSearch) - .entrySet().stream() - .filter(e -> e.getValue() != null) - .map(v -> Tuples.of(v.getKey(), v.getValue().offset())) - .collect(Collectors.toMap(Tuple2::getT1, 
Tuple2::getT2)); - - if (offsetsForTimestamps.isEmpty()) { - throw new IllegalArgumentException("No offsets were found for requested timestamps"); - } - - log.info("Timestamps: {} to offsets: {}", timestampsToSearch, offsetsForTimestamps); - - return findOffsets(consumer, offsetsForTimestamps, offsetsForTimestamps.keySet()); - } - - protected Map findOffsetsInt( - Consumer consumer, Map seekTo, - List partitions) { - return findOffsets(consumer, seekTo, partitions); - } - - protected Map findOffsets( - Consumer consumer, Map seekTo, - Collection partitions) { - - final Map beginningOffsets = consumer.beginningOffsets(partitions); - final Map endOffsets = consumer.endOffsets(partitions); - - final Map seekMap = new HashMap<>(); - final Set emptyPartitions = new HashSet<>(); - - for (Map.Entry entry : seekTo.entrySet()) { - final Long endOffset = endOffsets.get(entry.getKey()); - final Long beginningOffset = beginningOffsets.get(entry.getKey()); - if (beginningOffset != null - && endOffset != null - && beginningOffset < endOffset - && entry.getValue() > beginningOffset - ) { - final Long value; - if (entry.getValue() > endOffset) { - value = endOffset; - } else { - value = entry.getValue(); - } - - seekMap.put(entry.getKey(), value); - } else { - emptyPartitions.add(entry.getKey()); - } - } - - Set waiting = new HashSet<>(partitions); - waiting.removeAll(emptyPartitions); - waiting.removeAll(seekMap.keySet()); - - for (TopicPartition topicPartition : waiting) { - seekMap.put(topicPartition, endOffsets.get(topicPartition)); - } - - return seekMap; - } - - -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java deleted file mode 100644 index 6b6ea735fc..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.provectus.kafka.ui.util; - -import com.provectus.kafka.ui.model.ConsumerPosition; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Bytes; - -@Slf4j -public class OffsetsSeekForward extends OffsetsSeek { - - public OffsetsSeekForward(String topic, ConsumerPosition consumerPosition) { - super(topic, consumerPosition); - } - - protected Map offsetsFromPositions(Consumer consumer, - List partitions) { - final Map offsets = - offsetsFromBeginning(consumer, partitions); - - final Map endOffsets = consumer.endOffsets(offsets.keySet()); - final Set set = new HashSet<>(consumerPosition.getSeekTo().keySet()); - final Map collect = consumerPosition.getSeekTo().entrySet().stream() - .filter(e -> e.getValue() < endOffsets.get(e.getKey())) - .filter(e -> endOffsets.get(e.getKey()) > offsets.get(e.getKey())) - .collect(Collectors.toMap( - Map.Entry::getKey, - Map.Entry::getValue - )); - offsets.putAll(collect); - set.removeAll(collect.keySet()); - set.forEach(offsets::remove); - - return offsets; - } - - protected Map offsetsForTimestamp(Consumer consumer) { - Map offsetsForTimestamps = - consumer.offsetsForTimes(consumerPosition.getSeekTo()) - .entrySet().stream() - .filter(e -> e.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); - - if (offsetsForTimestamps.isEmpty()) { - throw new 
IllegalArgumentException("No offsets were found for requested timestamps"); - } - - return offsetsForTimestamps; - } - - protected Map offsetsFromBeginning(Consumer consumer, - List partitions) { - return consumer.beginningOffsets(partitions); - } - -} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/OffsetsInfoTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/OffsetsInfoTest.java new file mode 100644 index 0000000000..156f62846b --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/OffsetsInfoTest.java @@ -0,0 +1,53 @@ +package com.provectus.kafka.ui.emitter; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Bytes; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class OffsetsInfoTest { + + final String topic = "test"; + final TopicPartition tp0 = new TopicPartition(topic, 0); //offsets: start 0, end 0 + final TopicPartition tp1 = new TopicPartition(topic, 1); //offsets: start 10, end 10 + final TopicPartition tp2 = new TopicPartition(topic, 2); //offsets: start 0, end 20 + final TopicPartition tp3 = new TopicPartition(topic, 3); //offsets: start 25, end 30 + + MockConsumer consumer; + + @BeforeEach + void initMockConsumer() { + consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + consumer.updatePartitions( + topic, + Stream.of(tp0, tp1, tp2, tp3) + .map(tp -> new PartitionInfo(topic, tp.partition(), null, null, null, null)) + .collect(Collectors.toList())); + consumer.updateBeginningOffsets(Map.of(tp0, 0L, tp1, 10L, tp2, 0L, tp3, 25L)); + consumer.updateEndOffsets(Map.of(tp0, 0L, tp1, 10L, tp2, 20L, tp3, 30L)); + } + + @Test + void fillsInnerFieldsAccordingToTopicState() { + var offsets = new OffsetsInfo(consumer, List.of(tp0, tp1, tp2, tp3)); + + assertThat(offsets.getBeginOffsets()).containsEntry(tp0, 0L).containsEntry(tp1, 10L).containsEntry(tp2, 0L) + .containsEntry(tp3, 25L); + + assertThat(offsets.getEndOffsets()).containsEntry(tp0, 0L).containsEntry(tp1, 10L).containsEntry(tp2, 20L) + .containsEntry(tp3, 30L); + + assertThat(offsets.getEmptyPartitions()).contains(tp0, tp1); + assertThat(offsets.getNonEmptyPartitions()).contains(tp2, tp3); + } + +} \ No newline at end of file diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/SeekOperationsTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/SeekOperationsTest.java new file mode 100644 index 0000000000..affa423123 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/SeekOperationsTest.java @@ -0,0 +1,88 @@ +package com.provectus.kafka.ui.emitter; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.provectus.kafka.ui.model.SeekTypeDTO; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Bytes; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import 
org.junit.jupiter.api.Test; + +class SeekOperationsTest { + + final String topic = "test"; + final TopicPartition tp0 = new TopicPartition(topic, 0); //offsets: start 0, end 0 + final TopicPartition tp1 = new TopicPartition(topic, 1); //offsets: start 10, end 10 + final TopicPartition tp2 = new TopicPartition(topic, 2); //offsets: start 0, end 20 + final TopicPartition tp3 = new TopicPartition(topic, 3); //offsets: start 25, end 30 + + MockConsumer consumer; + + @BeforeEach + void initMockConsumer() { + consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + consumer.updatePartitions( + topic, + Stream.of(tp0, tp1, tp2, tp3) + .map(tp -> new PartitionInfo(topic, tp.partition(), null, null, null, null)) + .collect(Collectors.toList())); + consumer.updateBeginningOffsets(Map.of(tp0, 0L, tp1, 10L, tp2, 0L, tp3, 25L)); + consumer.updateEndOffsets(Map.of(tp0, 0L, tp1, 10L, tp2, 20L, tp3, 30L)); + } + + @Nested + class GetOffsetsForSeek { + + @Test + void latest() { + var offsets = SeekOperations.getOffsetsForSeek( + consumer, + new OffsetsInfo(consumer, topic), + SeekTypeDTO.LATEST, + null + ); + assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 30L)); + } + + @Test + void beginning() { + var offsets = SeekOperations.getOffsetsForSeek( + consumer, + new OffsetsInfo(consumer, topic), + SeekTypeDTO.BEGINNING, + null + ); + assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 0L, tp3, 25L)); + } + + @Test + void offsets() { + var offsets = SeekOperations.getOffsetsForSeek( + consumer, + new OffsetsInfo(consumer, topic), + SeekTypeDTO.OFFSET, + Map.of(tp1, 10L, tp2, 10L, tp3, 26L) + ); + assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 10L, tp3, 26L)); + } + + @Test + void offsetsWithBoundsFixing() { + var offsets = SeekOperations.getOffsetsForSeek( + consumer, + new OffsetsInfo(consumer, topic), + SeekTypeDTO.OFFSET, + Map.of(tp1, 10L, tp2, 21L, tp3, 24L) + ); + assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 25L)); + } + } + +} \ No newline at end of file diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java index 27fbda1e9d..cdb1eaaa54 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java @@ -111,10 +111,11 @@ class TailingEmitterTest extends AbstractIntegrationTest { return applicationContext.getBean(MessagesService.class) .loadMessages(cluster, topicName, - new ConsumerPosition(SeekTypeDTO.LATEST, Map.of(), SeekDirectionDTO.TAILING), + new ConsumerPosition(SeekTypeDTO.LATEST, topic, null), query, MessageFilterTypeDTO.STRING_CONTAINS, 0, + SeekDirectionDTO.TAILING, "String", "String"); } @@ -137,7 +138,7 @@ class TailingEmitterTest extends AbstractIntegrationTest { Awaitility.await() .pollInSameThread() .pollDelay(Duration.ofMillis(100)) - .atMost(Duration.ofSeconds(10)) + .atMost(Duration.ofSeconds(200)) .until(() -> fluxOutput.stream() .anyMatch(msg -> msg.getType() == TopicMessageEventDTO.TypeEnum.CONSUMING)); } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java index 9dfbaed0b1..8c18e20882 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java +++ 
b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java @@ -45,7 +45,7 @@ class MessagesServiceTest extends AbstractIntegrationTest { @Test void loadMessagesReturnsExceptionWhenTopicNotFound() { StepVerifier.create(messagesService - .loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1, "String", "String")) + .loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1, null, "String", "String")) .expectError(TopicNotFoundException.class) .verify(); } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java index e6c9b3c83a..3289d177d2 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java @@ -1,8 +1,7 @@ package com.provectus.kafka.ui.service; -import static com.provectus.kafka.ui.model.SeekDirectionDTO.BACKWARD; -import static com.provectus.kafka.ui.model.SeekDirectionDTO.FORWARD; import static com.provectus.kafka.ui.model.SeekTypeDTO.BEGINNING; +import static com.provectus.kafka.ui.model.SeekTypeDTO.LATEST; import static com.provectus.kafka.ui.model.SeekTypeDTO.OFFSET; import static com.provectus.kafka.ui.model.SeekTypeDTO.TIMESTAMP; import static org.assertj.core.api.Assertions.assertThat; @@ -17,8 +16,6 @@ import com.provectus.kafka.ui.serde.api.Serde; import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; import com.provectus.kafka.ui.serdes.PropertyResolverImpl; import com.provectus.kafka.ui.serdes.builtin.StringSerde; -import com.provectus.kafka.ui.util.OffsetsSeekBackward; -import com.provectus.kafka.ui.util.OffsetsSeekForward; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; @@ -112,18 +109,15 @@ class RecordEmitterTest extends AbstractIntegrationTest { void pollNothingOnEmptyTopic() { var forwardEmitter = new ForwardRecordEmitter( this::createConsumer, - new OffsetsSeekForward(EMPTY_TOPIC, - new ConsumerPosition(BEGINNING, Map.of(), FORWARD) - ), RECORD_DESERIALIZER + new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null), + RECORD_DESERIALIZER ); var backwardEmitter = new BackwardRecordEmitter( this::createConsumer, - new OffsetsSeekBackward( - EMPTY_TOPIC, - new ConsumerPosition(BEGINNING, Map.of(), BACKWARD), - 100 - ), RECORD_DESERIALIZER + new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null), + 100, + RECORD_DESERIALIZER ); StepVerifier.create( @@ -143,17 +137,15 @@ class RecordEmitterTest extends AbstractIntegrationTest { void pollFullTopicFromBeginning() { var forwardEmitter = new ForwardRecordEmitter( this::createConsumer, - new OffsetsSeekForward(TOPIC, - new ConsumerPosition(BEGINNING, Map.of(), FORWARD) - ), RECORD_DESERIALIZER + new ConsumerPosition(BEGINNING, TOPIC, null), + RECORD_DESERIALIZER ); var backwardEmitter = new BackwardRecordEmitter( this::createConsumer, - new OffsetsSeekBackward(TOPIC, - new ConsumerPosition(BEGINNING, Map.of(), BACKWARD), - PARTITIONS * MSGS_PER_PARTITION - ), RECORD_DESERIALIZER + new ConsumerPosition(LATEST, TOPIC, null), + PARTITIONS * MSGS_PER_PARTITION, + RECORD_DESERIALIZER ); List expectedValues = SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList()); @@ -172,17 +164,15 @@ class RecordEmitterTest extends AbstractIntegrationTest { var forwardEmitter = new ForwardRecordEmitter( this::createConsumer, - new OffsetsSeekForward(TOPIC, - new ConsumerPosition(OFFSET, targetOffsets, FORWARD) - ), 
RECORD_DESERIALIZER + new ConsumerPosition(OFFSET, TOPIC, targetOffsets), + RECORD_DESERIALIZER ); var backwardEmitter = new BackwardRecordEmitter( this::createConsumer, - new OffsetsSeekBackward(TOPIC, - new ConsumerPosition(OFFSET, targetOffsets, BACKWARD), - PARTITIONS * MSGS_PER_PARTITION - ), RECORD_DESERIALIZER + new ConsumerPosition(OFFSET, TOPIC, targetOffsets), + PARTITIONS * MSGS_PER_PARTITION, + RECORD_DESERIALIZER ); var expectedValues = SENT_RECORDS.stream() @@ -217,17 +207,15 @@ class RecordEmitterTest extends AbstractIntegrationTest { var forwardEmitter = new ForwardRecordEmitter( this::createConsumer, - new OffsetsSeekForward(TOPIC, - new ConsumerPosition(TIMESTAMP, targetTimestamps, FORWARD) - ), RECORD_DESERIALIZER + new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps), + RECORD_DESERIALIZER ); var backwardEmitter = new BackwardRecordEmitter( this::createConsumer, - new OffsetsSeekBackward(TOPIC, - new ConsumerPosition(TIMESTAMP, targetTimestamps, BACKWARD), - PARTITIONS * MSGS_PER_PARTITION - ), RECORD_DESERIALIZER + new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps), + PARTITIONS * MSGS_PER_PARTITION, + RECORD_DESERIALIZER ); var expectedValues = SENT_RECORDS.stream() @@ -255,10 +243,9 @@ class RecordEmitterTest extends AbstractIntegrationTest { var backwardEmitter = new BackwardRecordEmitter( this::createConsumer, - new OffsetsSeekBackward(TOPIC, - new ConsumerPosition(OFFSET, targetOffsets, BACKWARD), - numMessages - ), RECORD_DESERIALIZER + new ConsumerPosition(OFFSET, TOPIC, targetOffsets), + numMessages, + RECORD_DESERIALIZER ); var expectedValues = SENT_RECORDS.stream() @@ -281,10 +268,9 @@ class RecordEmitterTest extends AbstractIntegrationTest { var backwardEmitter = new BackwardRecordEmitter( this::createConsumer, - new OffsetsSeekBackward(TOPIC, - new ConsumerPosition(OFFSET, offsets, BACKWARD), - 100 - ), RECORD_DESERIALIZER + new ConsumerPosition(OFFSET, TOPIC, offsets), + 100, + RECORD_DESERIALIZER ); expectEmitter(backwardEmitter, @@ -331,7 +317,7 @@ class RecordEmitterTest extends AbstractIntegrationTest { final Map map = Map.of( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(), - ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20, // to check multiple polls + ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19, // to check multiple polls ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class ); diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java index 4de939e033..78c111cdd1 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java @@ -502,12 +502,13 @@ public class SendAndReadTests extends AbstractIntegrationTest { topic, new ConsumerPosition( SeekTypeDTO.BEGINNING, - Map.of(new TopicPartition(topic, 0), 0L), - SeekDirectionDTO.FORWARD + topic, + Map.of(new TopicPartition(topic, 0), 0L) ), null, null, 1, + SeekDirectionDTO.FORWARD, msgToSend.getKeySerde().get(), msgToSend.getValueSerde().get() ).filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/OffsetsSeekTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/OffsetsSeekTest.java deleted file mode 100644 
index 54c2064c1c..0000000000 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/OffsetsSeekTest.java +++ /dev/null @@ -1,196 +0,0 @@ -package com.provectus.kafka.ui.util; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.provectus.kafka.ui.model.ConsumerPosition; -import com.provectus.kafka.ui.model.SeekDirectionDTO; -import com.provectus.kafka.ui.model.SeekTypeDTO; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Bytes; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; - -class OffsetsSeekTest { - - final String topic = "test"; - final TopicPartition tp0 = new TopicPartition(topic, 0); //offsets: start 0, end 0 - final TopicPartition tp1 = new TopicPartition(topic, 1); //offsets: start 10, end 10 - final TopicPartition tp2 = new TopicPartition(topic, 2); //offsets: start 0, end 20 - final TopicPartition tp3 = new TopicPartition(topic, 3); //offsets: start 25, end 30 - - MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - - @BeforeEach - void initConsumer() { - consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - consumer.updatePartitions( - topic, - Stream.of(tp0, tp1, tp2, tp3) - .map(tp -> new PartitionInfo(topic, tp.partition(), null, null, null, null)) - .collect(Collectors.toList())); - consumer.updateBeginningOffsets(Map.of( - tp0, 0L, - tp1, 10L, - tp2, 0L, - tp3, 25L - )); - consumer.updateEndOffsets(Map.of( - tp0, 0L, - tp1, 10L, - tp2, 20L, - tp3, 30L - )); - } - - @Test - void forwardSeekToBeginningAllPartitions() { - var seek = new OffsetsSeekForward( - topic, - new ConsumerPosition( - SeekTypeDTO.BEGINNING, - Map.of(tp0, 0L, tp1, 0L), - SeekDirectionDTO.FORWARD - ) - ); - - seek.assignAndSeek(consumer); - assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1); - assertThat(consumer.position(tp0)).isZero(); - assertThat(consumer.position(tp1)).isEqualTo(10L); - } - - @Test - void backwardSeekToBeginningAllPartitions() { - var seek = new OffsetsSeekBackward( - topic, - new ConsumerPosition( - SeekTypeDTO.BEGINNING, - Map.of(tp2, 0L, tp3, 0L), - SeekDirectionDTO.BACKWARD - ), - 10 - ); - - seek.assignAndSeek(consumer); - assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2, tp3); - assertThat(consumer.position(tp2)).isEqualTo(20L); - assertThat(consumer.position(tp3)).isEqualTo(30L); - } - - @Test - void forwardSeekToBeginningWithPartitionsList() { - var seek = new OffsetsSeekForward( - topic, - new ConsumerPosition(SeekTypeDTO.BEGINNING, Map.of(), SeekDirectionDTO.FORWARD)); - seek.assignAndSeek(consumer); - assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2, tp3); - assertThat(consumer.position(tp0)).isZero(); - assertThat(consumer.position(tp1)).isEqualTo(10L); - assertThat(consumer.position(tp2)).isZero(); - assertThat(consumer.position(tp3)).isEqualTo(25L); - } - - @Test - void backwardSeekToBeginningWithPartitionsList() { - var seek = new OffsetsSeekBackward( - topic, - new ConsumerPosition(SeekTypeDTO.BEGINNING, Map.of(), SeekDirectionDTO.BACKWARD), - 10 - ); - seek.assignAndSeek(consumer); - 
assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2, tp3); - assertThat(consumer.position(tp0)).isZero(); - assertThat(consumer.position(tp1)).isEqualTo(10L); - assertThat(consumer.position(tp2)).isEqualTo(20L); - assertThat(consumer.position(tp3)).isEqualTo(30L); - } - - - @Test - void forwardSeekToOffset() { - var seek = new OffsetsSeekForward( - topic, - new ConsumerPosition( - SeekTypeDTO.OFFSET, - Map.of(tp0, 0L, tp1, 1L, tp2, 2L), - SeekDirectionDTO.FORWARD - ) - ); - seek.assignAndSeek(consumer); - assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2); - assertThat(consumer.position(tp2)).isEqualTo(2L); - } - - @Test - void backwardSeekToOffset() { - var seek = new OffsetsSeekBackward( - topic, - new ConsumerPosition( - SeekTypeDTO.OFFSET, - Map.of(tp0, 0L, tp1, 1L, tp2, 20L), - SeekDirectionDTO.BACKWARD - ), - 2 - ); - seek.assignAndSeek(consumer); - assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2); - assertThat(consumer.position(tp2)).isEqualTo(20L); - } - - @Test - void backwardSeekToOffsetOnlyOnePartition() { - var seek = new OffsetsSeekBackward( - topic, - new ConsumerPosition( - SeekTypeDTO.OFFSET, - Map.of(tp2, 20L), - SeekDirectionDTO.BACKWARD - ), - 20 - ); - seek.assignAndSeek(consumer); - assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2); - assertThat(consumer.position(tp2)).isEqualTo(20L); - } - - - @Nested - class WaitingOffsetsTest { - - OffsetsSeekForward.WaitingOffsets offsets; - - @BeforeEach - void assignAndCreateOffsets() { - consumer.assign(List.of(tp0, tp1, tp2, tp3)); - offsets = new OffsetsSeek.WaitingOffsets(topic, consumer, List.of(tp0, tp1, tp2, tp3)); - } - - @Test - void collectsSignificantOffsetsMinus1ForAssignedPartitions() { - // offsets for partition 0 & 1 should be skipped because they - // effectively contains no data (start offset = end offset) - assertThat(offsets.getEndOffsets()).containsExactlyInAnyOrderEntriesOf( - Map.of(2, 19L, 3, 29L) - ); - } - - @Test - void returnTrueWhenOffsetsReachedReached() { - assertThat(offsets.endReached()).isFalse(); - offsets.markPolled(new ConsumerRecord<>(topic, 2, 19, null, null)); - assertThat(offsets.endReached()).isFalse(); - offsets.markPolled(new ConsumerRecord<>(topic, 3, 29, null, null)); - assertThat(offsets.endReached()).isTrue(); - } - } - -}
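
A minimal usage sketch of the refactored API, assuming the SeekOperations and ConsumerPosition classes introduced above (callable only from within the com.provectus.kafka.ui.emitter package, since SeekOperations is package-private); the consumerSupplier and topic parameters and the pollAndEmit() helper are illustrative placeholders, not part of the patch:

  // Sketch only: roughly how a forward-reading emitter drives the new SeekOperations API.
  static void readForward(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier, String topic) {
    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
      // null seekTo -> the position applies to all partitions of the topic
      var position = new ConsumerPosition(SeekTypeDTO.BEGINNING, topic, null);
      var seekOperations = SeekOperations.create(consumer, position);
      seekOperations.assignAndSeekNonEmptyPartitions();          // assign + seek non-empty partitions only
      while (!seekOperations.assignedPartitionsFullyPolled()) {  // stop once end offsets are reached
        pollAndEmit(consumer.poll(Duration.ofMillis(200)));      // placeholder for deserialize-and-send logic
      }
    }
  }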