Sfoglia il codice sorgente

#122 Fix emitter to consume records in right order (#598)

* #122 Fix emitter to consume records in right order

* Fixed naming
German Osin 4 anni fa
parent
commit
97ec512b00
20 ha cambiato i file con 572 aggiunte e 243 eliminazioni
  1. 0 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/CustomWebFilter.java
  2. 9 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
  3. 94 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java
  4. 49 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java
  5. 4 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java
  6. 0 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonMessageReader.java
  7. 1 3
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageReader.java
  8. 0 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageReader.java
  9. 15 64
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java
  10. 9 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java
  11. 77 24
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java
  12. 56 57
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java
  13. 31 24
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java
  14. 1 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/EnumJsonType.java
  15. 0 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonSchema.java
  16. 3 3
      kafka-ui-api/src/main/resources/application-sdp.yml
  17. 2 2
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java
  18. 1 1
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerGroupTests.java
  19. 185 26
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java
  20. 35 23
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/OffsetsSeekTest.java

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/CustomWebFilter.java

@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.config;
 
-import java.util.Optional;
 import org.springframework.boot.autoconfigure.web.ServerProperties;
 import org.springframework.stereotype.Component;
 import org.springframework.web.server.ServerWebExchange;

+ 9 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -16,6 +16,7 @@ import javax.validation.Valid;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
@@ -45,7 +46,7 @@ public class MessagesController implements MessagesApi {
       String clusterName, String topicName, @Valid SeekType seekType, @Valid List<String> seekTo,
       @Valid Integer limit, @Valid String q, @Valid SeekDirection seekDirection,
       ServerWebExchange exchange) {
-    return parseConsumerPosition(seekType, seekTo, seekDirection)
+    return parseConsumerPosition(topicName, seekType, seekTo, seekDirection)
         .map(consumerPosition -> ResponseEntity
             .ok(clusterService.getMessages(clusterName, topicName, consumerPosition, q, limit)));
   }
