From d3bf65cfb6c7929d874c5cf70e05582b2fc48b25 Mon Sep 17 00:00:00 2001 From: German Osin Date: Fri, 18 Jun 2021 11:56:14 +0300 Subject: [PATCH] #122 Seek direction backend support (#562) --- .../ui/controller/MessagesController.java | 11 +- .../kafka/ui/model/ConsumerPosition.java | 2 +- .../kafka/ui/service/ConsumingService.java | 107 ++-------------- .../provectus/kafka/ui/util/OffsetsSeek.java | 84 ++++++++++++ .../kafka/ui/util/OffsetsSeekBackward.java | 121 ++++++++++++++++++ .../kafka/ui/util/OffsetsSeekForward.java | 59 +++++++++ .../kafka/ui/service/RecordEmitterTest.java | 23 +++- .../ui/{service => util}/OffsetsSeekTest.java | 89 +++++++++++-- .../main/resources/swagger/kafka-ui-api.yaml | 11 ++ 9 files changed, 387 insertions(+), 120 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java rename kafka-ui-api/src/test/java/com/provectus/kafka/ui/{service => util}/OffsetsSeekTest.java (58%) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java index 5dcb94a989..a401b8268e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java @@ -2,6 +2,7 @@ package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.api.MessagesApi; import com.provectus.kafka.ui.model.ConsumerPosition; +import com.provectus.kafka.ui.model.SeekDirection; import com.provectus.kafka.ui.model.SeekType; import com.provectus.kafka.ui.model.TopicMessage; import com.provectus.kafka.ui.service.ClusterService; @@ -40,13 +41,15 @@ public class MessagesController implements MessagesApi { @Override public Mono>> getTopicMessages( String clusterName, String topicName, @Valid SeekType seekType, @Valid List seekTo, - @Valid Integer limit, @Valid String q, ServerWebExchange exchange) { - return parseConsumerPosition(seekType, seekTo) + @Valid Integer limit, @Valid String q, @Valid SeekDirection seekDirection, + ServerWebExchange exchange) { + return parseConsumerPosition(seekType, seekTo, seekDirection) .map(consumerPosition -> ResponseEntity .ok(clusterService.getMessages(clusterName, topicName, consumerPosition, q, limit))); } - private Mono parseConsumerPosition(SeekType seekType, List seekTo) { + private Mono parseConsumerPosition( + SeekType seekType, List seekTo, SeekDirection seekDirection) { return Mono.justOrEmpty(seekTo) .defaultIfEmpty(Collections.emptyList()) .flatMapIterable(Function.identity()) @@ -61,7 +64,7 @@ public class MessagesController implements MessagesApi { }) .collectMap(Pair::getKey, Pair::getValue) .map(positions -> new ConsumerPosition(seekType != null ? seekType : SeekType.BEGINNING, - positions)); + positions, seekDirection)); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java index 0f947be7fc..a867002b34 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java @@ -8,5 +8,5 @@ public class ConsumerPosition { private SeekType seekType; private Map seekTo; - + private SeekDirection seekDirection; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java index 310292bc6e..fceb80aa9e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java @@ -6,12 +6,14 @@ import com.provectus.kafka.ui.deserialization.DeserializationService; import com.provectus.kafka.ui.deserialization.RecordDeserializer; import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.KafkaCluster; -import com.provectus.kafka.ui.model.SeekType; +import com.provectus.kafka.ui.model.SeekDirection; import com.provectus.kafka.ui.model.TopicMessage; import com.provectus.kafka.ui.util.ClusterUtil; +import com.provectus.kafka.ui.util.OffsetsSeek; +import com.provectus.kafka.ui.util.OffsetsSeekBackward; +import com.provectus.kafka.ui.util.OffsetsSeekForward; import java.time.Duration; import java.util.Collection; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -53,7 +55,10 @@ public class ConsumingService { .orElse(DEFAULT_RECORD_LIMIT); RecordEmitter emitter = new RecordEmitter( () -> kafkaService.createConsumer(cluster), - new OffsetsSeek(topic, consumerPosition)); + consumerPosition.getSeekDirection().equals(SeekDirection.FORWARD) + ? new OffsetsSeekForward(topic, consumerPosition) + : new OffsetsSeekBackward(topic, consumerPosition, recordsLimit) + ); RecordDeserializer recordDeserializer = deserializationService.getRecordDeserializerForCluster(cluster); return Flux.create(emitter) @@ -79,7 +84,7 @@ public class ConsumingService { * returns end offsets for partitions where start offset != end offsets. * This is useful when we need to verify that partition is not empty. */ - private static Map significantOffsets(Consumer consumer, + public static Map significantOffsets(Consumer consumer, String topicName, Collection partitionsToInclude) { @@ -159,98 +164,4 @@ public class ConsumingService { } } - @RequiredArgsConstructor - static class OffsetsSeek { - - private final String topic; - private final ConsumerPosition consumerPosition; - - public WaitingOffsets assignAndSeek(Consumer consumer) { - SeekType seekType = consumerPosition.getSeekType(); - log.info("Positioning consumer for topic {} with {}", topic, consumerPosition); - switch (seekType) { - case OFFSET: - assignAndSeekForOffset(consumer); - break; - case TIMESTAMP: - assignAndSeekForTimestamp(consumer); - break; - case BEGINNING: - assignAndSeekFromBeginning(consumer); - break; - default: - throw new IllegalArgumentException("Unknown seekType: " + seekType); - } - log.info("Assignment: {}", consumer.assignment()); - return new WaitingOffsets(topic, consumer); - } - - private List getRequestedPartitions(Consumer consumer) { - Map partitionPositions = consumerPosition.getSeekTo(); - return consumer.partitionsFor(topic).stream() - .filter( - p -> partitionPositions.isEmpty() || partitionPositions.containsKey(p.partition())) - .map(p -> new TopicPartition(p.topic(), p.partition())) - .collect(Collectors.toList()); - } - - private void assignAndSeekForOffset(Consumer consumer) { - List partitions = getRequestedPartitions(consumer); - consumer.assign(partitions); - consumerPosition.getSeekTo().forEach((partition, offset) -> { - TopicPartition topicPartition = new TopicPartition(topic, partition); - consumer.seek(topicPartition, offset); - }); - } - - private void assignAndSeekForTimestamp(Consumer consumer) { - Map timestampsToSearch = - consumerPosition.getSeekTo().entrySet().stream() - .collect(Collectors.toMap( - partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()), - Map.Entry::getValue - )); - Map offsetsForTimestamps = consumer.offsetsForTimes(timestampsToSearch) - .entrySet().stream() - .filter(e -> e.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); - - if (offsetsForTimestamps.isEmpty()) { - throw new IllegalArgumentException("No offsets were found for requested timestamps"); - } - - consumer.assign(offsetsForTimestamps.keySet()); - offsetsForTimestamps.forEach(consumer::seek); - } - - private void assignAndSeekFromBeginning(Consumer consumer) { - List partitions = getRequestedPartitions(consumer); - consumer.assign(partitions); - consumer.seekToBeginning(partitions); - } - - static class WaitingOffsets { - final Map offsets = new HashMap<>(); // partition number -> offset - - WaitingOffsets(String topic, Consumer consumer) { - var partitions = consumer.assignment().stream() - .map(TopicPartition::partition) - .collect(Collectors.toList()); - significantOffsets(consumer, topic, partitions) - .forEach((tp, offset) -> offsets.put(tp.partition(), offset - 1)); - } - - void markPolled(ConsumerRecord rec) { - Long waiting = offsets.get(rec.partition()); - if (waiting != null && waiting <= rec.offset()) { - offsets.remove(rec.partition()); - } - } - - boolean endReached() { - return offsets.isEmpty(); - } - } - - } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java new file mode 100644 index 0000000000..9d082c67bc --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java @@ -0,0 +1,84 @@ +package com.provectus.kafka.ui.util; + +import com.provectus.kafka.ui.model.ConsumerPosition; +import com.provectus.kafka.ui.model.SeekType; +import com.provectus.kafka.ui.service.ConsumingService; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Bytes; + +@Log4j2 +public abstract class OffsetsSeek { + protected final String topic; + protected final ConsumerPosition consumerPosition; + + public OffsetsSeek(String topic, ConsumerPosition consumerPosition) { + this.topic = topic; + this.consumerPosition = consumerPosition; + } + + public WaitingOffsets assignAndSeek(Consumer consumer) { + SeekType seekType = consumerPosition.getSeekType(); + log.info("Positioning consumer for topic {} with {}", topic, consumerPosition); + switch (seekType) { + case OFFSET: + assignAndSeekForOffset(consumer); + break; + case TIMESTAMP: + assignAndSeekForTimestamp(consumer); + break; + case BEGINNING: + assignAndSeekFromBeginning(consumer); + break; + default: + throw new IllegalArgumentException("Unknown seekType: " + seekType); + } + log.info("Assignment: {}", consumer.assignment()); + return new WaitingOffsets(topic, consumer); + } + + protected List getRequestedPartitions(Consumer consumer) { + Map partitionPositions = consumerPosition.getSeekTo(); + return consumer.partitionsFor(topic).stream() + .filter( + p -> partitionPositions.isEmpty() || partitionPositions.containsKey(p.partition())) + .map(p -> new TopicPartition(p.topic(), p.partition())) + .collect(Collectors.toList()); + } + + + protected abstract void assignAndSeekFromBeginning(Consumer consumer); + + protected abstract void assignAndSeekForTimestamp(Consumer consumer); + + protected abstract void assignAndSeekForOffset(Consumer consumer); + + public static class WaitingOffsets { + final Map offsets = new HashMap<>(); // partition number -> offset + + public WaitingOffsets(String topic, Consumer consumer) { + var partitions = consumer.assignment().stream() + .map(TopicPartition::partition) + .collect(Collectors.toList()); + ConsumingService.significantOffsets(consumer, topic, partitions) + .forEach((tp, offset) -> offsets.put(tp.partition(), offset - 1)); + } + + public void markPolled(ConsumerRecord rec) { + Long waiting = offsets.get(rec.partition()); + if (waiting != null && waiting <= rec.offset()) { + offsets.remove(rec.partition()); + } + } + + public boolean endReached() { + return offsets.isEmpty(); + } + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java new file mode 100644 index 0000000000..7500ab0bbb --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java @@ -0,0 +1,121 @@ +package com.provectus.kafka.ui.util; + +import com.provectus.kafka.ui.model.ConsumerPosition; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Bytes; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +@Log4j2 +public class OffsetsSeekBackward extends OffsetsSeek { + + private final int maxMessages; + + public OffsetsSeekBackward(String topic, + ConsumerPosition consumerPosition, int maxMessages) { + super(topic, consumerPosition); + this.maxMessages = maxMessages; + } + + + protected void assignAndSeekForOffset(Consumer consumer) { + List partitions = getRequestedPartitions(consumer); + consumer.assign(partitions); + final Map offsets = + findOffsetsInt(consumer, consumerPosition.getSeekTo()); + offsets.forEach(consumer::seek); + } + + protected void assignAndSeekFromBeginning(Consumer consumer) { + List partitions = getRequestedPartitions(consumer); + consumer.assign(partitions); + final Map offsets = findOffsets(consumer, Map.of()); + offsets.forEach(consumer::seek); + } + + protected void assignAndSeekForTimestamp(Consumer consumer) { + Map timestampsToSearch = + consumerPosition.getSeekTo().entrySet().stream() + .collect(Collectors.toMap( + partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()), + e -> e.getValue() + 1 + )); + Map offsetsForTimestamps = consumer.offsetsForTimes(timestampsToSearch) + .entrySet().stream() + .filter(e -> e.getValue() != null) + .map(v -> Tuples.of(v.getKey(), v.getValue().offset() - 1)) + .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)); + + if (offsetsForTimestamps.isEmpty()) { + throw new IllegalArgumentException("No offsets were found for requested timestamps"); + } + + consumer.assign(offsetsForTimestamps.keySet()); + final Map offsets = findOffsets(consumer, offsetsForTimestamps); + offsets.forEach(consumer::seek); + } + + protected Map findOffsetsInt( + Consumer consumer, Map seekTo) { + + final Map seekMap = seekTo.entrySet() + .stream().map(p -> + Tuples.of( + new TopicPartition(topic, p.getKey()), + p.getValue() + ) + ).collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)); + + return findOffsets(consumer, seekMap); + } + + protected Map findOffsets( + Consumer consumer, Map seekTo) { + + List partitions = getRequestedPartitions(consumer); + final Map beginningOffsets = consumer.beginningOffsets(partitions); + final Map endOffsets = consumer.endOffsets(partitions); + + final Map seekMap = new HashMap<>(seekTo); + int awaitingMessages = maxMessages; + + Set waiting = new HashSet<>(partitions); + + while (awaitingMessages > 0 && !waiting.isEmpty()) { + final int msgsPerPartition = (int) Math.ceil((double) awaitingMessages / partitions.size()); + for (TopicPartition partition : partitions) { + final Long offset = Optional.ofNullable(seekMap.get(partition)) + .orElseGet(() -> endOffsets.get(partition)); + final Long beginning = beginningOffsets.get(partition); + + if (offset - beginning > msgsPerPartition) { + seekMap.put(partition, offset - msgsPerPartition); + awaitingMessages -= msgsPerPartition; + } else { + final long num = offset - beginning; + if (num > 0) { + seekMap.put(partition, offset - num); + awaitingMessages -= num; + } else { + waiting.remove(partition); + } + } + + if (awaitingMessages <= 0) { + break; + } + } + } + + return seekMap; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java new file mode 100644 index 0000000000..0a31cc7753 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java @@ -0,0 +1,59 @@ +package com.provectus.kafka.ui.util; + +import com.provectus.kafka.ui.model.ConsumerPosition; +import com.provectus.kafka.ui.model.SeekType; +import com.provectus.kafka.ui.service.ConsumingService; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Bytes; + +@Log4j2 +public class OffsetsSeekForward extends OffsetsSeek { + + public OffsetsSeekForward(String topic, ConsumerPosition consumerPosition) { + super(topic, consumerPosition); + } + + protected void assignAndSeekForOffset(Consumer consumer) { + List partitions = getRequestedPartitions(consumer); + consumer.assign(partitions); + consumerPosition.getSeekTo().forEach((partition, offset) -> { + TopicPartition topicPartition = new TopicPartition(topic, partition); + consumer.seek(topicPartition, offset); + }); + } + + protected void assignAndSeekForTimestamp(Consumer consumer) { + Map timestampsToSearch = + consumerPosition.getSeekTo().entrySet().stream() + .collect(Collectors.toMap( + partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()), + Map.Entry::getValue + )); + Map offsetsForTimestamps = consumer.offsetsForTimes(timestampsToSearch) + .entrySet().stream() + .filter(e -> e.getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); + + if (offsetsForTimestamps.isEmpty()) { + throw new IllegalArgumentException("No offsets were found for requested timestamps"); + } + + consumer.assign(offsetsForTimestamps.keySet()); + offsetsForTimestamps.forEach(consumer::seek); + } + + protected void assignAndSeekFromBeginning(Consumer consumer) { + List partitions = getRequestedPartitions(consumer); + consumer.assign(partitions); + consumer.seekToBeginning(partitions); + } + +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java index 11af012277..358b818ccc 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java @@ -1,13 +1,14 @@ package com.provectus.kafka.ui.service; -import static com.provectus.kafka.ui.service.ConsumingService.OffsetsSeek; import static com.provectus.kafka.ui.service.ConsumingService.RecordEmitter; import static org.assertj.core.api.Assertions.assertThat; import com.provectus.kafka.ui.AbstractBaseTest; import com.provectus.kafka.ui.model.ConsumerPosition; +import com.provectus.kafka.ui.model.SeekDirection; import com.provectus.kafka.ui.model.SeekType; import com.provectus.kafka.ui.producer.KafkaTestProducer; +import com.provectus.kafka.ui.util.OffsetsSeekForward; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -65,7 +66,10 @@ class RecordEmitterTest extends AbstractBaseTest { void pollNothingOnEmptyTopic() { var emitter = new RecordEmitter( this::createConsumer, - new OffsetsSeek(EMPTY_TOPIC, new ConsumerPosition(SeekType.BEGINNING, Map.of()))); + new OffsetsSeekForward(EMPTY_TOPIC, + new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.FORWARD) + ) + ); Long polledValues = Flux.create(emitter) .limitRequest(100) @@ -79,7 +83,10 @@ class RecordEmitterTest extends AbstractBaseTest { void pollFullTopicFromBeginning() { var emitter = new RecordEmitter( this::createConsumer, - new OffsetsSeek(TOPIC, new ConsumerPosition(SeekType.BEGINNING, Map.of()))); + new OffsetsSeekForward(TOPIC, + new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.FORWARD) + ) + ); var polledValues = Flux.create(emitter) .map(this::deserialize) @@ -101,7 +108,10 @@ class RecordEmitterTest extends AbstractBaseTest { var emitter = new RecordEmitter( this::createConsumer, - new OffsetsSeek(TOPIC, new ConsumerPosition(SeekType.OFFSET, targetOffsets))); + new OffsetsSeekForward(TOPIC, + new ConsumerPosition(SeekType.OFFSET, targetOffsets, SeekDirection.FORWARD) + ) + ); var polledValues = Flux.create(emitter) .map(this::deserialize) @@ -127,7 +137,10 @@ class RecordEmitterTest extends AbstractBaseTest { var emitter = new RecordEmitter( this::createConsumer, - new OffsetsSeek(TOPIC, new ConsumerPosition(SeekType.TIMESTAMP, targetTimestamps))); + new OffsetsSeekForward(TOPIC, + new ConsumerPosition(SeekType.TIMESTAMP, targetTimestamps, SeekDirection.FORWARD) + ) + ); var polledValues = Flux.create(emitter) .map(this::deserialize) diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsSeekTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/OffsetsSeekTest.java similarity index 58% rename from kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsSeekTest.java rename to kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/OffsetsSeekTest.java index 550b2ce558..302c2a6b58 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsSeekTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/OffsetsSeekTest.java @@ -1,8 +1,9 @@ -package com.provectus.kafka.ui.service; +package com.provectus.kafka.ui.util; import static org.assertj.core.api.Assertions.assertThat; import com.provectus.kafka.ui.model.ConsumerPosition; +import com.provectus.kafka.ui.model.SeekDirection; import com.provectus.kafka.ui.model.SeekType; import java.util.List; import java.util.Map; @@ -51,10 +52,16 @@ class OffsetsSeekTest { } @Test - void seekToBeginningAllPartitions() { - var seek = new ConsumingService.OffsetsSeek( + void forwardSeekToBeginningAllPartitions() { + var seek = new OffsetsSeekForward( topic, - new ConsumerPosition(SeekType.BEGINNING, Map.of(0, 0L, 1, 0L))); + new ConsumerPosition( + SeekType.BEGINNING, + Map.of(0, 0L, 1, 0L), + SeekDirection.FORWARD + ) + ); + seek.assignAndSeek(consumer); assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1); assertThat(consumer.position(tp0)).isZero(); @@ -62,10 +69,28 @@ class OffsetsSeekTest { } @Test - void seekToBeginningWithPartitionsList() { - var seek = new ConsumingService.OffsetsSeek( + void backwardSeekToBeginningAllPartitions() { + var seek = new OffsetsSeekBackward( topic, - new ConsumerPosition(SeekType.BEGINNING, Map.of())); + new ConsumerPosition( + SeekType.BEGINNING, + Map.of(2, 0L, 3, 0L), + SeekDirection.BACKWARD + ), + 10 + ); + + seek.assignAndSeek(consumer); + assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2, tp3); + assertThat(consumer.position(tp2)).isEqualTo(15L); + assertThat(consumer.position(tp3)).isEqualTo(25L); + } + + @Test + void forwardSeekToBeginningWithPartitionsList() { + var seek = new OffsetsSeekForward( + topic, + new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.FORWARD)); seek.assignAndSeek(consumer); assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2, tp3); assertThat(consumer.position(tp0)).isZero(); @@ -75,10 +100,31 @@ class OffsetsSeekTest { } @Test - void seekToOffset() { - var seek = new ConsumingService.OffsetsSeek( + void backwardSeekToBeginningWithPartitionsList() { + var seek = new OffsetsSeekBackward( topic, - new ConsumerPosition(SeekType.OFFSET, Map.of(0, 0L, 1, 1L, 2, 2L))); + new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.BACKWARD), + 10 + ); + seek.assignAndSeek(consumer); + assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2, tp3); + assertThat(consumer.position(tp0)).isZero(); + assertThat(consumer.position(tp1)).isEqualTo(10L); + assertThat(consumer.position(tp2)).isEqualTo(15L); + assertThat(consumer.position(tp3)).isEqualTo(25L); + } + + + @Test + void forwardSeekToOffset() { + var seek = new OffsetsSeekForward( + topic, + new ConsumerPosition( + SeekType.OFFSET, + Map.of(0, 0L, 1, 1L, 2, 2L), + SeekDirection.FORWARD + ) + ); seek.assignAndSeek(consumer); assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2); assertThat(consumer.position(tp0)).isZero(); @@ -86,15 +132,34 @@ class OffsetsSeekTest { assertThat(consumer.position(tp2)).isEqualTo(2L); } + @Test + void backwardSeekToOffset() { + var seek = new OffsetsSeekBackward( + topic, + new ConsumerPosition( + SeekType.OFFSET, + Map.of(0, 0L, 1, 1L, 2, 2L), + SeekDirection.FORWARD + ), + 2 + ); + seek.assignAndSeek(consumer); + assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2); + assertThat(consumer.position(tp0)).isZero(); + assertThat(consumer.position(tp1)).isEqualTo(1L); + assertThat(consumer.position(tp2)).isEqualTo(0L); + } + + @Nested class WaitingOffsetsTest { - ConsumingService.OffsetsSeek.WaitingOffsets offsets; + OffsetsSeekForward.WaitingOffsets offsets; @BeforeEach void assignAndCreateOffsets() { consumer.assign(List.of(tp0, tp1, tp2, tp3)); - offsets = new ConsumingService.OffsetsSeek.WaitingOffsets(topic, consumer); + offsets = new OffsetsSeek.WaitingOffsets(topic, consumer); } @Test diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index e04566b7ed..f77b6b1881 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -323,6 +323,10 @@ paths: in: query schema: type: string + - name: seekDirection + in: query + schema: + $ref: "#/components/schemas/SeekDirection" responses: 200: description: OK @@ -1448,6 +1452,13 @@ components: - OFFSET - TIMESTAMP + SeekDirection: + type: string + enum: + - FORWARD + - BACKWARD + default: FORWARD + Partition: type: object properties: