From 97ec512b0027d626d05350bc00c01e61d272b9aa Mon Sep 17 00:00:00 2001
From: German Osin
Date: Tue, 29 Jun 2021 09:52:18 +0300
Subject: [PATCH] #122 Fix emitter to consume records in right order (#598)

* #122 Fix emitter to consume records in right order

* Fixed naming
---
 .../kafka/ui/config/CustomWebFilter.java      |   1 -
 .../ui/controller/MessagesController.java     |  14 +-
 .../ui/emitter/BackwardRecordEmitter.java     |  94 ++++++++
 .../ui/emitter/ForwardRecordEmitter.java      |  49 ++++
 .../kafka/ui/model/ConsumerPosition.java      |   8 +-
 .../schemaregistry/JsonMessageReader.java     |   1 -
 .../serde/schemaregistry/MessageReader.java   |   4 +-
 .../schemaregistry/ProtobufMessageReader.java |   1 -
 .../kafka/ui/service/ConsumingService.java    |  79 ++-----
 .../kafka/ui/service/KafkaService.java        |  11 +-
 .../provectus/kafka/ui/util/OffsetsSeek.java  | 103 ++++++---
 .../kafka/ui/util/OffsetsSeekBackward.java    | 117 +++++-----
 .../kafka/ui/util/OffsetsSeekForward.java     |  55 +++--
 .../ui/util/jsonschema/EnumJsonType.java      |   2 +-
 .../kafka/ui/util/jsonschema/JsonSchema.java  |   1 -
 .../src/main/resources/application-sdp.yml    |   6 +-
 .../provectus/kafka/ui/AbstractBaseTest.java  |   4 +-
 ...ests.java => KafkaConsumerGroupTests.java} |   2 +-
 .../kafka/ui/service/RecordEmitterTest.java   | 211 +++++++++++++++---
 .../kafka/ui/util/OffsetsSeekTest.java        |  58 +++--
 20 files changed, 575 insertions(+), 246 deletions(-)
 create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java
 create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java
 rename kafka-ui-api/src/test/java/com/provectus/kafka/ui/{KakfaConsumerGroupTests.java => KafkaConsumerGroupTests.java} (98%)

diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/CustomWebFilter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/CustomWebFilter.java
index 5efee1db8d..6dce3b5e01 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/CustomWebFilter.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/CustomWebFilter.java
@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.config;
 
-import java.util.Optional;
 import org.springframework.boot.autoconfigure.web.ServerProperties;
 import org.springframework.stereotype.Component;
 import org.springframework.web.server.ServerWebExchange;
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
index 4b2c086d4e..830168559c 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
@@ -16,6 +16,7 @@ import javax.validation.Valid;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
@@ -45,7 +46,7 @@ public class MessagesController implements MessagesApi {
       String clusterName, String topicName, @Valid SeekType seekType,
       @Valid List<String> seekTo, @Valid Integer limit, @Valid String q,
       @Valid SeekDirection seekDirection, ServerWebExchange exchange) {
-    return parseConsumerPosition(seekType, seekTo, seekDirection)
+    return parseConsumerPosition(topicName, seekType, seekTo, seekDirection)
         .map(consumerPosition -> ResponseEntity
             .ok(clusterService.getMessages(clusterName, topicName, consumerPosition, q, limit)));
   }
@@ -68,18 +69,21 @@ public class MessagesController implements MessagesApi {
   }
 
   private Mono<ConsumerPosition> parseConsumerPosition(
-      SeekType seekType, List<String> seekTo, SeekDirection seekDirection) {
+      String topicName, SeekType seekType, List<String> seekTo, SeekDirection seekDirection) {
     return Mono.justOrEmpty(seekTo)
         .defaultIfEmpty(Collections.emptyList())
         .flatMapIterable(Function.identity())
         .map(p -> {
-          String[] splited = p.split("::");
-          if (splited.length != 2) {
+          String[] split = p.split("::");
+          if (split.length != 2) {
             throw new IllegalArgumentException(
                 "Wrong seekTo argument format. See API docs for details");
           }
-          return Pair.of(Integer.parseInt(splited[0]), Long.parseLong(splited[1]));
+          return Pair.of(
+              new TopicPartition(topicName, Integer.parseInt(split[0])),
+              Long.parseLong(split[1])
+          );
        })
         .collectMap(Pair::getKey, Pair::getValue)
         .map(positions -> new ConsumerPosition(seekType != null ? seekType : SeekType.BEGINNING,
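Note on the controller hunk above: seekTo values still arrive as "partition::value" strings, but they are now keyed by TopicPartition instead of a bare partition number, so positions survive intact all the way down to the seek logic. A minimal, self-contained sketch of that mapping (the topic name "events" and the offsets are hypothetical, not taken from the patch):

import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.TopicPartition;

class SeekToParsingSketch {
  public static void main(String[] args) {
    // "0::42" for topic "events" becomes TopicPartition("events", 0) -> 42L
    Map<TopicPartition, Long> positions = Stream.of("0::42", "1::7")
        .map(s -> s.split("::"))
        .collect(Collectors.toMap(
            p -> new TopicPartition("events", Integer.parseInt(p[0])),
            p -> Long.parseLong(p[1])));
    System.out.println(positions); // e.g. {events-0=42, events-1=7} (order may vary)
  }
}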
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java
new file mode 100644
index 0000000000..ee407a167f
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java
@@ -0,0 +1,94 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.util.OffsetsSeekBackward;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import reactor.core.publisher.FluxSink;
+
+@RequiredArgsConstructor
+@Log4j2
+public class BackwardRecordEmitter
+    implements java.util.function.Consumer<FluxSink<ConsumerRecord<Bytes, Bytes>>> {
+
+  private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
+
+  private final Function<Map<String, Object>, KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final OffsetsSeekBackward offsetsSeek;
+
+  @Override
+  public void accept(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
+    try (KafkaConsumer<Bytes, Bytes> configConsumer = consumerSupplier.apply(Map.of())) {
+      final List<TopicPartition> requestedPartitions =
+          offsetsSeek.getRequestedPartitions(configConsumer);
+      final int msgsPerPartition = offsetsSeek.msgsPerPartition(requestedPartitions.size());
+      try (KafkaConsumer<Bytes, Bytes> consumer =
+               consumerSupplier.apply(
+                   Map.of(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, msgsPerPartition)
+               )
+      ) {
+        final Map<TopicPartition, Long> partitionsOffsets =
+            offsetsSeek.getPartitionsOffsets(consumer);
+        log.info("partition offsets: {}", partitionsOffsets);
+        var waitingOffsets =
+            offsetsSeek.waitingOffsets(consumer, partitionsOffsets.keySet());
+        log.info("waiting offsets {} {}",
+            waitingOffsets.getBeginOffsets(),
+            waitingOffsets.getEndOffsets()
+        );
+        while (!sink.isCancelled() && !waitingOffsets.beginReached()) {
+          for (Map.Entry<TopicPartition, Long> entry : partitionsOffsets.entrySet()) {
+            final Long lowest = waitingOffsets.getBeginOffsets().get(entry.getKey().partition());
+            consumer.assign(Collections.singleton(entry.getKey()));
+            final long offset = Math.max(lowest, entry.getValue() - msgsPerPartition);
+            log.info("Polling {} from {}", entry.getKey(), offset);
+            consumer.seek(entry.getKey(), offset);
+            ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
+            final List<ConsumerRecord<Bytes, Bytes>> partitionRecords =
+                records.records(entry.getKey()).stream()
+                    .filter(r -> r.offset() < partitionsOffsets.get(entry.getKey()))
+                    .collect(Collectors.toList());
+            Collections.reverse(partitionRecords);
+
+            log.info("{} records polled", records.count());
+            log.info("{} records sent", partitionRecords.size());
+            for (ConsumerRecord<Bytes, Bytes> msg : partitionRecords) {
+              if (!sink.isCancelled() && !waitingOffsets.beginReached()) {
+                sink.next(msg);
+                waitingOffsets.markPolled(msg);
+              } else {
+                log.info("Begin reached");
+                break;
+              }
+            }
+            partitionsOffsets.put(
+                entry.getKey(),
+                Math.max(offset, entry.getValue() - msgsPerPartition)
+            );
+          }
+          if (waitingOffsets.beginReached()) {
+            log.info("begin reached after partitions");
+          } else if (sink.isCancelled()) {
+            log.info("sink is cancelled after partitions");
+          }
+        }
+        sink.complete();
+        log.info("Polling finished");
+      }
+    } catch (Exception e) {
+      log.error("Error occurred while consuming records", e);
+      sink.error(e);
+    }
+  }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java
new file mode 100644
index 0000000000..2b007dcc70
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java
@@ -0,0 +1,49 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.util.OffsetsSeek;
+import java.time.Duration;
+import java.util.function.Supplier;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.utils.Bytes;
+import reactor.core.publisher.FluxSink;
+
+@RequiredArgsConstructor
+@Log4j2
+public class ForwardRecordEmitter
+    implements java.util.function.Consumer<FluxSink<ConsumerRecord<Bytes, Bytes>>> {
+
+  private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
+
+  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final OffsetsSeek offsetsSeek;
+
+  @Override
+  public void accept(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
+    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+      var waitingOffsets = offsetsSeek.assignAndSeek(consumer);
+      while (!sink.isCancelled() && !waitingOffsets.endReached()) {
+        ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
+        log.info("{} records polled", records.count());
+
+        for (ConsumerRecord<Bytes, Bytes> msg : records) {
+          if (!sink.isCancelled() && !waitingOffsets.endReached()) {
+            sink.next(msg);
+            waitingOffsets.markPolled(msg);
+          } else {
+            break;
+          }
+        }
+
+      }
+      sink.complete();
+      log.info("Polling finished");
+    } catch (Exception e) {
+      log.error("Error occurred while consuming records", e);
+      sink.error(e);
+    }
+  }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java
index a867002b34..d90b6d66bc 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java
@@ -2,11 +2,11 @@ package com.provectus.kafka.ui.model;
 
 import java.util.Map;
 import lombok.Value;
+import org.apache.kafka.common.TopicPartition;
 
 @Value
 public class ConsumerPosition {
-
-  private SeekType seekType;
-  private Map<Integer, Long> seekTo;
-  private SeekDirection seekDirection;
+  SeekType seekType;
+  Map<TopicPartition, Long> seekTo;
+  SeekDirection seekDirection;
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonMessageReader.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonMessageReader.java
index 4680e9dbcd..36c798f4b5 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonMessageReader.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonMessageReader.java
@@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import java.io.IOException;
 import lombok.SneakyThrows;
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageReader.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageReader.java
index 16862feb91..630359739e 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageReader.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageReader.java
@@ -3,7 +3,6 @@ package com.provectus.kafka.ui.serde.schemaregistry;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import java.io.IOException;
 import org.apache.kafka.common.serialization.Serializer;
@@ -12,8 +11,7 @@ public abstract class MessageReader<T> {
   protected final Serializer<T> serializer;
   protected final String topic;
   protected final boolean isKey;
-
-  private ParsedSchema schema;
+  private final ParsedSchema schema;
 
   protected MessageReader(String topic, boolean isKey, SchemaRegistryClient client,
                           SchemaMetadata schema) throws IOException, RestClientException {
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageReader.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageReader.java
index fb067f3d5d..97a0f9ce75 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageReader.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageReader.java
@@ -6,7 +6,6 @@ import com.google.protobuf.util.JsonFormat;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
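The BackwardRecordEmitter added above pages each partition from its current upper bound back toward the beginning offset, msgsPerPartition records per poll, reversing each page before emitting. A minimal sketch of just the window arithmetic it relies on, with hypothetical offsets and no Kafka dependency:

class BackwardWindowSketch {
  public static void main(String[] args) {
    long begin = 0L;            // beginning offset of the partition
    long upper = 25L;           // exclusive upper bound still to be emitted
    int msgsPerPartition = 10;  // per-partition poll budget
    while (upper > begin) {
      long from = Math.max(begin, upper - msgsPerPartition);
      System.out.println("poll [" + from + ", " + upper + ")");
      upper = from;             // next iteration pages further back
    }
    // prints: poll [15, 25), then poll [5, 15), then poll [0, 5)
  }
}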
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java
index 86b137e9ca..002283fbb2 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java
@@ -2,6 +2,8 @@ package com.provectus.kafka.ui.service;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.SeekDirection;
@@ -9,25 +11,19 @@ import com.provectus.kafka.ui.model.TopicMessage;
 import com.provectus.kafka.ui.serde.DeserializationService;
 import com.provectus.kafka.ui.serde.RecordSerDe;
 import com.provectus.kafka.ui.util.ClusterUtil;
-import com.provectus.kafka.ui.util.OffsetsSeek;
 import com.provectus.kafka.ui.util.OffsetsSeekBackward;
 import com.provectus.kafka.ui.util.OffsetsSeekForward;
-import java.time.Duration;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
@@ -55,12 +51,19 @@ public class ConsumingService {
     int recordsLimit = Optional.ofNullable(limit)
         .map(s -> Math.min(s, MAX_RECORD_LIMIT))
         .orElse(DEFAULT_RECORD_LIMIT);
-    RecordEmitter emitter = new RecordEmitter(
-        () -> kafkaService.createConsumer(cluster),
-        consumerPosition.getSeekDirection().equals(SeekDirection.FORWARD)
-            ? new OffsetsSeekForward(topic, consumerPosition)
-            : new OffsetsSeekBackward(topic, consumerPosition, recordsLimit)
-    );
+
+    java.util.function.Consumer<FluxSink<ConsumerRecord<Bytes, Bytes>>> emitter;
+    if (consumerPosition.getSeekDirection().equals(SeekDirection.FORWARD)) {
+      emitter = new ForwardRecordEmitter(
+          () -> kafkaService.createConsumer(cluster),
+          new OffsetsSeekForward(topic, consumerPosition)
+      );
+    } else {
+      emitter = new BackwardRecordEmitter(
+          (Map<String, Object> props) -> kafkaService.createConsumer(cluster, props),
+          new OffsetsSeekBackward(topic, consumerPosition, recordsLimit)
+      );
+    }
     RecordSerDe recordDeserializer =
         deserializationService.getRecordDeserializerForCluster(cluster);
     return Flux.create(emitter)
@@ -132,56 +135,4 @@ public class ConsumingService {
     return false;
   }
 
-  @RequiredArgsConstructor
-  static class RecordEmitter
-      implements java.util.function.Consumer<FluxSink<ConsumerRecord<Bytes, Bytes>>> {
-
-    private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
-
-    private static final Comparator<ConsumerRecord<?, ?>> PARTITION_COMPARING =
-        Comparator.comparing(
-            ConsumerRecord::partition,
-            Comparator.nullsFirst(Comparator.naturalOrder())
-        );
-    private static final Comparator<ConsumerRecord<?, ?>> REVERED_COMPARING =
-        PARTITION_COMPARING.thenComparing(ConsumerRecord::offset).reversed();
-
-
-    private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
-    private final OffsetsSeek offsetsSeek;
-
-    @Override
-    public void accept(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
-      try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
-        var waitingOffsets = offsetsSeek.assignAndSeek(consumer);
-        while (!sink.isCancelled() && !waitingOffsets.endReached()) {
-          ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
-          log.info("{} records polled", records.count());
-
-          final Iterable<ConsumerRecord<Bytes, Bytes>> iterable;
-          if (offsetsSeek.getConsumerPosition().getSeekDirection().equals(SeekDirection.FORWARD)) {
-            iterable = records;
-          } else {
-            iterable = StreamSupport.stream(records.spliterator(), false)
-                .sorted(REVERED_COMPARING).collect(Collectors.toList());
-          }
-
-          for (ConsumerRecord<Bytes, Bytes> msg : iterable) {
-            if (!sink.isCancelled() && !waitingOffsets.endReached()) {
-              sink.next(msg);
-              waitingOffsets.markPolled(msg);
-            } else {
-              break;
-            }
-          }
-        }
-        sink.complete();
-        log.info("Polling finished");
-      } catch (Exception e) {
-        log.error("Error occurred while consuming records", e);
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
 }
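Both emitters implement java.util.function.Consumer<FluxSink<ConsumerRecord<Bytes, Bytes>>>, which is what lets ConsumingService above hand either one to Flux.create unchanged. A self-contained sketch of that contract, with String elements standing in for consumer records:

import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

class FluxSinkSketch {
  public static void main(String[] args) {
    // Flux.create invokes the emitter once per subscription; the emitter
    // pushes elements through the sink and signals completion itself.
    Consumer<FluxSink<String>> emitter = sink -> {
      for (int i = 0; i < 3 && !sink.isCancelled(); i++) {
        sink.next("msg_" + i);
      }
      sink.complete();
    };
    Flux.create(emitter).subscribe(System.out::println); // msg_0, msg_1, msg_2
  }
}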
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java
index 35449213a0..0bc7d7ea66 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java
@@ -31,6 +31,7 @@ import java.util.LongSummaryStatistics;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -376,13 +377,19 @@ public class KafkaService {
   }
 
   public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
+    return createConsumer(cluster, Map.of());
+  }
+
+  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster,
+                                                    Map<String, Object> properties) {
     Properties props = new Properties();
     props.putAll(cluster.getProperties());
-    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui");
+    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID().toString());
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    props.putAll(properties);
 
     return new KafkaConsumer<>(props);
   }
@@ -496,7 +503,7 @@ public class KafkaService {
     final Map<Integer, LongSummaryStatistics> brokerStats =
         topicPartitions.stream().collect(
             Collectors.groupingBy(
-                t -> t.getT1(),
+                Tuple2::getT1,
                 Collectors.summarizingLong(Tuple3::getT3)
             )
         );
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java
index bf16d44bb1..17a496d53b 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java
@@ -2,8 +2,7 @@ package com.provectus.kafka.ui.util;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.SeekType;
-import com.provectus.kafka.ui.service.ConsumingService;
-import java.util.HashMap;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -12,6 +11,8 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
 
 @Log4j2
 public abstract class OffsetsSeek {
@@ -27,62 +28,114 @@ public abstract class OffsetsSeek {
     return consumerPosition;
   }
 
-  public WaitingOffsets assignAndSeek(Consumer<Bytes, Bytes> consumer) {
+  public Map<TopicPartition, Long> getPartitionsOffsets(Consumer<Bytes, Bytes> consumer) {
     SeekType seekType = consumerPosition.getSeekType();
+    List<TopicPartition> partitions = getRequestedPartitions(consumer);
     log.info("Positioning consumer for topic {} with {}", topic, consumerPosition);
+    Map<TopicPartition, Long> offsets;
     switch (seekType) {
       case OFFSET:
-        assignAndSeekForOffset(consumer);
+        offsets = offsetsFromPositions(consumer, partitions);
         break;
       case TIMESTAMP:
-        assignAndSeekForTimestamp(consumer);
+        offsets = offsetsForTimestamp(consumer);
         break;
       case BEGINNING:
-        assignAndSeekFromBeginning(consumer);
+        offsets = offsetsFromBeginning(consumer, partitions);
         break;
       default:
         throw new IllegalArgumentException("Unknown seekType: " + seekType);
     }
-    log.info("Assignment: {}", consumer.assignment());
-    return new WaitingOffsets(topic, consumer);
+    return offsets;
   }
 
-  protected List<TopicPartition> getRequestedPartitions(Consumer<Bytes, Bytes> consumer) {
-    Map<Integer, Long> partitionPositions = consumerPosition.getSeekTo();
+  public WaitingOffsets waitingOffsets(Consumer<Bytes, Bytes> consumer,
+                                       Collection<TopicPartition> partitions) {
+    return new WaitingOffsets(topic, consumer, partitions);
+  }
+
+  public WaitingOffsets assignAndSeek(Consumer<Bytes, Bytes> consumer) {
+    final Map<TopicPartition, Long> partitionsOffsets = getPartitionsOffsets(consumer);
+    consumer.assign(partitionsOffsets.keySet());
+    partitionsOffsets.forEach(consumer::seek);
+    log.info("Assignment: {}", consumer.assignment());
+    return waitingOffsets(consumer, partitionsOffsets.keySet());
+  }
+
+
+  public List<TopicPartition> getRequestedPartitions(Consumer<Bytes, Bytes> consumer) {
+    Map<TopicPartition, Long> partitionPositions = consumerPosition.getSeekTo();
     return consumer.partitionsFor(topic).stream()
         .filter(
-            p -> partitionPositions.isEmpty() || partitionPositions.containsKey(p.partition()))
-        .map(p -> new TopicPartition(p.topic(), p.partition()))
+            p -> partitionPositions.isEmpty()
+                || partitionPositions.containsKey(new TopicPartition(p.topic(), p.partition()))
+        ).map(p -> new TopicPartition(p.topic(), p.partition()))
         .collect(Collectors.toList());
   }
 
-  protected abstract void assignAndSeekFromBeginning(Consumer<Bytes, Bytes> consumer);
+  protected abstract Map<TopicPartition, Long> offsetsFromBeginning(
+      Consumer<Bytes, Bytes> consumer, List<TopicPartition> partitions);
 
-  protected abstract void assignAndSeekForTimestamp(Consumer<Bytes, Bytes> consumer);
+  protected abstract Map<TopicPartition, Long> offsetsForTimestamp(
+      Consumer<Bytes, Bytes> consumer);
 
-  protected abstract void assignAndSeekForOffset(Consumer<Bytes, Bytes> consumer);
+  protected abstract Map<TopicPartition, Long> offsetsFromPositions(
+      Consumer<Bytes, Bytes> consumer, List<TopicPartition> partitions);
 
   public static class WaitingOffsets {
-    final Map<Integer, Long> offsets = new HashMap<>(); // partition number -> offset
+    private final Map<Integer, Long> endOffsets; // partition number -> offset
+    private final Map<Integer, Long> beginOffsets; // partition number -> offset
+    private final String topic;
 
-    public WaitingOffsets(String topic, Consumer<?, ?> consumer) {
-      var partitions = consumer.assignment().stream()
-          .map(TopicPartition::partition)
+    public WaitingOffsets(String topic, Consumer<?, ?> consumer,
+                          Collection<TopicPartition> partitions) {
+      this.topic = topic;
+      var allBeginningOffsets = consumer.beginningOffsets(partitions);
+      var allEndOffsets = consumer.endOffsets(partitions);
+
+      this.endOffsets = allEndOffsets.entrySet().stream()
+          .filter(entry -> !allBeginningOffsets.get(entry.getKey()).equals(entry.getValue()))
+          .map(e -> Tuples.of(e.getKey().partition(), e.getValue() - 1))
+          .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
+
+      this.beginOffsets = this.endOffsets.keySet().stream()
+          .map(p -> Tuples.of(p, allBeginningOffsets.get(new TopicPartition(topic, p))))
+          .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
+    }
+
+    public List<TopicPartition> topicPartitions() {
+      return this.endOffsets.keySet().stream()
+          .map(p -> new TopicPartition(topic, p))
           .collect(Collectors.toList());
-      ConsumingService.significantOffsets(consumer, topic, partitions)
-          .forEach((tp, offset) -> offsets.put(tp.partition(), offset - 1));
     }
 
     public void markPolled(ConsumerRecord<?, ?> rec) {
-      Long waiting = offsets.get(rec.partition());
-      if (waiting != null && waiting <= rec.offset()) {
-        offsets.remove(rec.partition());
+      Long endWaiting = endOffsets.get(rec.partition());
+      if (endWaiting != null && endWaiting <= rec.offset()) {
+        endOffsets.remove(rec.partition());
       }
+      Long beginWaiting = beginOffsets.get(rec.partition());
+      if (beginWaiting != null && beginWaiting >= rec.offset()) {
+        beginOffsets.remove(rec.partition());
+      }
+
     }
 
     public boolean endReached() {
-      return offsets.isEmpty();
+      return endOffsets.isEmpty();
+    }
+
+    public boolean beginReached() {
+      return beginOffsets.isEmpty();
+    }
+
+    public Map<Integer, Long> getEndOffsets() {
+      return endOffsets;
+    }
+
+    public Map<Integer, Long> getBeginOffsets() {
+      return beginOffsets;
+    }
   }
 }
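WaitingOffsets above now tracks both ends of every non-empty partition: a partition leaves endOffsets once a record at offset endOffset - 1 or beyond is emitted, and leaves beginOffsets once a record at or below the beginning offset is emitted, so endReached() and beginReached() terminate forward and backward reads independently. A small sketch of that bookkeeping with hypothetical offsets and no Kafka involved:

import java.util.HashMap;
import java.util.Map;

class WaitingOffsetsSketch {
  public static void main(String[] args) {
    // partition number -> last offset still awaited, mirroring the maps above
    Map<Integer, Long> endOffsets = new HashMap<>(Map.of(2, 19L, 3, 29L));

    long polledOffset = 19L; // pretend partition 2 just emitted offset 19
    Long waiting = endOffsets.get(2);
    if (waiting != null && waiting <= polledOffset) {
      endOffsets.remove(2); // partition 2 is done in the forward direction
    }
    System.out.println(endOffsets.isEmpty()); // false: partition 3 still pending
  }
}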
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java
index 7500ab0bbb..674562a0b7 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java
@@ -1,11 +1,11 @@
 package com.provectus.kafka.ui.util;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import lombok.extern.log4j.Log4j2;
@@ -26,96 +26,95 @@ public class OffsetsSeekBackward extends OffsetsSeek {
     this.maxMessages = maxMessages;
   }
 
-
-  protected void assignAndSeekForOffset(Consumer<Bytes, Bytes> consumer) {
-    List<TopicPartition> partitions = getRequestedPartitions(consumer);
-    consumer.assign(partitions);
-    final Map<TopicPartition, Long> offsets =
-        findOffsetsInt(consumer, consumerPosition.getSeekTo());
-    offsets.forEach(consumer::seek);
+  public int msgsPerPartition(int partitionsSize) {
+    return msgsPerPartition(maxMessages, partitionsSize);
   }
 
-  protected void assignAndSeekFromBeginning(Consumer<Bytes, Bytes> consumer) {
-    List<TopicPartition> partitions = getRequestedPartitions(consumer);
-    consumer.assign(partitions);
-    final Map<TopicPartition, Long> offsets = findOffsets(consumer, Map.of());
-    offsets.forEach(consumer::seek);
+  public int msgsPerPartition(long awaitingMessages, int partitionsSize) {
+    return (int) Math.ceil((double) awaitingMessages / partitionsSize);
   }
 
-  protected void assignAndSeekForTimestamp(Consumer<Bytes, Bytes> consumer) {
+
+  protected Map<TopicPartition, Long> offsetsFromPositions(Consumer<Bytes, Bytes> consumer,
+                                                           List<TopicPartition> partitions) {
+
+    return findOffsetsInt(consumer, consumerPosition.getSeekTo(), partitions);
+  }
+
+  protected Map<TopicPartition, Long> offsetsFromBeginning(Consumer<Bytes, Bytes> consumer,
+                                                           List<TopicPartition> partitions) {
+    return findOffsets(consumer, Map.of(), partitions);
+  }
+
+  protected Map<TopicPartition, Long> offsetsForTimestamp(Consumer<Bytes, Bytes> consumer) {
     Map<TopicPartition, Long> timestampsToSearch =
         consumerPosition.getSeekTo().entrySet().stream()
             .collect(Collectors.toMap(
-                partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
-                e -> e.getValue() + 1
+                Map.Entry::getKey,
+                e -> e.getValue()
             ));
     Map<TopicPartition, Long> offsetsForTimestamps = consumer.offsetsForTimes(timestampsToSearch)
         .entrySet().stream()
         .filter(e -> e.getValue() != null)
-        .map(v -> Tuples.of(v.getKey(), v.getValue().offset() - 1))
+        .map(v -> Tuples.of(v.getKey(), v.getValue().offset()))
         .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
 
     if (offsetsForTimestamps.isEmpty()) {
       throw new IllegalArgumentException("No offsets were found for requested timestamps");
     }
 
-    consumer.assign(offsetsForTimestamps.keySet());
-    final Map<TopicPartition, Long> offsets = findOffsets(consumer, offsetsForTimestamps);
-    offsets.forEach(consumer::seek);
+    log.info("Timestamps: {} to offsets: {}", timestampsToSearch, offsetsForTimestamps);
+
+    return findOffsets(consumer, offsetsForTimestamps, offsetsForTimestamps.keySet());
   }
 
   protected Map<TopicPartition, Long> findOffsetsInt(
-      Consumer<Bytes, Bytes> consumer, Map<Integer, Long> seekTo) {
-
-    final Map<TopicPartition, Long> seekMap = seekTo.entrySet()
-        .stream().map(p ->
-            Tuples.of(
-                new TopicPartition(topic, p.getKey()),
-                p.getValue()
-            )
-        ).collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
-
-    return findOffsets(consumer, seekMap);
+      Consumer<Bytes, Bytes> consumer, Map<TopicPartition, Long> seekTo,
+      List<TopicPartition> partitions) {
+    return findOffsets(consumer, seekTo, partitions);
   }
 
   protected Map<TopicPartition, Long> findOffsets(
-      Consumer<Bytes, Bytes> consumer, Map<TopicPartition, Long> seekTo) {
+      Consumer<Bytes, Bytes> consumer, Map<TopicPartition, Long> seekTo,
+      Collection<TopicPartition> partitions) {
 
-    List<TopicPartition> partitions = getRequestedPartitions(consumer);
     final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
     final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
 
-    final Map<TopicPartition, Long> seekMap = new HashMap<>(seekTo);
-    int awaitingMessages = maxMessages;
+    final Map<TopicPartition, Long> seekMap = new HashMap<>();
+    final Set<TopicPartition> emptyPartitions = new HashSet<>();
+
+    for (Map.Entry<TopicPartition, Long> entry : seekTo.entrySet()) {
+      final Long endOffset = endOffsets.get(entry.getKey());
+      final Long beginningOffset = beginningOffsets.get(entry.getKey());
+      if (beginningOffset != null
+          && endOffset != null
+          && beginningOffset < endOffset
+          && entry.getValue() > beginningOffset
+      ) {
+        final Long value;
+        if (entry.getValue() > endOffset) {
+          value = endOffset;
+        } else {
+          value = entry.getValue();
+        }
+
+        seekMap.put(entry.getKey(), value);
+      } else {
+        emptyPartitions.add(entry.getKey());
+      }
+    }
 
     Set<TopicPartition> waiting = new HashSet<>(partitions);
+    waiting.removeAll(emptyPartitions);
+    waiting.removeAll(seekMap.keySet());
 
-    while (awaitingMessages > 0 && !waiting.isEmpty()) {
-      final int msgsPerPartition = (int) Math.ceil((double) awaitingMessages / partitions.size());
-      for (TopicPartition partition : partitions) {
-        final Long offset = Optional.ofNullable(seekMap.get(partition))
-            .orElseGet(() -> endOffsets.get(partition));
-        final Long beginning = beginningOffsets.get(partition);
-
-        if (offset - beginning > msgsPerPartition) {
-          seekMap.put(partition, offset - msgsPerPartition);
-          awaitingMessages -= msgsPerPartition;
-        } else {
-          final long num = offset - beginning;
-          if (num > 0) {
-            seekMap.put(partition, offset - num);
-            awaitingMessages -= num;
-          } else {
-            waiting.remove(partition);
-          }
-        }
-
-        if (awaitingMessages <= 0) {
-          break;
-        }
-      }
+    for (TopicPartition topicPartition : waiting) {
+      seekMap.put(topicPartition, endOffsets.get(topicPartition));
     }
 
     return seekMap;
   }
+
+
 }
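The msgsPerPartition helper above is a ceiling division, so the requested total is always covered even when it does not divide evenly across partitions. A one-method sketch with hypothetical numbers:

class MsgsPerPartitionSketch {
  public static void main(String[] args) {
    long awaitingMessages = 100; // requested record limit
    int partitionsSize = 6;      // partitions to spread it over
    int msgsPerPartition = (int) Math.ceil((double) awaitingMessages / partitionsSize);
    System.out.println(msgsPerPartition); // 17, and 6 * 17 = 102 >= 100
  }
}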
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java
index 263293001d..b0e8117258 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java
@@ -1,8 +1,10 @@
 package com.provectus.kafka.ui.util;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -16,39 +18,44 @@ public class OffsetsSeekForward extends OffsetsSeek {
     super(topic, consumerPosition);
   }
 
-  protected void assignAndSeekForOffset(Consumer<Bytes, Bytes> consumer) {
-    List<TopicPartition> partitions = getRequestedPartitions(consumer);
-    consumer.assign(partitions);
-    consumerPosition.getSeekTo().forEach((partition, offset) -> {
-      TopicPartition topicPartition = new TopicPartition(topic, partition);
-      consumer.seek(topicPartition, offset);
-    });
+  protected Map<TopicPartition, Long> offsetsFromPositions(Consumer<Bytes, Bytes> consumer,
+                                                           List<TopicPartition> partitions) {
+    final Map<TopicPartition, Long> offsets =
+        offsetsFromBeginning(consumer, partitions);
+
+    final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(offsets.keySet());
+    final Set<TopicPartition> set = new HashSet<>(consumerPosition.getSeekTo().keySet());
+    final Map<TopicPartition, Long> collect = consumerPosition.getSeekTo().entrySet().stream()
+        .filter(e -> e.getValue() < endOffsets.get(e.getKey()))
+        .filter(e -> endOffsets.get(e.getKey()) > offsets.get(e.getKey()))
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            Map.Entry::getValue
+        ));
+    offsets.putAll(collect);
+    set.removeAll(collect.keySet());
+    set.forEach(offsets::remove);
+
+    return offsets;
   }
 
-  protected void assignAndSeekForTimestamp(Consumer<Bytes, Bytes> consumer) {
-    Map<TopicPartition, Long> timestampsToSearch =
-        consumerPosition.getSeekTo().entrySet().stream()
-            .collect(Collectors.toMap(
-                partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
-                Map.Entry::getValue
-            ));
-    Map<TopicPartition, Long> offsetsForTimestamps = consumer.offsetsForTimes(timestampsToSearch)
-        .entrySet().stream()
-        .filter(e -> e.getValue() != null)
-        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+  protected Map<TopicPartition, Long> offsetsForTimestamp(Consumer<Bytes, Bytes> consumer) {
+    Map<TopicPartition, Long> offsetsForTimestamps =
+        consumer.offsetsForTimes(consumerPosition.getSeekTo())
+            .entrySet().stream()
+            .filter(e -> e.getValue() != null)
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
 
     if (offsetsForTimestamps.isEmpty()) {
       throw new IllegalArgumentException("No offsets were found for requested timestamps");
     }
 
-    consumer.assign(offsetsForTimestamps.keySet());
-    offsetsForTimestamps.forEach(consumer::seek);
+    return offsetsForTimestamps;
   }
 
-  protected void assignAndSeekFromBeginning(Consumer<Bytes, Bytes> consumer) {
-    List<TopicPartition> partitions = getRequestedPartitions(consumer);
-    consumer.assign(partitions);
-    consumer.seekToBeginning(partitions);
+  protected Map<TopicPartition, Long> offsetsFromBeginning(Consumer<Bytes, Bytes> consumer,
+                                                           List<TopicPartition> partitions) {
+    return consumer.beginningOffsets(partitions);
   }
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/EnumJsonType.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/EnumJsonType.java
index b6816449da..13ac8c8b52 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/EnumJsonType.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/EnumJsonType.java
@@ -7,7 +7,7 @@ import java.util.Map;
 
 public class EnumJsonType extends JsonType {
 
-  private List<String> values;
+  private final List<String> values;
 
   public EnumJsonType(List<String> values) {
     super(Type.ENUM);
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonSchema.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonSchema.java
index ac26f90260..b53d20cb59 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonSchema.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonSchema.java
@@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.TextNode;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
diff --git a/kafka-ui-api/src/main/resources/application-sdp.yml b/kafka-ui-api/src/main/resources/application-sdp.yml
index 12503b7549..cde2ab2e59 100644
--- a/kafka-ui-api/src/main/resources/application-sdp.yml
+++ b/kafka-ui-api/src/main/resources/application-sdp.yml
@@ -1,9 +1,9 @@
 kafka:
   clusters:
     - name: local
-      bootstrapServers: localhost:9093
-      zookeeper: localhost:2181
-      schemaRegistry: http://localhost:8083
+      bootstrapServers: b-1.kad-msk.uxahxx.c6.kafka.eu-west-1.amazonaws.com:9092
+#      zookeeper: localhost:2181
+#      schemaRegistry: http://localhost:8083
 #    -
 #      name: secondLocal
 #      zookeeper: zookeeper1:2181
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java
index 6fd1b0fc5b..07ffe7009e 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java
@@ -24,8 +24,8 @@ import org.testcontainers.utility.DockerImageName;
 @SpringBootTest
 @ActiveProfiles("test")
 public abstract class AbstractBaseTest {
-  public static String LOCAL = "local";
-  public static String SECOND_LOCAL = "secondLocal";
+  public static final String LOCAL = "local";
+  public static final String SECOND_LOCAL = "secondLocal";
 
   private static final String CONFLUENT_PLATFORM_VERSION = "5.5.0";
 
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KakfaConsumerGroupTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerGroupTests.java
similarity index 98%
rename from kafka-ui-api/src/test/java/com/provectus/kafka/ui/KakfaConsumerGroupTests.java
rename to kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerGroupTests.java
index 423d441c33..9f7d744a84 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KakfaConsumerGroupTests.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerGroupTests.java
@@ -20,7 +20,7 @@ import org.springframework.test.web.reactive.server.WebTestClient;
 @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
 @Log4j2
 @AutoConfigureWebTestClient(timeout = "10000")
-public class KakfaConsumerGroupTests extends AbstractBaseTest {
+public class KafkaConsumerGroupTests extends AbstractBaseTest {
   @Autowired
   WebTestClient webTestClient;
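Both seek implementations now resolve timestamp positions through Consumer#offsetsForTimes and silently drop partitions that return null (no record at or after the requested timestamp). A hedged sketch of that call shape, extracted into a hypothetical helper for illustration:

import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class OffsetsForTimesSketch {
  // Resolve "first offset at or after timestamp" per partition, skipping
  // partitions for which the broker has no matching record.
  static Map<TopicPartition, Long> resolve(Consumer<?, ?> consumer,
                                           Map<TopicPartition, Long> timestamps) {
    return consumer.offsetsForTimes(timestamps).entrySet().stream()
        .filter(e -> e.getValue() != null)
        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
  }
}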
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java
index 358b818ccc..65346a17a6 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java
@@ -1,27 +1,33 @@
 package com.provectus.kafka.ui.service;
 
-import static com.provectus.kafka.ui.service.ConsumingService.RecordEmitter;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractBaseTest;
+import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.SeekDirection;
 import com.provectus.kafka.ui.model.SeekType;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
+import com.provectus.kafka.ui.util.OffsetsSeekBackward;
 import com.provectus.kafka.ui.util.OffsetsSeekForward;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import lombok.Value;
+import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -30,6 +36,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import reactor.core.publisher.Flux;
 
+@Log4j2
 class RecordEmitterTest extends AbstractBaseTest {
 
   static final int PARTITIONS = 5;
@@ -50,7 +57,12 @@ class RecordEmitterTest extends AbstractBaseTest {
           var value = "msg_" + partition + "_" + i;
           var metadata =
               producer.send(new ProducerRecord<>(TOPIC, partition, ts, null, value)).get();
-          SENT_RECORDS.add(new Record(value, metadata.partition(), metadata.offset(), ts));
+          SENT_RECORDS.add(new Record(
+              value,
+              new TopicPartition(metadata.topic(), metadata.partition()),
+              metadata.offset(),
+              ts)
+          );
         }
       }
     }
@@ -64,31 +76,56 @@ class RecordEmitterTest extends AbstractBaseTest {
 
   @Test
   void pollNothingOnEmptyTopic() {
-    var emitter = new RecordEmitter(
+    var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new OffsetsSeekForward(EMPTY_TOPIC,
             new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.FORWARD)
         )
     );
 
-    Long polledValues = Flux.create(emitter)
+    var backwardEmitter = new BackwardRecordEmitter(
+        this::createConsumer,
+        new OffsetsSeekBackward(
+            EMPTY_TOPIC,
+            new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.BACKWARD),
+            100
+        )
+    );
+
+    Long polledValues = Flux.create(forwardEmitter)
         .limitRequest(100)
         .count()
         .block();
 
     assertThat(polledValues).isZero();
+
+    polledValues = Flux.create(backwardEmitter)
+        .limitRequest(100)
+        .count()
+        .block();
+
+    assertThat(polledValues).isZero();
+
   }
 
   @Test
   void pollFullTopicFromBeginning() {
-    var emitter = new RecordEmitter(
+    var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new OffsetsSeekForward(TOPIC,
             new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.FORWARD)
         )
     );
 
-    var polledValues = Flux.create(emitter)
+    var backwardEmitter = new BackwardRecordEmitter(
+        this::createConsumer,
+        new OffsetsSeekBackward(TOPIC,
+            new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.FORWARD),
+            PARTITIONS * MSGS_PER_PARTITION
+        )
+    );
+
+    var polledValues = Flux.create(forwardEmitter)
         .map(this::deserialize)
         .limitRequest(Long.MAX_VALUE)
         .collect(Collectors.toList())
@@ -96,76 +133,198 @@ class RecordEmitterTest extends AbstractBaseTest {
 
     assertThat(polledValues).containsExactlyInAnyOrderElementsOf(
         SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList()));
+
+    polledValues = Flux.create(backwardEmitter)
+        .map(this::deserialize)
+        .limitRequest(Long.MAX_VALUE)
+        .collect(Collectors.toList())
+        .block();
+
+    assertThat(polledValues).containsExactlyInAnyOrderElementsOf(
+        SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList()));
+
   }
 
   @Test
   void pollWithOffsets() {
-    Map<Integer, Long> targetOffsets = new HashMap<>();
+    Map<TopicPartition, Long> targetOffsets = new HashMap<>();
     for (int i = 0; i < PARTITIONS; i++) {
       long offset = ThreadLocalRandom.current().nextLong(MSGS_PER_PARTITION);
-      targetOffsets.put(i, offset);
+      targetOffsets.put(new TopicPartition(TOPIC, i), offset);
     }
 
-    var emitter = new RecordEmitter(
+    var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new OffsetsSeekForward(TOPIC,
             new ConsumerPosition(SeekType.OFFSET, targetOffsets, SeekDirection.FORWARD)
         )
     );
 
-    var polledValues = Flux.create(emitter)
+    var backwardEmitter = new BackwardRecordEmitter(
+        this::createConsumer,
+        new OffsetsSeekBackward(TOPIC,
+            new ConsumerPosition(SeekType.OFFSET, targetOffsets, SeekDirection.BACKWARD),
+            PARTITIONS * MSGS_PER_PARTITION
+        )
+    );
+
+    var polledValues = Flux.create(forwardEmitter)
         .map(this::deserialize)
         .limitRequest(Long.MAX_VALUE)
         .collect(Collectors.toList())
         .block();
 
     var expectedValues = SENT_RECORDS.stream()
-        .filter(r -> r.getOffset() >= targetOffsets.get(r.getPartition()))
+        .filter(r -> r.getOffset() >= targetOffsets.get(r.getTp()))
         .map(Record::getValue)
         .collect(Collectors.toList());
 
     assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
+
+    expectedValues = SENT_RECORDS.stream()
+        .filter(r -> r.getOffset() < targetOffsets.get(r.getTp()))
+        .map(Record::getValue)
+        .collect(Collectors.toList());
+
+    polledValues = Flux.create(backwardEmitter)
+        .map(this::deserialize)
+        .limitRequest(Long.MAX_VALUE)
+        .collect(Collectors.toList())
+        .block();
+
+    assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
   }
 
   @Test
   void pollWithTimestamps() {
-    Map<Integer, Long> targetTimestamps = new HashMap<>();
+    Map<TopicPartition, Long> targetTimestamps = new HashMap<>();
+    final Map<TopicPartition, List<Record>> perPartition =
+        SENT_RECORDS.stream().collect(Collectors.groupingBy((r) -> r.tp));
     for (int i = 0; i < PARTITIONS; i++) {
-      int randRecordIdx = ThreadLocalRandom.current().nextInt(SENT_RECORDS.size());
-      targetTimestamps.put(i, SENT_RECORDS.get(randRecordIdx).getTimestamp());
+      final List<Record> records = perPartition.get(new TopicPartition(TOPIC, i));
+      int randRecordIdx = ThreadLocalRandom.current().nextInt(records.size());
+      log.info("partition: {} position: {}", i, randRecordIdx);
+      targetTimestamps.put(
+          new TopicPartition(TOPIC, i),
+          records.get(randRecordIdx).getTimestamp()
+      );
     }
 
-    var emitter = new RecordEmitter(
+    var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new OffsetsSeekForward(TOPIC,
             new ConsumerPosition(SeekType.TIMESTAMP, targetTimestamps, SeekDirection.FORWARD)
         )
     );
 
-    var polledValues = Flux.create(emitter)
+    var backwardEmitter = new BackwardRecordEmitter(
+        this::createConsumer,
+        new OffsetsSeekBackward(TOPIC,
+            new ConsumerPosition(SeekType.TIMESTAMP, targetTimestamps, SeekDirection.BACKWARD),
+            PARTITIONS * MSGS_PER_PARTITION
+        )
+    );
+
+    var polledValues = Flux.create(forwardEmitter)
         .map(this::deserialize)
         .limitRequest(Long.MAX_VALUE)
         .collect(Collectors.toList())
         .block();
 
     var expectedValues = SENT_RECORDS.stream()
-        .filter(r -> r.getTimestamp() >= targetTimestamps.get(r.getPartition()))
+        .filter(r -> r.getTimestamp() >= targetTimestamps.get(r.getTp()))
        .map(Record::getValue)
         .collect(Collectors.toList());
 
     assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
+
+    polledValues = Flux.create(backwardEmitter)
+        .map(this::deserialize)
+        .limitRequest(Long.MAX_VALUE)
+        .collect(Collectors.toList())
+        .block();
+
+    expectedValues = SENT_RECORDS.stream()
+        .filter(r -> r.getTimestamp() < targetTimestamps.get(r.getTp()))
+        .map(Record::getValue)
+        .collect(Collectors.toList());
+
+    assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
+
+  }
+
+  @Test
+  void backwardEmitterSeekToEnd() {
+    final int numMessages = 100;
+    final Map<TopicPartition, Long> targetOffsets = new HashMap<>();
+    for (int i = 0; i < PARTITIONS; i++) {
+      targetOffsets.put(new TopicPartition(TOPIC, i), (long) MSGS_PER_PARTITION);
+    }
+
+    var backwardEmitter = new BackwardRecordEmitter(
+        this::createConsumer,
+        new OffsetsSeekBackward(TOPIC,
+            new ConsumerPosition(SeekType.OFFSET, targetOffsets, SeekDirection.BACKWARD),
+            numMessages
+        )
+    );
+
+    var polledValues = Flux.create(backwardEmitter)
+        .map(this::deserialize)
+        .limitRequest(numMessages)
+        .collect(Collectors.toList())
+        .block();
+
+    var expectedValues = SENT_RECORDS.stream()
+        .filter(r -> r.getOffset() < targetOffsets.get(r.getTp()))
+        .filter(r -> r.getOffset() >= (targetOffsets.get(r.getTp()) - (100 / PARTITIONS)))
+        .map(Record::getValue)
+        .collect(Collectors.toList());
+
+
+    assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
+  }
+
+  @Test
+  void backwardEmitterSeekToBegin() {
+    Map<TopicPartition, Long> offsets = new HashMap<>();
+    for (int i = 0; i < PARTITIONS; i++) {
+      offsets.put(new TopicPartition(TOPIC, i), 0L);
+    }
+
+    var backwardEmitter = new BackwardRecordEmitter(
+        this::createConsumer,
+        new OffsetsSeekBackward(TOPIC,
+            new ConsumerPosition(SeekType.OFFSET, offsets, SeekDirection.BACKWARD),
+            100
+        )
+    );
+
+    var polledValues = Flux.create(backwardEmitter)
+        .map(this::deserialize)
+        .limitRequest(Long.MAX_VALUE)
+        .collect(Collectors.toList())
+        .block();
+
+    assertThat(polledValues).isEmpty();
   }
 
   private KafkaConsumer<Bytes, Bytes> createConsumer() {
-    return new KafkaConsumer<>(
-        Map.of(
-            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
-            ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
-            ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20, // to check multiple polls
-            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class,
-            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class
-        )
+    return createConsumer(Map.of());
+  }
+
+  private KafkaConsumer<Bytes, Bytes> createConsumer(Map<String, Object> properties) {
+    final Map<String, ? extends Serializable> map = Map.of(
+        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
+        ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
+        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20, // to check multiple polls
+        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class,
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class
     );
+    Properties props = new Properties();
+    props.putAll(map);
+    props.putAll(properties);
+    return new KafkaConsumer<>(props);
   }
 
   private String deserialize(ConsumerRecord<Bytes, Bytes> rec) {
@@ -175,7 +334,7 @@ class RecordEmitterTest extends AbstractBaseTest {
   @Value
   static class Record {
     String value;
-    int partition;
+    TopicPartition tp;
     long offset;
     long timestamp;
   }
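Both createConsumer overloads (in KafkaService and in the test above) layer caller-supplied properties over the defaults with Properties.putAll, so the caller's values win on key collisions; this is how BackwardRecordEmitter injects its MAX_POLL_RECORDS_CONFIG budget. A trivial sketch with a hypothetical key and values:

import java.util.Map;
import java.util.Properties;

class PropsLayeringSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.putAll(Map.of("max.poll.records", 20)); // default
    props.putAll(Map.of("max.poll.records", 7));  // caller override wins
    System.out.println(props.get("max.poll.records")); // 7
  }
}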
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/OffsetsSeekTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/OffsetsSeekTest.java
index d1303730e0..324b43fe49 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/OffsetsSeekTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/OffsetsSeekTest.java
@@ -21,11 +21,11 @@ import org.junit.jupiter.api.Test;
 
 class OffsetsSeekTest {
 
-  String topic = "test";
-  TopicPartition tp0 = new TopicPartition(topic, 0); //offsets: start 0, end 0
-  TopicPartition tp1 = new TopicPartition(topic, 1); //offsets: start 10, end 10
-  TopicPartition tp2 = new TopicPartition(topic, 2); //offsets: start 0, end 20
-  TopicPartition tp3 = new TopicPartition(topic, 3); //offsets: start 25, end 30
+  final String topic = "test";
+  final TopicPartition tp0 = new TopicPartition(topic, 0); //offsets: start 0, end 0
+  final TopicPartition tp1 = new TopicPartition(topic, 1); //offsets: start 10, end 10
+  final TopicPartition tp2 = new TopicPartition(topic, 2); //offsets: start 0, end 20
+  final TopicPartition tp3 = new TopicPartition(topic, 3); //offsets: start 25, end 30
 
   MockConsumer<Bytes, Bytes> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
 
@@ -57,7 +57,7 @@ class OffsetsSeekTest {
         topic,
         new ConsumerPosition(
             SeekType.BEGINNING,
-            Map.of(0, 0L, 1, 0L),
+            Map.of(tp0, 0L, tp1, 0L),
             SeekDirection.FORWARD
         )
     );
@@ -74,7 +74,7 @@ class OffsetsSeekTest {
         topic,
         new ConsumerPosition(
             SeekType.BEGINNING,
-            Map.of(2, 0L, 3, 0L),
+            Map.of(tp2, 0L, tp3, 0L),
             SeekDirection.BACKWARD
         ),
         10
@@ -82,8 +82,8 @@ class OffsetsSeekTest {
     seek.assignAndSeek(consumer);
 
     assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2, tp3);
-    assertThat(consumer.position(tp2)).isEqualTo(15L);
-    assertThat(consumer.position(tp3)).isEqualTo(25L);
+    assertThat(consumer.position(tp2)).isEqualTo(20L);
+    assertThat(consumer.position(tp3)).isEqualTo(30L);
   }
 
   @Test
@@ -110,8 +110,8 @@ class OffsetsSeekTest {
     assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2, tp3);
     assertThat(consumer.position(tp0)).isZero();
     assertThat(consumer.position(tp1)).isEqualTo(10L);
-    assertThat(consumer.position(tp2)).isEqualTo(15L);
-    assertThat(consumer.position(tp3)).isEqualTo(25L);
+    assertThat(consumer.position(tp2)).isEqualTo(20L);
+    assertThat(consumer.position(tp3)).isEqualTo(30L);
   }
 
 
@@ -121,14 +121,12 @@ class OffsetsSeekTest {
         topic,
         new ConsumerPosition(
             SeekType.OFFSET,
-            Map.of(0, 0L, 1, 1L, 2, 2L),
+            Map.of(tp0, 0L, tp1, 1L, tp2, 2L),
             SeekDirection.FORWARD
         )
     );
     seek.assignAndSeek(consumer);
-    assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2);
-    assertThat(consumer.position(tp0)).isZero();
-    assertThat(consumer.position(tp1)).isEqualTo(1L);
+    assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2);
     assertThat(consumer.position(tp2)).isEqualTo(2L);
   }
 
@@ -138,16 +136,30 @@ class OffsetsSeekTest {
         topic,
         new ConsumerPosition(
             SeekType.OFFSET,
-            Map.of(0, 0L, 1, 1L, 2, 2L),
-            SeekDirection.FORWARD
+            Map.of(tp0, 0L, tp1, 1L, tp2, 20L),
+            SeekDirection.BACKWARD
         ),
         2
     );
     seek.assignAndSeek(consumer);
-    assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2);
-    assertThat(consumer.position(tp0)).isZero();
-    assertThat(consumer.position(tp1)).isEqualTo(1L);
-    assertThat(consumer.position(tp2)).isZero();
+    assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2);
+    assertThat(consumer.position(tp2)).isEqualTo(20L);
+  }
+
+  @Test
+  void backwardSeekToOffsetOnlyOnePartition() {
+    var seek = new OffsetsSeekBackward(
+        topic,
+        new ConsumerPosition(
+            SeekType.OFFSET,
+            Map.of(tp2, 20L),
+            SeekDirection.BACKWARD
+        ),
+        20
+    );
+    seek.assignAndSeek(consumer);
+    assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2);
+    assertThat(consumer.position(tp2)).isEqualTo(20L);
   }
 
 
@@ -159,14 +171,14 @@ class OffsetsSeekTest {
     @BeforeEach
     void assignAndCreateOffsets() {
       consumer.assign(List.of(tp0, tp1, tp2, tp3));
-      offsets = new OffsetsSeek.WaitingOffsets(topic, consumer);
+      offsets = new OffsetsSeek.WaitingOffsets(topic, consumer, List.of(tp0, tp1, tp2, tp3));
     }
 
     @Test
    void collectsSignificantOffsetsMinus1ForAssignedPartitions() {
       // offsets for partition 0 & 1 should be skipped because they
       // effectively contains no data (start offset = end offset)
-      assertThat(offsets.offsets).containsExactlyInAnyOrderEntriesOf(
+      assertThat(offsets.getEndOffsets()).containsExactlyInAnyOrderEntriesOf(
           Map.of(2, 19L, 3, 29L)
       );
     }
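For reference, the OffsetsSeekTest fixture above can be reproduced standalone: MockConsumer with explicit beginning/end offsets stands in for a broker, which is what makes assertions like position(tp2) == 20L deterministic. A minimal sketch with hypothetical topic and offsets:

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

class MockConsumerSketch {
  public static void main(String[] args) {
    TopicPartition tp = new TopicPartition("test", 2);
    MockConsumer<Object, Object> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    consumer.updateBeginningOffsets(Map.of(tp, 0L));
    consumer.updateEndOffsets(Map.of(tp, 20L));
    consumer.assign(List.of(tp));
    consumer.seekToEnd(List.of(tp));
    System.out.println(consumer.position(tp)); // 20
  }
}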