@@ -68,18 +69,21 @@ public class MessagesController implements MessagesApi {
 
 
   private Mono<ConsumerPosition> parseConsumerPosition(
-      SeekType seekType, List<String> seekTo,  SeekDirection seekDirection) {
+      String topicName, SeekType seekType, List<String> seekTo,  SeekDirection seekDirection) {
     return Mono.justOrEmpty(seekTo)
         .defaultIfEmpty(Collections.emptyList())
         .flatMapIterable(Function.identity())
         .map(p -> {
-          String[] splited = p.split("::");
-          if (splited.length != 2) {
+          String[] split = p.split("::");
+          if (split.length != 2) {
             throw new IllegalArgumentException(
                 "Wrong seekTo argument format. See API docs for details");
           }
 
-          return Pair.of(Integer.parseInt(splited[0]), Long.parseLong(splited[1]));
+          return Pair.of(
+              new TopicPartition(topicName, Integer.parseInt(split[0])),
+              Long.parseLong(split[1])
+          );
         })
         .collectMap(Pair::getKey, Pair::getValue)
         .map(positions -> new ConsumerPosition(seekType != null ? seekType : SeekType.BEGINNING,

+ 94 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

@@ -0,0 +1,94 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.util.OffsetsSeekBackward;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import reactor.core.publisher.FluxSink;
+
+@RequiredArgsConstructor
+@Log4j2
+public class BackwardRecordEmitter
+    implements java.util.function.Consumer<FluxSink<ConsumerRecord<Bytes, Bytes>>> {
+
+  private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
+
+  private final Function<Map<String, Object>, KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final OffsetsSeekBackward offsetsSeek;
+
+  @Override
+  public void accept(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
+    try (KafkaConsumer<Bytes, Bytes> configConsumer = consumerSupplier.apply(Map.of())) {
+      final List<TopicPartition> requestedPartitions =
+          offsetsSeek.getRequestedPartitions(configConsumer);
+      final int msgsPerPartition = offsetsSeek.msgsPerPartition(requestedPartitions.size());
+      try (KafkaConsumer<Bytes, Bytes> consumer =
+               consumerSupplier.apply(
+                   Map.of(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, msgsPerPartition)
+               )
+      ) {
+        final Map<TopicPartition, Long> partitionsOffsets =
+            offsetsSeek.getPartitionsOffsets(consumer);
+        log.info("partition offsets: {}", partitionsOffsets);
+        var waitingOffsets =
+            offsetsSeek.waitingOffsets(consumer, partitionsOffsets.keySet());
+        log.info("waittin offsets {} {}",
+            waitingOffsets.getBeginOffsets(),
+            waitingOffsets.getEndOffsets()
+        );
+        while (!sink.isCancelled() && !waitingOffsets.beginReached()) {
+          for (Map.Entry<TopicPartition, Long> entry : partitionsOffsets.entrySet()) {
+            final Long lowest = waitingOffsets.getBeginOffsets().get(entry.getKey().partition());
+            consumer.assign(Collections.singleton(entry.getKey()));
+            final long offset = Math.max(lowest, entry.getValue() - msgsPerPartition);
+            log.info("Polling {} from {}", entry.getKey(), offset);
+            consumer.seek(entry.getKey(), offset);
+            ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
+            final List<ConsumerRecord<Bytes, Bytes>> partitionRecords =
+                records.records(entry.getKey()).stream()
+                  .filter(r -> r.offset() < partitionsOffsets.get(entry.getKey()))
+                  .collect(Collectors.toList());
+            Collections.reverse(partitionRecords);
+
+            log.info("{} records polled", records.count());
+            log.info("{} records sent", partitionRecords.size());
+            for (ConsumerRecord<Bytes, Bytes> msg : partitionRecords) {
+              if (!sink.isCancelled() && !waitingOffsets.beginReached()) {
+                sink.next(msg);
+                waitingOffsets.markPolled(msg);
+              } else {
+                log.info("Begin reached");
+                break;
+              }
+            }
+            partitionsOffsets.put(
+                entry.getKey(),
+                Math.max(offset, entry.getValue() - msgsPerPartition)
+            );
+          }
+          if (waitingOffsets.beginReached()) {
+            log.info("begin reached after partitions");
+          } else if (sink.isCancelled()) {
+            log.info("sink is cancelled after partitions");
+          }
+        }
+        sink.complete();
+        log.info("Polling finished");
+      }
+    } catch (Exception e) {
+      log.error("Error occurred while consuming records", e);
+      sink.error(e);
+    }
+  }
+}

+ 49 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -0,0 +1,49 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.util.OffsetsSeek;
+import java.time.Duration;
+import java.util.function.Supplier;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.utils.Bytes;
+import reactor.core.publisher.FluxSink;
+
+@RequiredArgsConstructor
+@Log4j2
+public class ForwardRecordEmitter
+    implements java.util.function.Consumer<FluxSink<ConsumerRecord<Bytes, Bytes>>> {
+
+  private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
+
+  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final OffsetsSeek offsetsSeek;
+
+  @Override
+  public void accept(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
+    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+      var waitingOffsets = offsetsSeek.assignAndSeek(consumer);
+      while (!sink.isCancelled() && !waitingOffsets.endReached()) {
+        ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
+        log.info("{} records polled", records.count());
+
+        for (ConsumerRecord<Bytes, Bytes> msg : records) {
+          if (!sink.isCancelled() && !waitingOffsets.endReached()) {
+            sink.next(msg);
+            waitingOffsets.markPolled(msg);
+          } else {
+            break;
+          }
+        }
+
+      }
+      sink.complete();
+      log.info("Polling finished");
+    } catch (Exception e) {
+      log.error("Error occurred while consuming records", e);
+      sink.error(e);
+    }
+  }
+}

+ 4 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java

@@ -2,11 +2,11 @@ package com.provectus.kafka.ui.model;
 
 import java.util.Map;
 import lombok.Value;
+import org.apache.kafka.common.TopicPartition;
 
 @Value
 public class ConsumerPosition {
-
-  private SeekType seekType;
-  private Map<Integer, Long> seekTo;
-  private SeekDirection seekDirection;
+  SeekType seekType;
+  Map<TopicPartition, Long> seekTo;
+  SeekDirection seekDirection;
 }

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/JsonMessageReader.java

@@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import java.io.IOException;
 import lombok.SneakyThrows;

+ 1 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/MessageReader.java

@@ -3,7 +3,6 @@ package com.provectus.kafka.ui.serde.schemaregistry;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import java.io.IOException;
 import org.apache.kafka.common.serialization.Serializer;
@@ -12,8 +11,7 @@ public abstract class MessageReader<T> {
   protected final Serializer<T> serializer;
   protected final String topic;
   protected final boolean isKey;
-
-  private ParsedSchema schema;
+  private final ParsedSchema schema;
 
   protected MessageReader(String topic, boolean isKey, SchemaRegistryClient client,
                           SchemaMetadata schema) throws IOException, RestClientException {

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/ProtobufMessageReader.java

@@ -6,7 +6,6 @@ import com.google.protobuf.util.JsonFormat;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;

+ 15 - 64
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumingService.java

@@ -2,6 +2,8 @@ package com.provectus.kafka.ui.service;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.SeekDirection;
@@ -9,25 +11,19 @@ import com.provectus.kafka.ui.model.TopicMessage;
 import com.provectus.kafka.ui.serde.DeserializationService;
 import com.provectus.kafka.ui.serde.RecordSerDe;
 import com.provectus.kafka.ui.util.ClusterUtil;
-import com.provectus.kafka.ui.util.OffsetsSeek;
 import com.provectus.kafka.ui.util.OffsetsSeekBackward;
 import com.provectus.kafka.ui.util.OffsetsSeekForward;
-import java.time.Duration;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
@@ -55,12 +51,19 @@ public class ConsumingService {
     int recordsLimit = Optional.ofNullable(limit)
         .map(s -> Math.min(s, MAX_RECORD_LIMIT))
         .orElse(DEFAULT_RECORD_LIMIT);
-    RecordEmitter emitter = new RecordEmitter(
-        () -> kafkaService.createConsumer(cluster),
-        consumerPosition.getSeekDirection().equals(SeekDirection.FORWARD)
-            ? new OffsetsSeekForward(topic, consumerPosition)
-            : new OffsetsSeekBackward(topic, consumerPosition, recordsLimit)
-    );
+
+    java.util.function.Consumer<? super FluxSink<ConsumerRecord<Bytes, Bytes>>> emitter;
+    if (consumerPosition.getSeekDirection().equals(SeekDirection.FORWARD)) {
+      emitter = new ForwardRecordEmitter(
+          () -> kafkaService.createConsumer(cluster),
+          new OffsetsSeekForward(topic, consumerPosition)
+      );
+    } else {
+      emitter = new BackwardRecordEmitter(
+          (Map<String, Object> props) -> kafkaService.createConsumer(cluster, props),
+          new OffsetsSeekBackward(topic, consumerPosition, recordsLimit)
+      );
+    }
     RecordSerDe recordDeserializer =
         deserializationService.getRecordDeserializerForCluster(cluster);
     return Flux.create(emitter)
@@ -132,56 +135,4 @@ public class ConsumingService {
     return false;
   }
 
-  @RequiredArgsConstructor
-  static class RecordEmitter
-      implements java.util.function.Consumer<FluxSink<ConsumerRecord<Bytes, Bytes>>> {
-
-    private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
-
-    private static final Comparator<ConsumerRecord<?, ?>> PARTITION_COMPARING =
-        Comparator.comparing(
-            ConsumerRecord::partition,
-            Comparator.nullsFirst(Comparator.naturalOrder())
-        );
-    private static final Comparator<ConsumerRecord<?, ?>> REVERED_COMPARING =
-        PARTITION_COMPARING.thenComparing(ConsumerRecord::offset).reversed();
-
-
-    private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
-    private final OffsetsSeek offsetsSeek;
-
-    @Override
-    public void accept(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
-      try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
-        var waitingOffsets = offsetsSeek.assignAndSeek(consumer);
-        while (!sink.isCancelled() && !waitingOffsets.endReached()) {
-          ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
-          log.info("{} records polled", records.count());
-
-          final Iterable<ConsumerRecord<Bytes, Bytes>> iterable;
-          if (offsetsSeek.getConsumerPosition().getSeekDirection().equals(SeekDirection.FORWARD)) {
-            iterable = records;
-          } else {
-            iterable = StreamSupport.stream(records.spliterator(), false)
-                .sorted(REVERED_COMPARING).collect(Collectors.toList());
-          }
-
-          for (ConsumerRecord<Bytes, Bytes> msg : iterable) {
-            if (!sink.isCancelled() && !waitingOffsets.endReached()) {
-              sink.next(msg);
-              waitingOffsets.markPolled(msg);
-            } else {
-              break;
-            }
-          }
-        }
-        sink.complete();
-        log.info("Polling finished");
-      } catch (Exception e) {
-        log.error("Error occurred while consuming records", e);
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
 }

+ 9 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java

@@ -31,6 +31,7 @@ import java.util.LongSummaryStatistics;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -376,13 +377,19 @@ public class KafkaService {
   }
 
   public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
+    return createConsumer(cluster, Map.of());
+  }
+
+  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster,
+                                                    Map<String, Object> properties) {
     Properties props = new Properties();
     props.putAll(cluster.getProperties());
-    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui");
+    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID().toString());
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    props.putAll(properties);
 
     return new KafkaConsumer<>(props);
   }
@@ -496,7 +503,7 @@ public class KafkaService {
                   final Map<Integer, LongSummaryStatistics> brokerStats =
                       topicPartitions.stream().collect(
                           Collectors.groupingBy(
-                              t -> t.getT1(),
+                              Tuple2::getT1,
                               Collectors.summarizingLong(Tuple3::getT3)
                           )
                       );

+ 77 - 24
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeek.java

@@ -2,8 +2,7 @@ package com.provectus.kafka.ui.util;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.SeekType;
-import com.provectus.kafka.ui.service.ConsumingService;
-import java.util.HashMap;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -12,6 +11,8 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
 
 @Log4j2
 public abstract class OffsetsSeek {
@@ -27,62 +28,114 @@ public abstract class OffsetsSeek {
     return consumerPosition;
   }
 
-  public WaitingOffsets assignAndSeek(Consumer<Bytes, Bytes> consumer) {
+  public Map<TopicPartition, Long> getPartitionsOffsets(Consumer<Bytes, Bytes> consumer) {
     SeekType seekType = consumerPosition.getSeekType();
+    List<TopicPartition> partitions = getRequestedPartitions(consumer);
     log.info("Positioning consumer for topic {} with {}", topic, consumerPosition);
+    Map<TopicPartition, Long> offsets;
     switch (seekType) {
       case OFFSET:
-        assignAndSeekForOffset(consumer);
+        offsets = offsetsFromPositions(consumer, partitions);
         break;
       case TIMESTAMP:
-        assignAndSeekForTimestamp(consumer);
+        offsets = offsetsForTimestamp(consumer);
         break;
       case BEGINNING:
-        assignAndSeekFromBeginning(consumer);
+        offsets = offsetsFromBeginning(consumer, partitions);
         break;
       default:
         throw new IllegalArgumentException("Unknown seekType: " + seekType);
     }
+    return offsets;
+  }
+
+  public WaitingOffsets waitingOffsets(Consumer<Bytes, Bytes> consumer,
+                                       Collection<TopicPartition> partitions) {
+    return new WaitingOffsets(topic, consumer, partitions);
+  }
+
+  public WaitingOffsets assignAndSeek(Consumer<Bytes, Bytes> consumer) {
+    final Map<TopicPartition, Long> partitionsOffsets = getPartitionsOffsets(consumer);
+    consumer.assign(partitionsOffsets.keySet());
+    partitionsOffsets.forEach(consumer::seek);
     log.info("Assignment: {}", consumer.assignment());
-    return new WaitingOffsets(topic, consumer);
+    return waitingOffsets(consumer, partitionsOffsets.keySet());
   }
 
-  protected List<TopicPartition> getRequestedPartitions(Consumer<Bytes, Bytes> consumer) {
-    Map<Integer, Long> partitionPositions = consumerPosition.getSeekTo();
+
+  public List<TopicPartition> getRequestedPartitions(Consumer<Bytes, Bytes> consumer) {
+    Map<TopicPartition, Long> partitionPositions = consumerPosition.getSeekTo();
     return consumer.partitionsFor(topic).stream()
         .filter(
-            p -> partitionPositions.isEmpty() || partitionPositions.containsKey(p.partition()))
-        .map(p -> new TopicPartition(p.topic(), p.partition()))
+            p -> partitionPositions.isEmpty()
+                || partitionPositions.containsKey(new TopicPartition(p.topic(), p.partition()))
+        ).map(p -> new TopicPartition(p.topic(), p.partition()))
         .collect(Collectors.toList());
   }
 
 
-  protected abstract void assignAndSeekFromBeginning(Consumer<Bytes, Bytes> consumer);
+  protected abstract Map<TopicPartition, Long> offsetsFromBeginning(
+      Consumer<Bytes, Bytes> consumer, List<TopicPartition> partitions);
 
-  protected abstract void assignAndSeekForTimestamp(Consumer<Bytes, Bytes> consumer);
+  protected abstract Map<TopicPartition, Long> offsetsForTimestamp(
+      Consumer<Bytes, Bytes> consumer);
 
-  protected abstract void assignAndSeekForOffset(Consumer<Bytes, Bytes> consumer);
+  protected abstract Map<TopicPartition, Long> offsetsFromPositions(
+      Consumer<Bytes, Bytes> consumer, List<TopicPartition> partitions);
 
   public static class WaitingOffsets {
-    final Map<Integer, Long> offsets = new HashMap<>(); // partition number -> offset
+    private final Map<Integer, Long> endOffsets; // partition number -> offset
+    private final Map<Integer, Long> beginOffsets; // partition number -> offset
+    private final String topic;
+
+    public WaitingOffsets(String topic, Consumer<?, ?> consumer,
+                          Collection<TopicPartition> partitions) {
+      this.topic = topic;
+      var allBeginningOffsets = consumer.beginningOffsets(partitions);
+      var allEndOffsets = consumer.endOffsets(partitions);
 
-    public WaitingOffsets(String topic, Consumer<?, ?> consumer) {
-      var partitions = consumer.assignment().stream()
-          .map(TopicPartition::partition)
+      this.endOffsets = allEndOffsets.entrySet().stream()
+          .filter(entry -> !allBeginningOffsets.get(entry.getKey()).equals(entry.getValue()))
+          .map(e -> Tuples.of(e.getKey().partition(), e.getValue() - 1))
+          .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
+
+      this.beginOffsets = this.endOffsets.keySet().stream()
+         .map(p -> Tuples.of(p, allBeginningOffsets.get(new TopicPartition(topic, p))))
+         .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
+    }
+
+    public List<TopicPartition> topicPartitions() {
+      return this.endOffsets.keySet().stream()
+          .map(p -> new TopicPartition(topic, p))
           .collect(Collectors.toList());
-      ConsumingService.significantOffsets(consumer, topic, partitions)
-          .forEach((tp, offset) -> offsets.put(tp.partition(), offset - 1));
     }
 
     public void markPolled(ConsumerRecord<?, ?> rec) {
-      Long waiting = offsets.get(rec.partition());
-      if (waiting != null && waiting <= rec.offset()) {
-        offsets.remove(rec.partition());
+      Long endWaiting = endOffsets.get(rec.partition());
+      if (endWaiting != null && endWaiting <= rec.offset()) {
+        endOffsets.remove(rec.partition());
+      }
+      Long beginWaiting = beginOffsets.get(rec.partition());
+      if (beginWaiting != null && beginWaiting >= rec.offset()) {
+        beginOffsets.remove(rec.partition());
       }
+
     }
 
     public boolean endReached() {
-      return offsets.isEmpty();
+      return endOffsets.isEmpty();
+    }
+
+    public boolean beginReached() {
+      return beginOffsets.isEmpty();
+    }
+
+    public Map<Integer, Long> getEndOffsets() {
+      return endOffsets;
+    }
+
+    public Map<Integer, Long> getBeginOffsets() {
+      return beginOffsets;
     }
   }
 }

+ 56 - 57
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekBackward.java

@@ -1,11 +1,11 @@
 package com.provectus.kafka.ui.util;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import lombok.extern.log4j.Log4j2;
@@ -26,96 +26,95 @@ public class OffsetsSeekBackward extends OffsetsSeek {
     this.maxMessages = maxMessages;
   }
 
+  public int msgsPerPartition(int partitionsSize) {
+    return msgsPerPartition(maxMessages, partitionsSize);
+  }
+
+  public int msgsPerPartition(long awaitingMessages, int partitionsSize) {
+    return (int) Math.ceil((double) awaitingMessages / partitionsSize);
+  }
+
+
+  protected Map<TopicPartition, Long> offsetsFromPositions(Consumer<Bytes, Bytes> consumer,
+                                        List<TopicPartition> partitions) {
 
-  protected void assignAndSeekForOffset(Consumer<Bytes, Bytes> consumer) {
-    List<TopicPartition> partitions = getRequestedPartitions(consumer);
-    consumer.assign(partitions);
-    final Map<TopicPartition, Long> offsets =
-        findOffsetsInt(consumer, consumerPosition.getSeekTo());
-    offsets.forEach(consumer::seek);
+    return findOffsetsInt(consumer, consumerPosition.getSeekTo(), partitions);
   }
 
-  protected void assignAndSeekFromBeginning(Consumer<Bytes, Bytes> consumer) {
-    List<TopicPartition> partitions = getRequestedPartitions(consumer);
-    consumer.assign(partitions);
-    final Map<TopicPartition, Long> offsets = findOffsets(consumer, Map.of());
-    offsets.forEach(consumer::seek);
+  protected Map<TopicPartition, Long> offsetsFromBeginning(Consumer<Bytes, Bytes> consumer,
+                                            List<TopicPartition> partitions) {
+    return findOffsets(consumer, Map.of(), partitions);
   }
 
-  protected void assignAndSeekForTimestamp(Consumer<Bytes, Bytes> consumer) {
+  protected Map<TopicPartition, Long> offsetsForTimestamp(Consumer<Bytes, Bytes> consumer) {
     Map<TopicPartition, Long> timestampsToSearch =
         consumerPosition.getSeekTo().entrySet().stream()
             .collect(Collectors.toMap(
-                partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
-                e -> e.getValue() + 1
+                Map.Entry::getKey,
+                e -> e.getValue()
             ));
     Map<TopicPartition, Long> offsetsForTimestamps = consumer.offsetsForTimes(timestampsToSearch)
         .entrySet().stream()
         .filter(e -> e.getValue() != null)
-        .map(v -> Tuples.of(v.getKey(), v.getValue().offset() - 1))
+        .map(v -> Tuples.of(v.getKey(), v.getValue().offset()))
         .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
 
     if (offsetsForTimestamps.isEmpty()) {
       throw new IllegalArgumentException("No offsets were found for requested timestamps");
     }
 
-    consumer.assign(offsetsForTimestamps.keySet());
-    final Map<TopicPartition, Long> offsets = findOffsets(consumer, offsetsForTimestamps);
-    offsets.forEach(consumer::seek);
+    log.info("Timestamps: {} to offsets: {}", timestampsToSearch, offsetsForTimestamps);
+
+    return findOffsets(consumer, offsetsForTimestamps, offsetsForTimestamps.keySet());
   }
 
   protected Map<TopicPartition, Long> findOffsetsInt(
-      Consumer<Bytes, Bytes> consumer, Map<Integer, Long> seekTo) {
-
-    final Map<TopicPartition, Long> seekMap = seekTo.entrySet()
-        .stream().map(p ->
-            Tuples.of(
-                new TopicPartition(topic, p.getKey()),
-                p.getValue()
-            )
-        ).collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2));
-
-    return findOffsets(consumer, seekMap);
+      Consumer<Bytes, Bytes> consumer, Map<TopicPartition, Long> seekTo,
+      List<TopicPartition> partitions) {
+    return findOffsets(consumer, seekTo, partitions);
   }
 
   protected Map<TopicPartition, Long> findOffsets(
-      Consumer<Bytes, Bytes> consumer, Map<TopicPartition, Long> seekTo) {
+      Consumer<Bytes, Bytes> consumer, Map<TopicPartition, Long> seekTo,
+      Collection<TopicPartition> partitions) {
 
-    List<TopicPartition> partitions = getRequestedPartitions(consumer);
     final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
     final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
 
-    final Map<TopicPartition, Long> seekMap = new HashMap<>(seekTo);
-    int awaitingMessages = maxMessages;
-
-    Set<TopicPartition> waiting = new HashSet<>(partitions);
-
-    while (awaitingMessages > 0 && !waiting.isEmpty()) {
-      final int msgsPerPartition = (int) Math.ceil((double) awaitingMessages / partitions.size());
-      for (TopicPartition partition : partitions) {
-        final Long offset = Optional.ofNullable(seekMap.get(partition))
-            .orElseGet(() -> endOffsets.get(partition));
-        final Long beginning = beginningOffsets.get(partition);
-
-        if (offset - beginning > msgsPerPartition) {
-          seekMap.put(partition, offset - msgsPerPartition);
-          awaitingMessages -= msgsPerPartition;
+    final Map<TopicPartition, Long> seekMap = new HashMap<>();
+    final Set<TopicPartition> emptyPartitions = new HashSet<>();
+
+    for (Map.Entry<TopicPartition, Long> entry : seekTo.entrySet()) {
+      final Long endOffset = endOffsets.get(entry.getKey());
+      final Long beginningOffset = beginningOffsets.get(entry.getKey());
+      if (beginningOffset != null
+          && endOffset != null
+          && beginningOffset < endOffset
+          && entry.getValue() > beginningOffset
+      ) {
+        final Long value;
+        if (entry.getValue() > endOffset) {
+          value = endOffset;
         } else {
-          final long num = offset - beginning;
-          if (num > 0) {
-            seekMap.put(partition, offset - num);
-            awaitingMessages -= num;
-          } else {
-            waiting.remove(partition);
-          }
+          value = entry.getValue();
         }
 
-        if (awaitingMessages <= 0) {
-          break;
-        }
+        seekMap.put(entry.getKey(), value);
+      } else {
+        emptyPartitions.add(entry.getKey());
       }
     }
 
+    Set<TopicPartition> waiting = new HashSet<>(partitions);
+    waiting.removeAll(emptyPartitions);
+    waiting.removeAll(seekMap.keySet());
+
+    for (TopicPartition topicPartition : waiting) {
+      seekMap.put(topicPartition, endOffsets.get(topicPartition));
+    }
+
     return seekMap;
   }
+
+
 }

+ 31 - 24
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/OffsetsSeekForward.java

@@ -1,8 +1,10 @@
 package com.provectus.kafka.ui.util;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -16,39 +18,44 @@ public class OffsetsSeekForward extends OffsetsSeek {
     super(topic, consumerPosition);
   }
 
-  protected void assignAndSeekForOffset(Consumer<Bytes, Bytes> consumer) {
-    List<TopicPartition> partitions = getRequestedPartitions(consumer);
-    consumer.assign(partitions);
-    consumerPosition.getSeekTo().forEach((partition, offset) -> {
-      TopicPartition topicPartition = new TopicPartition(topic, partition);
-      consumer.seek(topicPartition, offset);
-    });
+  protected Map<TopicPartition, Long> offsetsFromPositions(Consumer<Bytes, Bytes> consumer,
+                                        List<TopicPartition> partitions) {
+    final Map<TopicPartition, Long> offsets =
+        offsetsFromBeginning(consumer, partitions);
+
+    final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(offsets.keySet());
+    final Set<TopicPartition> set = new HashSet<>(consumerPosition.getSeekTo().keySet());
+    final Map<TopicPartition, Long> collect = consumerPosition.getSeekTo().entrySet().stream()
+        .filter(e -> e.getValue() < endOffsets.get(e.getKey()))
+        .filter(e -> endOffsets.get(e.getKey()) > offsets.get(e.getKey()))
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            Map.Entry::getValue
+        ));
+    offsets.putAll(collect);
+    set.removeAll(collect.keySet());
+    set.forEach(offsets::remove);
+
+    return offsets;
   }
 
-  protected void assignAndSeekForTimestamp(Consumer<Bytes, Bytes> consumer) {
-    Map<TopicPartition, Long> timestampsToSearch =
-        consumerPosition.getSeekTo().entrySet().stream()
-            .collect(Collectors.toMap(
-                partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
-                Map.Entry::getValue
-            ));
-    Map<TopicPartition, Long> offsetsForTimestamps = consumer.offsetsForTimes(timestampsToSearch)
-        .entrySet().stream()
-        .filter(e -> e.getValue() != null)
-        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+  protected Map<TopicPartition, Long> offsetsForTimestamp(Consumer<Bytes, Bytes> consumer) {
+    Map<TopicPartition, Long> offsetsForTimestamps =
+        consumer.offsetsForTimes(consumerPosition.getSeekTo())
+            .entrySet().stream()
+            .filter(e -> e.getValue() != null)
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
 
     if (offsetsForTimestamps.isEmpty()) {
       throw new IllegalArgumentException("No offsets were found for requested timestamps");
     }
 
-    consumer.assign(offsetsForTimestamps.keySet());
-    offsetsForTimestamps.forEach(consumer::seek);
+    return offsetsForTimestamps;
   }
 
-  protected void assignAndSeekFromBeginning(Consumer<Bytes, Bytes> consumer) {
-    List<TopicPartition> partitions = getRequestedPartitions(consumer);
-    consumer.assign(partitions);
-    consumer.seekToBeginning(partitions);
+  protected Map<TopicPartition, Long> offsetsFromBeginning(Consumer<Bytes, Bytes> consumer,
+                                            List<TopicPartition> partitions) {
+    return consumer.beginningOffsets(partitions);
   }
 
 }

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/EnumJsonType.java

@@ -7,7 +7,7 @@ import java.util.Map;
 
 
 public class EnumJsonType extends JsonType {
-  private List<String> values;
+  private final List<String> values;
 
   public EnumJsonType(List<String> values) {
     super(Type.ENUM);

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/jsonschema/JsonSchema.java

@@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.TextNode;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;

+ 3 - 3
kafka-ui-api/src/main/resources/application-sdp.yml

@@ -1,9 +1,9 @@
 kafka:
   clusters:
     - name: local
-      bootstrapServers: localhost:9093
-      zookeeper: localhost:2181
-      schemaRegistry: http://localhost:8083
+      bootstrapServers: b-1.kad-msk.uxahxx.c6.kafka.eu-west-1.amazonaws.com:9092
+#      zookeeper: localhost:2181
+#      schemaRegistry: http://localhost:8083
   #    -
   #      name: secondLocal
   #      zookeeper: zookeeper1:2181

+ 2 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java

@@ -24,8 +24,8 @@ import org.testcontainers.utility.DockerImageName;
 @SpringBootTest
 @ActiveProfiles("test")
 public abstract class AbstractBaseTest {
-  public static String LOCAL = "local";
-  public static String SECOND_LOCAL = "secondLocal";
+  public static final String LOCAL = "local";
+  public static final String SECOND_LOCAL = "secondLocal";
 
   private static final String CONFLUENT_PLATFORM_VERSION = "5.5.0";
 

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/KakfaConsumerGroupTests.java → kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerGroupTests.java

@@ -20,7 +20,7 @@ import org.springframework.test.web.reactive.server.WebTestClient;
 @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
 @Log4j2
 @AutoConfigureWebTestClient(timeout = "10000")
-public class KakfaConsumerGroupTests extends AbstractBaseTest {
+public class KafkaConsumerGroupTests extends AbstractBaseTest {
   @Autowired
   WebTestClient webTestClient;
 

+ 185 - 26
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -1,27 +1,33 @@
 package com.provectus.kafka.ui.service;
 
-import static com.provectus.kafka.ui.service.ConsumingService.RecordEmitter;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractBaseTest;
+import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.SeekDirection;
 import com.provectus.kafka.ui.model.SeekType;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
+import com.provectus.kafka.ui.util.OffsetsSeekBackward;
 import com.provectus.kafka.ui.util.OffsetsSeekForward;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import lombok.Value;
+import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -30,6 +36,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import reactor.core.publisher.Flux;
 
+@Log4j2
 class RecordEmitterTest extends AbstractBaseTest {
 
   static final int PARTITIONS = 5;
@@ -50,7 +57,12 @@ class RecordEmitterTest extends AbstractBaseTest {
           var value = "msg_" + partition + "_" + i;
           var metadata =
               producer.send(new ProducerRecord<>(TOPIC, partition, ts, null, value)).get();
-          SENT_RECORDS.add(new Record(value, metadata.partition(), metadata.offset(), ts));
+          SENT_RECORDS.add(new Record(
+              value,
+              new TopicPartition(metadata.topic(), metadata.partition()),
+              metadata.offset(),
+              ts)
+          );
         }
       }
     }
@@ -64,31 +76,65 @@ class RecordEmitterTest extends AbstractBaseTest {
 
   @Test
   void pollNothingOnEmptyTopic() {
-    var emitter = new RecordEmitter(
+    var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new OffsetsSeekForward(EMPTY_TOPIC,
             new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.FORWARD)
         )
     );
 
-    Long polledValues = Flux.create(emitter)
+    var backwardEmitter = new BackwardRecordEmitter(
+        this::createConsumer,
+        new OffsetsSeekBackward(
+            EMPTY_TOPIC,
+            new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.BACKWARD),
+            100
+        )
+    );
+
+    Long polledValues = Flux.create(forwardEmitter)
+        .limitRequest(100)
+        .count()
+        .block();
+
+    assertThat(polledValues).isZero();
+
+    polledValues = Flux.create(backwardEmitter)
         .limitRequest(100)
         .count()
         .block();
 
     assertThat(polledValues).isZero();
+
   }
 
   @Test
   void pollFullTopicFromBeginning() {
-    var emitter = new RecordEmitter(
+    var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new OffsetsSeekForward(TOPIC,
             new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.FORWARD)
         )
     );
 
-    var polledValues = Flux.create(emitter)
+    var backwardEmitter = new BackwardRecordEmitter(
+        this::createConsumer,
+        new OffsetsSeekBackward(TOPIC,
+            new ConsumerPosition(SeekType.BEGINNING, Map.of(), SeekDirection.FORWARD),
+            PARTITIONS * MSGS_PER_PARTITION
+        )
+    );
+
+    var polledValues = Flux.create(forwardEmitter)
+        .map(this::deserialize)
+        .limitRequest(Long.MAX_VALUE)
+        .collect(Collectors.toList())
+        .block();
+
+    assertThat(polledValues).containsExactlyInAnyOrderElementsOf(
+        SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList()));
+
+    polledValues = Flux.create(backwardEmitter)
         .map(this::deserialize)
         .limitRequest(Long.MAX_VALUE)
         .collect(Collectors.toList())
@@ -96,76 +142,189 @@ class RecordEmitterTest extends AbstractBaseTest {
 
     assertThat(polledValues).containsExactlyInAnyOrderElementsOf(
         SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList()));
+
   }
 
   @Test
   void pollWithOffsets() {
-    Map<Integer, Long> targetOffsets = new HashMap<>();
+    Map<TopicPartition, Long> targetOffsets = new HashMap<>();
     for (int i = 0; i < PARTITIONS; i++) {
       long offset = ThreadLocalRandom.current().nextLong(MSGS_PER_PARTITION);
-      targetOffsets.put(i, offset);
+      targetOffsets.put(new TopicPartition(TOPIC, i), offset);
     }
 
-    var emitter = new RecordEmitter(
+    var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new OffsetsSeekForward(TOPIC,
             new ConsumerPosition(SeekType.OFFSET, targetOffsets, SeekDirection.FORWARD)
         )
     );
 
-    var polledValues = Flux.create(emitter)
+    var backwardEmitter = new BackwardRecordEmitter(
+        this::createConsumer,
+        new OffsetsSeekBackward(TOPIC,
+            new ConsumerPosition(SeekType.OFFSET, targetOffsets, SeekDirection.BACKWARD),
+            PARTITIONS * MSGS_PER_PARTITION
+        )
+    );
+
+    var polledValues = Flux.create(forwardEmitter)
         .map(this::deserialize)
         .limitRequest(Long.MAX_VALUE)
         .collect(Collectors.toList())
         .block();
 
     var expectedValues = SENT_RECORDS.stream()
-        .filter(r -> r.getOffset() >= targetOffsets.get(r.getPartition()))
+        .filter(r -> r.getOffset() >= targetOffsets.get(r.getTp()))
         .map(Record::getValue)
         .collect(Collectors.toList());
 
     assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
+
+    expectedValues = SENT_RECORDS.stream()
+        .filter(r -> r.getOffset() < targetOffsets.get(r.getTp()))
+        .map(Record::getValue)
+        .collect(Collectors.toList());
+
+    polledValues =  Flux.create(backwardEmitter)
+        .map(this::deserialize)
+        .limitRequest(Long.MAX_VALUE)
+        .collect(Collectors.toList())
+        .block();
+
+    assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
   }
 
   @Test
   void pollWithTimestamps() {
-    Map<Integer, Long> targetTimestamps = new HashMap<>();
+    Map<TopicPartition, Long> targetTimestamps = new HashMap<>();
+    final Map<TopicPartition, List<Record>> perPartition =
+        SENT_RECORDS.stream().collect(Collectors.groupingBy((r) -> r.tp));
     for (int i = 0; i < PARTITIONS; i++) {
-      int randRecordIdx = ThreadLocalRandom.current().nextInt(SENT_RECORDS.size());
-      targetTimestamps.put(i, SENT_RECORDS.get(randRecordIdx).getTimestamp());
+      final List<Record> records = perPartition.get(new TopicPartition(TOPIC, i));
+      int randRecordIdx = ThreadLocalRandom.current().nextInt(records.size());
+      log.info("partition: {} position: {}", i, randRecordIdx);
+      targetTimestamps.put(
+          new TopicPartition(TOPIC, i),
+          records.get(randRecordIdx).getTimestamp()
+      );
     }
 
-    var emitter = new RecordEmitter(
+    var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new OffsetsSeekForward(TOPIC,
             new ConsumerPosition(SeekType.TIMESTAMP, targetTimestamps, SeekDirection.FORWARD)
         )
     );
 
-    var polledValues = Flux.create(emitter)
+    var backwardEmitter = new BackwardRecordEmitter(
+        this::createConsumer,
+        new OffsetsSeekBackward(TOPIC,
+            new ConsumerPosition(SeekType.TIMESTAMP, targetTimestamps, SeekDirection.BACKWARD),
+            PARTITIONS * MSGS_PER_PARTITION
+        )
+    );
+
+    var polledValues = Flux.create(forwardEmitter)
         .map(this::deserialize)
         .limitRequest(Long.MAX_VALUE)
         .collect(Collectors.toList())
         .block();
 
     var expectedValues = SENT_RECORDS.stream()
-        .filter(r -> r.getTimestamp() >= targetTimestamps.get(r.getPartition()))
+        .filter(r -> r.getTimestamp() >= targetTimestamps.get(r.getTp()))
         .map(Record::getValue)
         .collect(Collectors.toList());
 
     assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
+
+    polledValues = Flux.create(backwardEmitter)
+        .map(this::deserialize)
+        .limitRequest(Long.MAX_VALUE)
+        .collect(Collectors.toList())
+        .block();
+
+    expectedValues = SENT_RECORDS.stream()
+        .filter(r -> r.getTimestamp() < targetTimestamps.get(r.getTp()))
+        .map(Record::getValue)
+        .collect(Collectors.toList());
+
+    assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
+
   }
 
-  private KafkaConsumer<Bytes, Bytes> createConsumer() {
-    return new KafkaConsumer<>(
-        Map.of(
-            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
-            ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
-            ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20, // to check multiple polls
-            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class,
-            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class
+  @Test
+  void backwardEmitterSeekToEnd() {
+    final int numMessages = 100;
+    final Map<TopicPartition, Long> targetOffsets = new HashMap<>();
+    for (int i = 0; i < PARTITIONS; i++) {
+      targetOffsets.put(new TopicPartition(TOPIC, i), (long) MSGS_PER_PARTITION);
+    }
+
+    var backwardEmitter = new BackwardRecordEmitter(
+        this::createConsumer,
+        new OffsetsSeekBackward(TOPIC,
+            new ConsumerPosition(SeekType.OFFSET, targetOffsets, SeekDirection.BACKWARD),
+            numMessages
         )
     );
+
+    var polledValues = Flux.create(backwardEmitter)
+        .map(this::deserialize)
+        .limitRequest(numMessages)
+        .collect(Collectors.toList())
+        .block();
+
+    var expectedValues = SENT_RECORDS.stream()
+        .filter(r -> r.getOffset() < targetOffsets.get(r.getTp()))
+        .filter(r -> r.getOffset() >= (targetOffsets.get(r.getTp()) - (100 / PARTITIONS)))
+        .map(Record::getValue)
+        .collect(Collectors.toList());
+
+
+    assertThat(polledValues).containsExactlyInAnyOrderElementsOf(expectedValues);
+  }
+
+  @Test
+  void backwardEmitterSeekToBegin() {
+    Map<TopicPartition, Long> offsets = new HashMap<>();
+    for (int i = 0; i < PARTITIONS; i++) {
+      offsets.put(new TopicPartition(TOPIC, i), 0L);
+    }
+
+    var backwardEmitter = new BackwardRecordEmitter(
+        this::createConsumer,
+        new OffsetsSeekBackward(TOPIC,
+            new ConsumerPosition(SeekType.OFFSET, offsets, SeekDirection.BACKWARD),
+            100
+        )
+    );
+
+    var polledValues = Flux.create(backwardEmitter)
+        .map(this::deserialize)
+        .limitRequest(Long.MAX_VALUE)
+        .collect(Collectors.toList())
+        .block();
+
+    assertThat(polledValues).isEmpty();
+  }
+
+  private KafkaConsumer<Bytes, Bytes> createConsumer() {
+    return createConsumer(Map.of());
+  }
+
+  private KafkaConsumer<Bytes, Bytes> createConsumer(Map<String, Object> properties) {
+    final Map<String, ? extends Serializable> map = Map.of(
+        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
+        ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
+        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20, // to check multiple polls
+        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class,
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class
+    );
+    Properties props = new Properties();
+    props.putAll(map);
+    props.putAll(properties);
+    return new KafkaConsumer<>(props);
   }
 
   private String deserialize(ConsumerRecord<Bytes, Bytes> rec) {
@@ -175,7 +334,7 @@ class RecordEmitterTest extends AbstractBaseTest {
   @Value
   static class Record {
     String value;
-    int partition;
+    TopicPartition tp;
     long offset;
     long timestamp;
   }

+ 35 - 23
kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/OffsetsSeekTest.java

@@ -21,11 +21,11 @@ import org.junit.jupiter.api.Test;
 
 class OffsetsSeekTest {
 
-  String topic = "test";
-  TopicPartition tp0 = new TopicPartition(topic, 0); //offsets: start 0, end 0
-  TopicPartition tp1 = new TopicPartition(topic, 1); //offsets: start 10, end 10
-  TopicPartition tp2 = new TopicPartition(topic, 2); //offsets: start 0, end 20
-  TopicPartition tp3 = new TopicPartition(topic, 3); //offsets: start 25, end 30
+  final String topic = "test";
+  final TopicPartition tp0 = new TopicPartition(topic, 0); //offsets: start 0, end 0
+  final TopicPartition tp1 = new TopicPartition(topic, 1); //offsets: start 10, end 10
+  final TopicPartition tp2 = new TopicPartition(topic, 2); //offsets: start 0, end 20
+  final TopicPartition tp3 = new TopicPartition(topic, 3); //offsets: start 25, end 30
 
   MockConsumer<Bytes, Bytes> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
 
@@ -57,7 +57,7 @@ class OffsetsSeekTest {
         topic,
         new ConsumerPosition(
             SeekType.BEGINNING,
-            Map.of(0, 0L, 1, 0L),
+            Map.of(tp0, 0L, tp1, 0L),
             SeekDirection.FORWARD
         )
     );
@@ -74,7 +74,7 @@ class OffsetsSeekTest {
         topic,
         new ConsumerPosition(
             SeekType.BEGINNING,
-            Map.of(2, 0L, 3, 0L),
+            Map.of(tp2, 0L, tp3, 0L),
             SeekDirection.BACKWARD
         ),
         10
@@ -82,8 +82,8 @@ class OffsetsSeekTest {
 
     seek.assignAndSeek(consumer);
     assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2, tp3);
-    assertThat(consumer.position(tp2)).isEqualTo(15L);
-    assertThat(consumer.position(tp3)).isEqualTo(25L);
+    assertThat(consumer.position(tp2)).isEqualTo(20L);
+    assertThat(consumer.position(tp3)).isEqualTo(30L);
   }
 
   @Test
@@ -110,8 +110,8 @@ class OffsetsSeekTest {
     assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2, tp3);
     assertThat(consumer.position(tp0)).isZero();
     assertThat(consumer.position(tp1)).isEqualTo(10L);
-    assertThat(consumer.position(tp2)).isEqualTo(15L);
-    assertThat(consumer.position(tp3)).isEqualTo(25L);
+    assertThat(consumer.position(tp2)).isEqualTo(20L);
+    assertThat(consumer.position(tp3)).isEqualTo(30L);
   }
 
 
@@ -121,14 +121,12 @@ class OffsetsSeekTest {
         topic,
         new ConsumerPosition(
             SeekType.OFFSET,
-            Map.of(0, 0L, 1, 1L, 2, 2L),
+            Map.of(tp0, 0L, tp1, 1L, tp2, 2L),
             SeekDirection.FORWARD
         )
     );
     seek.assignAndSeek(consumer);
-    assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2);
-    assertThat(consumer.position(tp0)).isZero();
-    assertThat(consumer.position(tp1)).isEqualTo(1L);
+    assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2);
     assertThat(consumer.position(tp2)).isEqualTo(2L);
   }
 
@@ -138,16 +136,30 @@ class OffsetsSeekTest {
         topic,
         new ConsumerPosition(
             SeekType.OFFSET,
-            Map.of(0, 0L, 1, 1L, 2, 2L),
-            SeekDirection.FORWARD
+            Map.of(tp0, 0L, tp1, 1L, tp2, 20L),
+            SeekDirection.BACKWARD
         ),
         2
     );
     seek.assignAndSeek(consumer);
-    assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp0, tp1, tp2);
-    assertThat(consumer.position(tp0)).isZero();
-    assertThat(consumer.position(tp1)).isEqualTo(1L);
-    assertThat(consumer.position(tp2)).isZero();
+    assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2);
+    assertThat(consumer.position(tp2)).isEqualTo(20L);
+  }
+
+  @Test
+  void backwardSeekToOffsetOnlyOnePartition() {
+    var seek = new OffsetsSeekBackward(
+        topic,
+        new ConsumerPosition(
+            SeekType.OFFSET,
+            Map.of(tp2, 20L),
+            SeekDirection.BACKWARD
+        ),
+        20
+    );
+    seek.assignAndSeek(consumer);
+    assertThat(consumer.assignment()).containsExactlyInAnyOrder(tp2);
+    assertThat(consumer.position(tp2)).isEqualTo(20L);
   }
 
 
@@ -159,14 +171,14 @@ class OffsetsSeekTest {
     @BeforeEach
     void assignAndCreateOffsets() {
       consumer.assign(List.of(tp0, tp1, tp2, tp3));
-      offsets = new OffsetsSeek.WaitingOffsets(topic, consumer);
+      offsets = new OffsetsSeek.WaitingOffsets(topic, consumer, List.of(tp0, tp1, tp2, tp3));
     }
 
     @Test
     void collectsSignificantOffsetsMinus1ForAssignedPartitions() {
       // offsets for partition 0 & 1 should be skipped because they
       // effectively contains no data (start offset = end offset)
-      assertThat(offsets.offsets).containsExactlyInAnyOrderEntriesOf(
+      assertThat(offsets.getEndOffsets()).containsExactlyInAnyOrderEntriesOf(
           Map.of(2, 19L, 3, 29L)
       );
     }