Ver código fonte

#253: Get messages implementation improvement (#286)

* 1. End offsets check added to RecordEmitter
2. Tests for OffsetsSeek, RecordEmitter added

* Tests improvements

* ISSUE-257: checkstyle

* ISSUE-257: withSchemaType sonar fix

* ISSUE-257: withSchemaType sonar fix

Co-authored-by: Ilya Kuramshin <ikuramshin@provectus.com>
iliax 4 anos atrás
pai
commit
992e8b0898

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java

@@ -247,7 +247,7 @@ public class ClusterService {
     if (!cluster.getTopics().containsKey(topicName)) {
       throw new NotFoundException("No such topic");
     }
-    return consumingService.loadOffsets(cluster, topicName, partitions)
+    return consumingService.offsetsForDeletion(cluster, topicName, partitions)
         .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets));
   }
-}
+}

+ 95 - 48
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java

@@ -10,14 +10,18 @@ import com.provectus.kafka.ui.model.SeekType;
 import com.provectus.kafka.ui.model.TopicMessage;
 import com.provectus.kafka.ui.util.ClusterUtil;
 import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -47,30 +51,23 @@ public class ConsumingService {
     int recordsLimit = Optional.ofNullable(limit)
         .map(s -> Math.min(s, MAX_RECORD_LIMIT))
         .orElse(DEFAULT_RECORD_LIMIT);
-    RecordEmitter emitter = new RecordEmitter(kafkaService, cluster, topic, consumerPosition);
+    RecordEmitter emitter = new RecordEmitter(
+        () -> kafkaService.createConsumer(cluster),
+        new OffsetsSeek(topic, consumerPosition));
     RecordDeserializer recordDeserializer =
         deserializationService.getRecordDeserializerForCluster(cluster);
-    return Flux.create(emitter::emit)
+    return Flux.create(emitter)
         .subscribeOn(Schedulers.boundedElastic())
         .map(r -> ClusterUtil.mapToTopicMessage(r, recordDeserializer))
         .filter(m -> filterTopicMessage(m, query))
         .limitRequest(recordsLimit);
   }
 
