diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java index 9142968eb6..3676d71b8c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java @@ -247,7 +247,7 @@ public class ClusterService { if (!cluster.getTopics().containsKey(topicName)) { throw new NotFoundException("No such topic"); } - return consumingService.loadOffsets(cluster, topicName, partitions) + return consumingService.offsetsForDeletion(cluster, topicName, partitions) .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets)); } -} +} \ No newline at end of file diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java index f34f62dbbf..310292bc6e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java @@ -10,14 +10,18 @@ import com.provectus.kafka.ui.model.SeekType; import com.provectus.kafka.ui.model.TopicMessage; import com.provectus.kafka.ui.util.ClusterUtil; import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -47,30 +51,23 @@ public class ConsumingService { int recordsLimit = Optional.ofNullable(limit) .map(s -> Math.min(s, MAX_RECORD_LIMIT)) .orElse(DEFAULT_RECORD_LIMIT); - RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic, consumerPosition); + RecordEmitter emitter = new RecordEmitter( + () -> kafkaService.createConsumer(cluster), + new OffsetsSeek(topic, consumerPosition)); RecordDeserializer recordDeserializer = deserializationService.getRecordDeserializerForCluster(cluster); - return Flux.create(emitter::emit) + return Flux.create(emitter) .subscribeOn(Schedulers.boundedElastic()) .map(r -> ClusterUtil.mapToTopicMessage(r, recordDeserializer)) .filter(m -> filterTopicMessage(m, query)) .limitRequest(recordsLimit); } - public Mono> loadOffsets(KafkaCluster cluster, String topicName, - List partitionsToInclude) { + public Mono> offsetsForDeletion(KafkaCluster cluster, String topicName, + List partitionsToInclude) { return Mono.fromSupplier(() -> { try (KafkaConsumer consumer = kafkaService.createConsumer(cluster)) { - var partitions = consumer.partitionsFor(topicName).stream() - .filter( - p -> partitionsToInclude.isEmpty() || partitionsToInclude.contains(p.partition())) - .map(p -> new TopicPartition(topicName, p.partition())) - .collect(Collectors.toList()); - var beginningOffsets = consumer.beginningOffsets(partitions); - var endOffsets = consumer.endOffsets(partitions); - return endOffsets.entrySet().stream() - .filter(entry -> !beginningOffsets.get(entry.getKey()).equals(entry.getValue())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + return significantOffsets(consumer, topicName, partitionsToInclude); } catch (Exception e) { log.error("Error occurred while consuming records", e); throw new RuntimeException(e); @@ -78,6 +75,25 @@ public class ConsumingService { }); } + /** + * returns end offsets for partitions where start offset != end offsets. + * This is useful when we need to verify that partition is not empty. + */ + private static Map significantOffsets(Consumer consumer, + String topicName, + Collection + partitionsToInclude) { + var partitions = consumer.partitionsFor(topicName).stream() + .filter(p -> partitionsToInclude.isEmpty() || partitionsToInclude.contains(p.partition())) + .map(p -> new TopicPartition(topicName, p.partition())) + .collect(Collectors.toList()); + var beginningOffsets = consumer.beginningOffsets(partitions); + var endOffsets = consumer.endOffsets(partitions); + return endOffsets.entrySet().stream() + .filter(entry -> !beginningOffsets.get(entry.getKey()).equals(entry.getValue())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + private boolean filterTopicMessage(TopicMessage message, String query) { if (StringUtils.isEmpty(query)) { return true; @@ -110,52 +126,48 @@ public class ConsumingService { } @RequiredArgsConstructor - private static class RecordEmitter { - private static final int MAX_EMPTY_POLLS_COUNT = 3; + static class RecordEmitter + implements java.util.function.Consumer>> { + private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L); - private final KafkaService kafkaService; - private final KafkaCluster cluster; - private final String topic; - private final ConsumerPosition consumerPosition; + private final Supplier> consumerSupplier; + private final OffsetsSeek offsetsSeek; - public void emit(FluxSink> sink) { - try (KafkaConsumer consumer = kafkaService.createConsumer(cluster)) { - assignAndSeek(consumer); - int emptyPollsCount = 0; - log.info("assignment: {}", consumer.assignment()); - while (!sink.isCancelled()) { + @Override + public void accept(FluxSink> sink) { + try (KafkaConsumer consumer = consumerSupplier.get()) { + var waitingOffsets = offsetsSeek.assignAndSeek(consumer); + while (!sink.isCancelled() && !waitingOffsets.endReached()) { ConsumerRecords records = consumer.poll(POLL_TIMEOUT_MS); log.info("{} records polled", records.count()); - if (records.count() == 0 && emptyPollsCount > MAX_EMPTY_POLLS_COUNT) { - break; - } else { - emptyPollsCount++; + for (ConsumerRecord record : records) { + if (!sink.isCancelled() && !waitingOffsets.endReached()) { + sink.next(record); + waitingOffsets.markPolled(record); + } else { + break; + } } - records.iterator() - .forEachRemaining(sink::next); } sink.complete(); + log.info("Polling finished"); } catch (Exception e) { log.error("Error occurred while consuming records", e); throw new RuntimeException(e); } } + } - private List getRequestedPartitions() { - Map partitionPositions = consumerPosition.getSeekTo(); + @RequiredArgsConstructor + static class OffsetsSeek { - return Optional.ofNullable(cluster.getTopics().get(topic)) - .orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topic)) - .getPartitions().values().stream() - .filter(internalPartition -> partitionPositions.isEmpty() - || partitionPositions.containsKey(internalPartition.getPartition())) - .map(partitionInfo -> new TopicPartition(topic, partitionInfo.getPartition())) - .collect(Collectors.toList()); - } + private final String topic; + private final ConsumerPosition consumerPosition; - private void assignAndSeek(KafkaConsumer consumer) { + public WaitingOffsets assignAndSeek(Consumer consumer) { SeekType seekType = consumerPosition.getSeekType(); + log.info("Positioning consumer for topic {} with {}", topic, consumerPosition); switch (seekType) { case OFFSET: assignAndSeekForOffset(consumer); @@ -169,10 +181,21 @@ public class ConsumingService { default: throw new IllegalArgumentException("Unknown seekType: " + seekType); } + log.info("Assignment: {}", consumer.assignment()); + return new WaitingOffsets(topic, consumer); } - private void assignAndSeekForOffset(KafkaConsumer consumer) { - List partitions = getRequestedPartitions(); + private List getRequestedPartitions(Consumer consumer) { + Map partitionPositions = consumerPosition.getSeekTo(); + return consumer.partitionsFor(topic).stream() + .filter( + p -> partitionPositions.isEmpty() || partitionPositions.containsKey(p.partition())) + .map(p -> new TopicPartition(p.topic(), p.partition())) + .collect(Collectors.toList()); + } + + private void assignAndSeekForOffset(Consumer consumer) { + List partitions = getRequestedPartitions(consumer); consumer.assign(partitions); consumerPosition.getSeekTo().forEach((partition, offset) -> { TopicPartition topicPartition = new TopicPartition(topic, partition); @@ -180,7 +203,7 @@ public class ConsumingService { }); } - private void assignAndSeekForTimestamp(KafkaConsumer consumer) { + private void assignAndSeekForTimestamp(Consumer consumer) { Map timestampsToSearch = consumerPosition.getSeekTo().entrySet().stream() .collect(Collectors.toMap( @@ -200,10 +223,34 @@ public class ConsumingService { offsetsForTimestamps.forEach(consumer::seek); } - private void assignAndSeekFromBeginning(KafkaConsumer consumer) { - List partitions = getRequestedPartitions(); + private void assignAndSeekFromBeginning(Consumer consumer) { + List partitions = getRequestedPartitions(consumer); consumer.assign(partitions); consumer.seekToBeginning(partitions); } + + static class WaitingOffsets { + final Map offsets = new HashMap<>(); // partition number -> offset + + WaitingOffsets(String topic, Consumer consumer) { + var partitions = consumer.assignment().stream() + .map(TopicPartition::partition) + .collect(Collectors.toList()); + significantOffsets(consumer, topic, partitions) + .forEach((tp, offset) -> offsets.put(tp.partition(), offset - 1)); + } + + void markPolled(ConsumerRecord rec) { + Long waiting = offsets.get(rec.partition()); + if (waiting != null && waiting <= rec.offset()) { + offsets.remove(rec.partition()); + } + } + + boolean endReached() { + return offsets.isEmpty(); + } + } + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java index 1e51a7e44e..e66b5cdf2f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java @@ -19,6 +19,7 @@ import com.provectus.kafka.ui.model.schemaregistry.InternalNewSchema; import com.provectus.kafka.ui.model.schemaregistry.SubjectIdResponse; import java.util.Formatter; import java.util.Objects; +import java.util.Optional; import java.util.function.Function; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; @@ -121,14 +122,7 @@ public class SchemaRegistryService { */ @NotNull private SchemaSubject withSchemaType(SchemaSubject s) { - SchemaType schemaType = - Objects.nonNull(s.getSchemaType()) ? s.getSchemaType() : SchemaType.AVRO; - return new SchemaSubject() - .schema(s.getSchema()) - .subject(s.getSubject()) - .version(s.getVersion()) - .id(s.getId()) - .schemaType(schemaType); + return s.schemaType(Optional.ofNullable(s.getSchemaType()).orElse(SchemaType.AVRO)); } public Mono> deleteSchemaSubjectByVersion(String clusterName, diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java index 3bdca7ef21..6fd1b0fc5b 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java @@ -2,8 +2,14 @@ package com.provectus.kafka.ui; import com.provectus.kafka.ui.container.KafkaConnectContainer; import com.provectus.kafka.ui.container.SchemaRegistryContainer; +import java.util.List; +import java.util.Properties; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.function.ThrowingConsumer; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.ApplicationContextInitializer; import org.springframework.context.ConfigurableApplicationContext; @@ -13,25 +19,30 @@ import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; import org.testcontainers.utility.DockerImageName; + @ExtendWith(SpringExtension.class) @SpringBootTest @ActiveProfiles("test") public abstract class AbstractBaseTest { + public static String LOCAL = "local"; + public static String SECOND_LOCAL = "secondLocal"; + private static final String CONFLUENT_PLATFORM_VERSION = "5.5.0"; + public static final KafkaContainer kafka = new KafkaContainer( DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION)) .withNetwork(Network.SHARED); + public static final SchemaRegistryContainer schemaRegistry = new SchemaRegistryContainer(CONFLUENT_PLATFORM_VERSION) .withKafka(kafka) .dependsOn(kafka); + public static final KafkaConnectContainer kafkaConnect = new KafkaConnectContainer(CONFLUENT_PLATFORM_VERSION) .withKafka(kafka) .dependsOn(kafka) .dependsOn(schemaRegistry); - public static String LOCAL = "local"; - public static String SECOND_LOCAL = "secondLocal"; static { kafka.start(); @@ -57,4 +68,24 @@ public abstract class AbstractBaseTest { System.setProperty("kafka.clusters.1.kafkaConnect.0.address", kafkaConnect.getTarget()); } } + + public static void createTopic(NewTopic topic) { + withAdminClient(client -> client.createTopics(List.of(topic)).all().get()); + } + + public static void deleteTopic(String topic) { + withAdminClient(client -> client.deleteTopics(List.of(topic)).all().get()); + } + + private static void withAdminClient(ThrowingConsumer consumer) { + Properties properties = new Properties(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + try (var client = AdminClient.create(properties)) { + try { + consumer.accept(client); + } catch (Throwable throwable) { + throw new RuntimeException(throwable); + } + } + } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java index 9d8f7ba7d4..eb59d4977a 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java @@ -1,9 +1,11 @@ package com.provectus.kafka.ui.producer; import java.util.Map; +import java.util.concurrent.Future; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import org.testcontainers.containers.KafkaContainer; @@ -23,8 +25,12 @@ public class KafkaTestProducer implements AutoCloseable { ))); } - public void send(String topic, ValueT value) { - producer.send(new ProducerRecord<>(topic, value)); + public Future send(String topic, ValueT value) { + return producer.send(new ProducerRecord<>(topic, value)); + } + + public Future send(ProducerRecord record) { + return producer.send(record); } @Override diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsSeekTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsSeekTest.java new file mode 100644 index 0000000000..8f5ec97ecc --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsSeekTest.java @@ -0,0 +1,119 @@ +package com.provectus.kafka.ui.service; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.provectus.kafka.ui.model.ConsumerPosition; +import com.provectus.kafka.ui.model.SeekType; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Bytes; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +class OffsetsSeekTest { + + String topic = "test"; + TopicPartition tp0 = new TopicPartition(topic, 0); //offsets: start 0, end 0 + TopicPartition tp1 = new TopicPartition(topic, 1); //offsets: start 10, end 10 + TopicPartition tp2 = new TopicPartition(topic, 2); //offsets: start 0, end 20 + TopicPartition tp3 = new TopicPartition(topic, 3); //offsets: start 25, end 30 + + MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + + @BeforeEach + void initConsumer() { + consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + consumer.updatePartitions( + topic, + Stream.of(tp0, tp1, tp2, tp3) + .map(tp -> new PartitionInfo(topic, tp.partition(), null, null, null, null)) + .collect(Collectors.toList())); + consumer.updateBeginningOffsets(Map.of( + tp0, 0L, + tp1, 10L, + tp2, 0L, + tp3, 25L + )); + consumer.addEndOffsets(Map.of( + tp0, 0L, + tp1, 10L, + tp2, 20L, + tp3, 30L + )); + } + + @Test + void seekToBeginningAllPartitions() { + var seek = new ConsumingService.OffsetsSeek( + topic, + new ConsumerPosition(SeekType.BEGINNING, Map.of(0, 0L, 1, 0L))); + seek.assignAndSeek(consumer); + assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1); + assertThat(consumer.position(tp0)).isEqualTo(0L); + assertThat(consumer.position(tp1)).isEqualTo(10L); + } + + @Test + void seekToBeginningWithPartitionsList() { + var seek = new ConsumingService.OffsetsSeek( + topic, + new ConsumerPosition(SeekType.BEGINNING, Map.of())); + seek.assignAndSeek(consumer); + assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2, tp3); + assertThat(consumer.position(tp0)).isEqualTo(0L); + assertThat(consumer.position(tp1)).isEqualTo(10L); + assertThat(consumer.position(tp2)).isEqualTo(0L); + assertThat(consumer.position(tp3)).isEqualTo(25L); + } + + @Test + void seekToOffset() { + var seek = new ConsumingService.OffsetsSeek( + topic, + new ConsumerPosition(SeekType.OFFSET, Map.of(0, 0L, 1, 1L, 2, 2L))); + seek.assignAndSeek(consumer); + assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2); + assertThat(consumer.position(tp0)).isEqualTo(0L); + assertThat(consumer.position(tp1)).isEqualTo(1L); + assertThat(consumer.position(tp2)).isEqualTo(2L); + } + + @Nested + class WaitingOffsetsTest { + + ConsumingService.OffsetsSeek.WaitingOffsets offsets; + + @BeforeEach + void assignAndCreateOffsets() { + consumer.assign(List.of(tp0, tp1, tp2, tp3)); + offsets = new ConsumingService.OffsetsSeek.WaitingOffsets(topic, consumer); + } + + @Test + void collectsSignificantOffsetsMinus1ForAssignedPartitions() { + // offsets for partition 0 & 1 should be skipped because they + // effectively contains no data (start offset = end offset) + assertThat(offsets.offsets).containsExactlyInAnyOrderEntriesOf( + Map.of(2, 19L, 3, 29L) + ); + } + + @Test + void returnTrueWhenOffsetsReachedReached() { + assertThat(offsets.endReached()).isFalse(); + offsets.markPolled(new ConsumerRecord<>(topic, 2, 19, null, null)); + assertThat(offsets.endReached()).isFalse(); + offsets.markPolled(new ConsumerRecord<>(topic, 3, 29, null, null)); + assertThat(offsets.endReached()).isTrue(); + } + } + +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java new file mode 100644 index 0000000000..11af012277 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java @@ -0,0 +1,169 @@ +package com.provectus.kafka.ui.service; + +import static com.provectus.kafka.ui.service.ConsumingService.OffsetsSeek; +import static com.provectus.kafka.ui.service.ConsumingService.RecordEmitter; +import static org.assertj.core.api.Assertions.assertThat; + +import com.provectus.kafka.ui.AbstractBaseTest; +import com.provectus.kafka.ui.model.ConsumerPosition; +import com.provectus.kafka.ui.model.SeekType; +import com.provectus.kafka.ui.producer.KafkaTestProducer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import lombok.Value; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Bytes; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +class RecordEmitterTest extends AbstractBaseTest { + + static final int PARTITIONS = 5; + static final int MSGS_PER_PARTITION = 100; + + static final String TOPIC = RecordEmitterTest.class.getSimpleName() + "_" + UUID.randomUUID(); + static final String EMPTY_TOPIC = TOPIC + "_empty"; + static final List SENT_RECORDS = new ArrayList<>(); + + @BeforeAll + static void generateMsgs() throws Exception { + createTopic(new NewTopic(TOPIC, PARTITIONS, (short) 1)); + createTopic(new NewTopic(EMPTY_TOPIC, PARTITIONS, (short) 1)); + try (var producer = KafkaTestProducer.forKafka(kafka)) { + for (int partition = 0; partition < PARTITIONS; partition++) { + for (int i = 0; i < MSGS_PER_PARTITION; i++) { + long ts = System.currentTimeMillis() + i; + var value = "msg_" + partition + "_" + i; + var metadata = + producer.send(new ProducerRecord<>(TOPIC, partition, ts, null, value)).get(); + SENT_RECORDS.add(new Record(value, metadata.partition(), metadata.offset(), ts)); + } + } + } + } + + @AfterAll + static void cleanup() { + deleteTopic(TOPIC); + deleteTopic(EMPTY_TOPIC); + } + + @Test + void pollNothingOnEmptyTopic() { + var emitter = new RecordEmitter( + this::createConsumer, + new OffsetsSeek(EMPTY_TOPIC, new ConsumerPosition(SeekType.BEGINNING, Map.of()))); + + Long polledValues = Flux.create(emitter) + .limitRequest(100) + .count() + .block(); + + assertThat(polledValues).isZero(); + } + + @Test + void pollFullTopicFromBeginning() { + var emitter = new RecordEmitter( + this::createConsumer, + new OffsetsSeek(TOPIC, new ConsumerPosition(SeekType.BEGINNING, Map.of()))); + + var polledValues = Flux.create(emitter) + .map(this::deserialize) + .limitRequest(Long.MAX_VALUE) + .collect(Collectors.toList()) + .block(); + + assertThat(polledValues).containsExactlyInAnyOrderElementsOf( + SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList())); + } + + @Test + void pollWithOffsets() { + Map targetOffsets = new HashMap<>(); + for (int i = 0; i < PARTITIONS; i++) { + long offset = ThreadLocalRandom.current().nextLong(MSGS_PER_PARTITION); + targetOffsets.put(i, offset); + } + + var emitter = new RecordEmitter( + this::createConsumer, + new OffsetsSeek(TOPIC, new ConsumerPosition(SeekType.OFFSET, targetOffsets))); + + var polledValues = Flux.create(emitter) + .map(this::deserialize) + .limitRequest(Long.MAX_VALUE) + .collect(Collectors.toList()) + .block(); + + var expectedValues = SENT_RECORDS.stream() + .filter(r -> r.getOffset() >= targetOffsets.get(r.getPartition())) + .map(Record::getValue) + .collect(Collectors.toList()); + + assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues); + } + + @Test + void pollWithTimestamps() { + Map targetTimestamps = new HashMap<>(); + for (int i = 0; i < PARTITIONS; i++) { + int randRecordIdx = ThreadLocalRandom.current().nextInt(SENT_RECORDS.size()); + targetTimestamps.put(i, SENT_RECORDS.get(randRecordIdx).getTimestamp()); + } + + var emitter = new RecordEmitter( + this::createConsumer, + new OffsetsSeek(TOPIC, new ConsumerPosition(SeekType.TIMESTAMP, targetTimestamps))); + + var polledValues = Flux.create(emitter) + .map(this::deserialize) + .limitRequest(Long.MAX_VALUE) + .collect(Collectors.toList()) + .block(); + + var expectedValues = SENT_RECORDS.stream() + .filter(r -> r.getTimestamp() >= targetTimestamps.get(r.getPartition())) + .map(Record::getValue) + .collect(Collectors.toList()); + + assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues); + } + + private KafkaConsumer createConsumer() { + return new KafkaConsumer<>( + Map.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), + ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(), + ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20, // to check multiple polls + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class + ) + ); + } + + private String deserialize(ConsumerRecord rec) { + return new StringDeserializer().deserialize(TOPIC, rec.value().get()); + } + + @Value + static class Record { + String value; + int partition; + long offset; + long timestamp; + } +}