-  public Mono<Map<TopicPartition, Long>> loadOffsets(KafkaCluster cluster, String topicName,
-                                                     List<Integer> partitionsToInclude) {
+  public Mono<Map<TopicPartition, Long>> offsetsForDeletion(KafkaCluster cluster, String topicName,
+                                                            List<Integer> partitionsToInclude) {
     return Mono.fromSupplier(() -> {
       try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
-        var partitions = consumer.partitionsFor(topicName).stream()
-            .filter(
-                p -> partitionsToInclude.isEmpty() || partitionsToInclude.contains(p.partition()))
-            .map(p -> new TopicPartition(topicName, p.partition()))
-            .collect(Collectors.toList());
-        var beginningOffsets = consumer.beginningOffsets(partitions);
-        var endOffsets = consumer.endOffsets(partitions);
-        return endOffsets.entrySet().stream()
-            .filter(entry -> !beginningOffsets.get(entry.getKey()).equals(entry.getValue()))
-            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        return significantOffsets(consumer, topicName, partitionsToInclude);
       } catch (Exception e) {
         log.error("Error occurred while consuming records", e);
         throw new RuntimeException(e);
@@ -78,6 +75,25 @@ public class ConsumingService {
     });
   }
 
+  /**
+   * returns end offsets for partitions where start offset != end offsets.
+   * This is useful when we need to verify that partition is not empty.
+   */
+  private static Map<TopicPartition, Long> significantOffsets(Consumer<?, ?> consumer,
+                                                              String topicName,
+                                                              Collection<Integer>
+                                                                  partitionsToInclude) {
+    var partitions = consumer.partitionsFor(topicName).stream()
+        .filter(p -> partitionsToInclude.isEmpty() || partitionsToInclude.contains(p.partition()))
+        .map(p -> new TopicPartition(topicName, p.partition()))
+        .collect(Collectors.toList());
+    var beginningOffsets = consumer.beginningOffsets(partitions);
+    var endOffsets = consumer.endOffsets(partitions);
+    return endOffsets.entrySet().stream()
+        .filter(entry -> !beginningOffsets.get(entry.getKey()).equals(entry.getValue()))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
   private boolean filterTopicMessage(TopicMessage message, String query) {
     if (StringUtils.isEmpty(query)) {
       return true;
@@ -110,52 +126,48 @@ public class ConsumingService {
   }
 
   @RequiredArgsConstructor
-  private static class RecordEmitter {
-    private static final int MAX_EMPTY_POLLS_COUNT = 3;
+  static class RecordEmitter
+      implements java.util.function.Consumer<FluxSink<ConsumerRecord<Bytes, Bytes>>> {
+
     private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
 
-    private final KafkaService kafkaService;
-    private final KafkaCluster cluster;
-    private final String topic;
-    private final ConsumerPosition consumerPosition;
+    private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+    private final OffsetsSeek offsetsSeek;
 
-    public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
-      try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) {
-        assignAndSeek(consumer);
-        int emptyPollsCount = 0;
-        log.info("assignment: {}", consumer.assignment());
-        while (!sink.isCancelled()) {
+    @Override
+    public void accept(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
+      try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+        var waitingOffsets = offsetsSeek.assignAndSeek(consumer);
+        while (!sink.isCancelled() && !waitingOffsets.endReached()) {
           ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
           log.info("{} records polled", records.count());
-          if (records.count() == 0 && emptyPollsCount > MAX_EMPTY_POLLS_COUNT) {
-            break;
-          } else {
-            emptyPollsCount++;
+          for (ConsumerRecord<Bytes, Bytes> record : records) {
+            if (!sink.isCancelled() && !waitingOffsets.endReached()) {
+              sink.next(record);
+              waitingOffsets.markPolled(record);
+            } else {
+              break;
+            }
           }
-          records.iterator()
-              .forEachRemaining(sink::next);
         }
         sink.complete();
+        log.info("Polling finished");
       } catch (Exception e) {
         log.error("Error occurred while consuming records", e);
         throw new RuntimeException(e);
       }
     }
+  }
 
-    private List<TopicPartition> getRequestedPartitions() {
-      Map<Integer, Long> partitionPositions = consumerPosition.getSeekTo();
+  @RequiredArgsConstructor
+  static class OffsetsSeek {
 
-      return Optional.ofNullable(cluster.getTopics().get(topic))
-          .orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topic))
-          .getPartitions().values().stream()
-          .filter(internalPartition -> partitionPositions.isEmpty()
-              || partitionPositions.containsKey(internalPartition.getPartition()))
-          .map(partitionInfo -> new TopicPartition(topic, partitionInfo.getPartition()))
-          .collect(Collectors.toList());
-    }
+    private final String topic;
+    private final ConsumerPosition consumerPosition;
 
-    private void assignAndSeek(KafkaConsumer<Bytes, Bytes> consumer) {
+    public WaitingOffsets assignAndSeek(Consumer<Bytes, Bytes> consumer) {
       SeekType seekType = consumerPosition.getSeekType();
+      log.info("Positioning consumer for topic {} with {}", topic, consumerPosition);
       switch (seekType) {
         case OFFSET:
           assignAndSeekForOffset(consumer);
@@ -169,10 +181,21 @@ public class ConsumingService {
         default:
           throw new IllegalArgumentException("Unknown seekType: " + seekType);
       }
+      log.info("Assignment: {}", consumer.assignment());
+      return new WaitingOffsets(topic, consumer);
     }
 
-    private void assignAndSeekForOffset(KafkaConsumer<Bytes, Bytes> consumer) {
-      List<TopicPartition> partitions = getRequestedPartitions();
+    private List<TopicPartition> getRequestedPartitions(Consumer<Bytes, Bytes> consumer) {
+      Map<Integer, Long> partitionPositions = consumerPosition.getSeekTo();
+      return consumer.partitionsFor(topic).stream()
+          .filter(
+              p -> partitionPositions.isEmpty() || partitionPositions.containsKey(p.partition()))
+          .map(p -> new TopicPartition(p.topic(), p.partition()))
+          .collect(Collectors.toList());
+    }
+
+    private void assignAndSeekForOffset(Consumer<Bytes, Bytes> consumer) {
+      List<TopicPartition> partitions = getRequestedPartitions(consumer);
       consumer.assign(partitions);
       consumerPosition.getSeekTo().forEach((partition, offset) -> {
         TopicPartition topicPartition = new TopicPartition(topic, partition);
@@ -180,7 +203,7 @@ public class ConsumingService {
       });
     }
 
-    private void assignAndSeekForTimestamp(KafkaConsumer<Bytes, Bytes> consumer) {
+    private void assignAndSeekForTimestamp(Consumer<Bytes, Bytes> consumer) {
       Map<TopicPartition, Long> timestampsToSearch =
           consumerPosition.getSeekTo().entrySet().stream()
               .collect(Collectors.toMap(
@@ -200,10 +223,34 @@ public class ConsumingService {
       offsetsForTimestamps.forEach(consumer::seek);
     }
 
-    private void assignAndSeekFromBeginning(KafkaConsumer<Bytes, Bytes> consumer) {
-      List<TopicPartition> partitions = getRequestedPartitions();
+    private void assignAndSeekFromBeginning(Consumer<Bytes, Bytes> consumer) {
+      List<TopicPartition> partitions = getRequestedPartitions(consumer);
       consumer.assign(partitions);
       consumer.seekToBeginning(partitions);
     }
+
+    static class WaitingOffsets {
+      final Map<Integer, Long> offsets = new HashMap<>(); // partition number -> offset
+
+      WaitingOffsets(String topic, Consumer<?, ?> consumer) {
+        var partitions = consumer.assignment().stream()
+            .map(TopicPartition::partition)
+            .collect(Collectors.toList());
+        significantOffsets(consumer, topic, partitions)
+            .forEach((tp, offset) -> offsets.put(tp.partition(), offset - 1));
+      }
+
+      void markPolled(ConsumerRecord<?, ?> rec) {
+        Long waiting = offsets.get(rec.partition());
+        if (waiting != null && waiting <= rec.offset()) {
+          offsets.remove(rec.partition());
+        }
+      }
+
+      boolean endReached() {
+        return offsets.isEmpty();
+      }
+    }
+
   }
 }

+ 2 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java

@@ -19,6 +19,7 @@ import com.provectus.kafka.ui.model.schemaregistry.InternalNewSchema;
 import com.provectus.kafka.ui.model.schemaregistry.SubjectIdResponse;
 import java.util.Formatter;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.function.Function;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
@@ -121,14 +122,7 @@ public class SchemaRegistryService {
    */
   @NotNull
   private SchemaSubject withSchemaType(SchemaSubject s) {
-    SchemaType schemaType =
-        Objects.nonNull(s.getSchemaType()) ? s.getSchemaType() : SchemaType.AVRO;
-    return new SchemaSubject()
-        .schema(s.getSchema())
-        .subject(s.getSubject())
-        .version(s.getVersion())
-        .id(s.getId())
-        .schemaType(schemaType);
+    return s.schemaType(Optional.ofNullable(s.getSchemaType()).orElse(SchemaType.AVRO));
   }
 
   public Mono<ResponseEntity<Void>> deleteSchemaSubjectByVersion(String clusterName,

+ 33 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java

@@ -2,8 +2,14 @@ package com.provectus.kafka.ui;
 
 import com.provectus.kafka.ui.container.KafkaConnectContainer;
 import com.provectus.kafka.ui.container.SchemaRegistryContainer;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.function.ThrowingConsumer;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.context.ApplicationContextInitializer;
 import org.springframework.context.ConfigurableApplicationContext;
@@ -13,25 +19,30 @@ import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.utility.DockerImageName;
 
+
 @ExtendWith(SpringExtension.class)
 @SpringBootTest
 @ActiveProfiles("test")
 public abstract class AbstractBaseTest {
+  public static String LOCAL = "local";
+  public static String SECOND_LOCAL = "secondLocal";
+
   private static final String CONFLUENT_PLATFORM_VERSION = "5.5.0";
+
   public static final KafkaContainer kafka = new KafkaContainer(
       DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION))
       .withNetwork(Network.SHARED);
+
   public static final SchemaRegistryContainer schemaRegistry =
       new SchemaRegistryContainer(CONFLUENT_PLATFORM_VERSION)
           .withKafka(kafka)
           .dependsOn(kafka);
+
   public static final KafkaConnectContainer kafkaConnect =
       new KafkaConnectContainer(CONFLUENT_PLATFORM_VERSION)
           .withKafka(kafka)
           .dependsOn(kafka)
           .dependsOn(schemaRegistry);
-  public static String LOCAL = "local";
-  public static String SECOND_LOCAL = "secondLocal";
 
   static {
     kafka.start();
@@ -57,4 +68,24 @@ public abstract class AbstractBaseTest {
       System.setProperty("kafka.clusters.1.kafkaConnect.0.address", kafkaConnect.getTarget());
     }
   }
+
+  public static void createTopic(NewTopic topic) {
+    withAdminClient(client -> client.createTopics(List.of(topic)).all().get());
+  }
+
+  public static void deleteTopic(String topic) {
+    withAdminClient(client -> client.deleteTopics(List.of(topic)).all().get());
+  }
+
+  private static void withAdminClient(ThrowingConsumer<AdminClient> consumer) {
+    Properties properties = new Properties();
+    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+    try (var client = AdminClient.create(properties)) {
+      try {
+        consumer.accept(client);
+      } catch (Throwable throwable) {
+        throw new RuntimeException(throwable);
+      }
+    }
+  }
 }

+ 8 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/producer/KafkaTestProducer.java

@@ -1,9 +1,11 @@
 package com.provectus.kafka.ui.producer;
 
 import java.util.Map;
+import java.util.concurrent.Future;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.testcontainers.containers.KafkaContainer;
 
@@ -23,8 +25,12 @@ public class KafkaTestProducer<KeyT, ValueT> implements AutoCloseable {
     )));
   }
 
-  public void send(String topic, ValueT value) {
-    producer.send(new ProducerRecord<>(topic, value));
+  public Future<RecordMetadata> send(String topic, ValueT value) {
+    return producer.send(new ProducerRecord<>(topic, value));
+  }
+
+  public Future<RecordMetadata> send(ProducerRecord<KeyT, ValueT> record) {
+    return producer.send(record);
   }
 
   @Override

+ 119 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsSeekTest.java

@@ -0,0 +1,119 @@
+package com.provectus.kafka.ui.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.SeekType;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+class OffsetsSeekTest {
+
+  String topic = "test";
+  TopicPartition tp0 = new TopicPartition(topic, 0); //offsets: start 0, end 0
+  TopicPartition tp1 = new TopicPartition(topic, 1); //offsets: start 10, end 10
+  TopicPartition tp2 = new TopicPartition(topic, 2); //offsets: start 0, end 20
+  TopicPartition tp3 = new TopicPartition(topic, 3); //offsets: start 25, end 30
+
+  MockConsumer<Bytes, Bytes> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+
+  @BeforeEach
+  void initConsumer() {
+    consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+    consumer.updatePartitions(
+        topic,
+        Stream.of(tp0, tp1, tp2, tp3)
+            .map(tp -> new PartitionInfo(topic, tp.partition(), null, null, null, null))
+            .collect(Collectors.toList()));
+    consumer.updateBeginningOffsets(Map.of(
+        tp0, 0L,
+        tp1, 10L,
+        tp2, 0L,
+        tp3, 25L
+    ));
+    consumer.addEndOffsets(Map.of(
+        tp0, 0L,
+        tp1, 10L,
+        tp2, 20L,
+        tp3, 30L
+    ));
+  }
+
+  @Test
+  void seekToBeginningAllPartitions() {
+    var seek = new ConsumingService.OffsetsSeek(
+        topic,
+        new ConsumerPosition(SeekType.BEGINNING, Map.of(0, 0L, 1, 0L)));
+    seek.assignAndSeek(consumer);
+    assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1);
+    assertThat(consumer.position(tp0)).isEqualTo(0L);
+    assertThat(consumer.position(tp1)).isEqualTo(10L);
+  }
+
+  @Test
+  void seekToBeginningWithPartitionsList() {
+    var seek = new ConsumingService.OffsetsSeek(
+        topic,
+        new ConsumerPosition(SeekType.BEGINNING, Map.of()));
+    seek.assignAndSeek(consumer);
+    assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2, tp3);
+    assertThat(consumer.position(tp0)).isEqualTo(0L);
+    assertThat(consumer.position(tp1)).isEqualTo(10L);
+    assertThat(consumer.position(tp2)).isEqualTo(0L);
+    assertThat(consumer.position(tp3)).isEqualTo(25L);
+  }
+
+  @Test
+  void seekToOffset() {
+    var seek = new ConsumingService.OffsetsSeek(
+        topic,
+        new ConsumerPosition(SeekType.OFFSET, Map.of(0, 0L, 1, 1L, 2, 2L)));
+    seek.assignAndSeek(consumer);
+    assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2);
+    assertThat(consumer.position(tp0)).isEqualTo(0L);
+    assertThat(consumer.position(tp1)).isEqualTo(1L);
+    assertThat(consumer.position(tp2)).isEqualTo(2L);
+  }
+
+  @Nested
+  class WaitingOffsetsTest {
+
+    ConsumingService.OffsetsSeek.WaitingOffsets offsets;
+
+    @BeforeEach
+    void assignAndCreateOffsets() {
+      consumer.assign(List.of(tp0, tp1, tp2, tp3));
+      offsets = new ConsumingService.OffsetsSeek.WaitingOffsets(topic, consumer);
+    }
+
+    @Test
+    void collectsSignificantOffsetsMinus1ForAssignedPartitions() {
+      // offsets for partition 0 & 1 should be skipped because they
+      // effectively contains no data (start offset = end offset)
+      assertThat(offsets.offsets).containsExactlyInAnyOrderEntriesOf(
+          Map.of(2, 19L, 3, 29L)
+      );
+    }
+
+    @Test
+    void returnTrueWhenOffsetsReachedReached() {
+      assertThat(offsets.endReached()).isFalse();
+      offsets.markPolled(new ConsumerRecord<>(topic, 2, 19, null, null));
+      assertThat(offsets.endReached()).isFalse();
+      offsets.markPolled(new ConsumerRecord<>(topic, 3, 29, null, null));
+      assertThat(offsets.endReached()).isTrue();
+    }
+  }
+
+}

+ 169 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -0,0 +1,169 @@
+package com.provectus.kafka.ui.service;
+
+import static com.provectus.kafka.ui.service.ConsumingService.OffsetsSeek;
+import static com.provectus.kafka.ui.service.ConsumingService.RecordEmitter;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.AbstractBaseTest;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.SeekType;
+import com.provectus.kafka.ui.producer.KafkaTestProducer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import lombok.Value;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+
+class RecordEmitterTest extends AbstractBaseTest {
+
+  static final int PARTITIONS = 5;
+  static final int MSGS_PER_PARTITION = 100;
+
+  static final String TOPIC = RecordEmitterTest.class.getSimpleName() + "_" + UUID.randomUUID();
+  static final String EMPTY_TOPIC = TOPIC + "_empty";
+  static final List<Record> SENT_RECORDS = new ArrayList<>();
+
+  @BeforeAll
+  static void generateMsgs() throws Exception {
+    createTopic(new NewTopic(TOPIC, PARTITIONS, (short) 1));
+    createTopic(new NewTopic(EMPTY_TOPIC, PARTITIONS, (short) 1));
+    try (var producer = KafkaTestProducer.forKafka(kafka)) {
+      for (int partition = 0; partition < PARTITIONS; partition++) {
+        for (int i = 0; i < MSGS_PER_PARTITION; i++) {
+          long ts = System.currentTimeMillis() + i;
+          var value = "msg_" + partition + "_" + i;
+          var metadata =
+              producer.send(new ProducerRecord<>(TOPIC, partition, ts, null, value)).get();
+          SENT_RECORDS.add(new Record(value, metadata.partition(), metadata.offset(), ts));
+        }
+      }
+    }
+  }
+
+  @AfterAll
+  static void cleanup() {
+    deleteTopic(TOPIC);
+    deleteTopic(EMPTY_TOPIC);
+  }
+
+  @Test
+  void pollNothingOnEmptyTopic() {
+    var emitter = new RecordEmitter(
+        this::createConsumer,
+        new OffsetsSeek(EMPTY_TOPIC, new ConsumerPosition(SeekType.BEGINNING, Map.of())));
+
+    Long polledValues = Flux.create(emitter)
+        .limitRequest(100)
+        .count()
+        .block();
+
+    assertThat(polledValues).isZero();
+  }
+
+  @Test
+  void pollFullTopicFromBeginning() {
+    var emitter = new RecordEmitter(
+        this::createConsumer,
+        new OffsetsSeek(TOPIC, new ConsumerPosition(SeekType.BEGINNING, Map.of())));
+
+    var polledValues = Flux.create(emitter)
+        .map(this::deserialize)
+        .limitRequest(Long.MAX_VALUE)
+        .collect(Collectors.toList())
+        .block();
+
+    assertThat(polledValues).containsExactlyInAnyOrderElementsOf(
+        SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList()));
+  }
+
+  @Test
+  void pollWithOffsets() {
+    Map<Integer, Long> targetOffsets = new HashMap<>();
+    for (int i = 0; i < PARTITIONS; i++) {
+      long offset = ThreadLocalRandom.current().nextLong(MSGS_PER_PARTITION);
+      targetOffsets.put(i, offset);
+    }
+
+    var emitter = new RecordEmitter(
+        this::createConsumer,
+        new OffsetsSeek(TOPIC, new ConsumerPosition(SeekType.OFFSET, targetOffsets)));
+
+    var polledValues = Flux.create(emitter)
+        .map(this::deserialize)
+        .limitRequest(Long.MAX_VALUE)
+        .collect(Collectors.toList())
+        .block();
+
+    var expectedValues = SENT_RECORDS.stream()
+        .filter(r -> r.getOffset() >= targetOffsets.get(r.getPartition()))
+        .map(Record::getValue)
+        .collect(Collectors.toList());
+
+    assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
+  }
+
+  @Test
+  void pollWithTimestamps() {
+    Map<Integer, Long> targetTimestamps = new HashMap<>();
+    for (int i = 0; i < PARTITIONS; i++) {
+      int randRecordIdx = ThreadLocalRandom.current().nextInt(SENT_RECORDS.size());
+      targetTimestamps.put(i, SENT_RECORDS.get(randRecordIdx).getTimestamp());
+    }
+
+    var emitter = new RecordEmitter(
+        this::createConsumer,
+        new OffsetsSeek(TOPIC, new ConsumerPosition(SeekType.TIMESTAMP, targetTimestamps)));
+
+    var polledValues = Flux.create(emitter)
+        .map(this::deserialize)
+        .limitRequest(Long.MAX_VALUE)
+        .collect(Collectors.toList())
+        .block();
+
+    var expectedValues = SENT_RECORDS.stream()
+        .filter(r -> r.getTimestamp() >= targetTimestamps.get(r.getPartition()))
+        .map(Record::getValue)
+        .collect(Collectors.toList());
+
+    assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
+  }
+
+  private KafkaConsumer<Bytes, Bytes> createConsumer() {
+    return new KafkaConsumer<>(
+        Map.of(
+            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
+            ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
+            ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20, // to check multiple polls
+            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class,
+            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class
+        )
+    );
+  }
+
+  private String deserialize(ConsumerRecord<Bytes, Bytes> rec) {
+    return new StringDeserializer().deserialize(TOPIC, rec.value().get());
+  }
+
+  @Value
+  static class Record {
+    String value;
+    int partition;
+    long offset;
+    long timestamp;
+  }
+}