Просмотр исходного кода

Merge d2e6e6a50964754327a3e59b1e2df5507b61625f into cca2c9699755c2128bb88cf8920c9ed4414dbd58

Ilya Kuramshin 1 год назад
Родитель
Сommit
aa29ccae20
27 измененных файлов с 1010 добавлено и 299 удалено
  1. 56 56
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
  2. 9 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java
  3. 5 8
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardEmitter.java
  4. 0 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java
  5. 8 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java
  6. 90 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/Cursor.java
  7. 5 8
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardEmitter.java
  8. 0 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java
  9. 4 12
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java
  10. 8 3
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java
  11. 5 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java
  12. 5 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java
  13. 9 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/RangePollingEmitter.java
  14. 46 41
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java
  15. 1 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java
  16. 65 7
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java
  17. 109 44
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
  18. 45 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/PollingCursorsStorage.java
  19. 4 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java
  20. 1 1
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java
  21. 195 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/CursorTest.java
  22. 35 14
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/SeekOperationsTest.java
  23. 6 8
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java
  24. 90 24
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java
  25. 63 50
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java
  26. 3 8
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java
  27. 143 1
      kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

+ 56 - 56
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -5,13 +5,14 @@ import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_
 import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
 import static com.provectus.kafka.ui.serde.api.Serde.Target.KEY;
 import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE;
-import static java.util.stream.Collectors.toMap;
 
 import com.provectus.kafka.ui.api.MessagesApi;
-import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
+import com.provectus.kafka.ui.model.MessageFilterIdDTO;
+import com.provectus.kafka.ui.model.MessageFilterRegistrationDTO;
 import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.SeekDirectionDTO;
 import com.provectus.kafka.ui.model.SeekTypeDTO;
 import com.provectus.kafka.ui.model.SerdeUsageDTO;
@@ -25,14 +26,11 @@ import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
 import com.provectus.kafka.ui.service.DeserializationService;
 import com.provectus.kafka.ui.service.MessagesService;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import javax.annotation.Nullable;
 import javax.validation.Valid;
+import javax.validation.ValidationException;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.kafka.common.TopicPartition;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
@@ -76,6 +74,7 @@ public class MessagesController extends AbstractController implements MessagesAp
         .map(ResponseEntity::ok);
   }
 
+  @Deprecated
   @Override
   public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
                                                                            String topicName,
@@ -88,6 +87,23 @@ public class MessagesController extends AbstractController implements MessagesAp
                                                                            String keySerde,
                                                                            String valueSerde,
                                                                            ServerWebExchange exchange) {
+    throw new ValidationException("Not supported");
+  }
+
+
+  @Override
+  public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
+                                                                             PollingModeDTO mode,
+                                                                             List<Integer> partitions,
+                                                                             Integer limit,
+                                                                             String stringFilter,
+                                                                             String smartFilterId,
+                                                                             Long offset,
+                                                                             Long timestamp,
+                                                                             String keySerde,
+                                                                             String valueSerde,
+                                                                             String cursor,
+                                                                             ServerWebExchange exchange) {
     var contextBuilder = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
@@ -98,27 +114,26 @@ public class MessagesController extends AbstractController implements MessagesAp
       contextBuilder.auditActions(AuditAction.VIEW);
     }
 
-    seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
-    seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
-    filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
-
-    var positions = new ConsumerPosition(
-        seekType,
-        topicName,
-        parseSeekTo(topicName, seekType, seekTo)
-    );
-    Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> job = Mono.just(
-        ResponseEntity.ok(
-            messagesService.loadMessages(
-                getCluster(clusterName), topicName, positions, q, filterQueryType,
-                limit, seekDirection, keySerde, valueSerde)
-        )
-    );
-
-    var context = contextBuilder.build();
-    return validateAccess(context)
-        .then(job)
-        .doOnEach(sig -> audit(context, sig));
+    var accessContext = contextBuilder.build();
+
+    Flux<TopicMessageEventDTO> messagesFlux;
+    if (cursor != null) {
+      messagesFlux = messagesService.loadMessages(getCluster(clusterName), topicName, cursor);
+    } else {
+      messagesFlux = messagesService.loadMessages(
+          getCluster(clusterName),
+          topicName,
+          ConsumerPosition.create(mode, topicName, partitions, timestamp, offset),
+          stringFilter,
+          smartFilterId,
+          limit,
+          keySerde,
+          valueSerde
+      );
+    }
+    return accessControlService.validateAccess(accessContext)
+        .then(Mono.just(ResponseEntity.ok(messagesFlux)))
+        .doOnEach(sig -> auditService.audit(accessContext, sig));
   }
 
   @Override
@@ -140,34 +155,6 @@ public class MessagesController extends AbstractController implements MessagesAp
     ).doOnEach(sig -> audit(context, sig));
   }
 
-  /**
-   * The format is [partition]::[offset] for specifying offsets
-   * or [partition]::[timestamp in millis] for specifying timestamps.
-   */
-  @Nullable
-  private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
-    if (seekTo == null || seekTo.isEmpty()) {
-      if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
-        return null;
-      }
-      throw new ValidationException("seekTo should be set if seekType is " + seekType);
-    }
-    return seekTo.stream()
-        .map(p -> {
-          String[] split = p.split("::");
-          if (split.length != 2) {
-            throw new IllegalArgumentException(
-                "Wrong seekTo argument format. See API docs for details");
-          }
-
-          return Pair.of(
-              new TopicPartition(topic, Integer.parseInt(split[0])),
-              Long.parseLong(split[1])
-          );
-        })
-        .collect(toMap(Pair::getKey, Pair::getValue));
-  }
-
   @Override
   public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
                                                                  String topicName,
@@ -195,7 +182,20 @@ public class MessagesController extends AbstractController implements MessagesAp
     );
   }
 
+  @Override
+  public Mono<ResponseEntity<MessageFilterIdDTO>> registerFilter(String clusterName,
+                                                                 String topicName,
+                                                                 Mono<MessageFilterRegistrationDTO> registration,
+                                                                 ServerWebExchange exchange) {
 
+    final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+        .cluster(clusterName)
+        .topic(topicName)
+        .topicActions(MESSAGES_READ)
+        .build());
 
-
+    return validateAccess.then(registration)
+        .map(reg -> messagesService.registerMessageFilter(reg.getFilterCode()))
+        .map(id -> ResponseEntity.ok(new MessageFilterIdDTO().id(id)));
+  }
 }

+ 9 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import jakarta.annotation.Nullable;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
@@ -21,12 +22,14 @@ abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<T
     return records;
   }
 
-  protected boolean sendLimitReached() {
+  protected boolean isSendLimitReached() {
     return messagesProcessing.limitReached();
   }
 
-  protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
-    messagesProcessing.send(sink, records);
+  protected void send(FluxSink<TopicMessageEventDTO> sink,
+                      Iterable<ConsumerRecord<Bytes, Bytes>> records,
+                      @Nullable Cursor.Tracking cursor) {
+    messagesProcessing.send(sink, records, cursor);
   }
 
   protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
@@ -37,8 +40,9 @@ abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<T
     messagesProcessing.sentConsumingInfo(sink, records);
   }
 
-  protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
-    messagesProcessing.sendFinishEvent(sink);
+  // cursor is null if target partitions were fully polled (no, need to do paging)
+  protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
+    messagesProcessing.sendFinishEvents(sink, cursor);
     sink.complete();
   }
 }

+ 5 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardEmitter.java

@@ -18,18 +18,15 @@ public class BackwardEmitter extends RangePollingEmitter {
                          int messagesPerPage,
                          ConsumerRecordDeserializer deserializer,
                          Predicate<TopicMessageDTO> filter,
-                         PollingSettings pollingSettings) {
+                         PollingSettings pollingSettings,
+                         Cursor.Tracking cursor) {
     super(
         consumerSupplier,
         consumerPosition,
         messagesPerPage,
-        new MessagesProcessing(
-            deserializer,
-            filter,
-            false,
-            messagesPerPage
-        ),
-        pollingSettings
+        new MessagesProcessing(deserializer, filter, false, messagesPerPage),
+        pollingSettings,
+        cursor
     );
   }
 

+ 0 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java


+ 8 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java

@@ -2,6 +2,8 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.model.TopicMessageNextPageCursorDTO;
+import javax.annotation.Nullable;
 import reactor.core.publisher.FluxSink;
 
 class ConsumingStats {
@@ -26,10 +28,15 @@ class ConsumingStats {
     filterApplyErrors++;
   }
 
-  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
+  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
     sink.next(
         new TopicMessageEventDTO()
             .type(TopicMessageEventDTO.TypeEnum.DONE)
+            .cursor(
+                cursor != null
+                    ? new TopicMessageNextPageCursorDTO().id(cursor.registerCursor())
+                    : null
+            )
             .consuming(createConsumingStats())
     );
   }

+ 90 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/Cursor.java

@@ -0,0 +1,90 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.PollingModeDTO;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.kafka.common.TopicPartition;
+
+public record Cursor(ConsumerRecordDeserializer deserializer,
+                     ConsumerPosition consumerPosition,
+                     Predicate<TopicMessageDTO> filter,
+                     int limit) {
+
+  public static class Tracking {
+    private final ConsumerRecordDeserializer deserializer;
+    private final ConsumerPosition originalPosition;
+    private final Predicate<TopicMessageDTO> filter;
+    private final int limit;
+    private final Function<Cursor, String> registerAction;
+
+    //topic -> partition -> offset
+    private final Table<String, Integer, Long> trackingOffsets = HashBasedTable.create();
+
+    public Tracking(ConsumerRecordDeserializer deserializer,
+                    ConsumerPosition originalPosition,
+                    Predicate<TopicMessageDTO> filter,
+                    int limit,
+                    Function<Cursor, String> registerAction) {
+      this.deserializer = deserializer;
+      this.originalPosition = originalPosition;
+      this.filter = filter;
+      this.limit = limit;
+      this.registerAction = registerAction;
+    }
+
+    void trackOffset(String topic, int partition, long offset) {
+      trackingOffsets.put(topic, partition, offset);
+    }
+
+    void initOffsets(Map<TopicPartition, Long> initialSeekOffsets) {
+      initialSeekOffsets.forEach((tp, off) -> trackOffset(tp.topic(), tp.partition(), off));
+    }
+
+    private Map<TopicPartition, Long> getOffsetsMap(int offsetToAdd) {
+      Map<TopicPartition, Long> result = new HashMap<>();
+      trackingOffsets.rowMap()
+          .forEach((topic, partsMap) ->
+              partsMap.forEach((p, off) -> result.put(new TopicPartition(topic, p), off + offsetToAdd)));
+      return result;
+    }
+
+    String registerCursor() {
+      return registerAction.apply(
+          new Cursor(
+              deserializer,
+              new ConsumerPosition(
+                  switch (originalPosition.pollingMode()) {
+                    case TO_OFFSET, TO_TIMESTAMP, LATEST -> PollingModeDTO.TO_OFFSET;
+                    case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> PollingModeDTO.FROM_OFFSET;
+                    case TAILING -> throw new IllegalStateException();
+                  },
+                  originalPosition.topic(),
+                  originalPosition.partitions(),
+                  null,
+                  new ConsumerPosition.Offsets(
+                      null,
+                      getOffsetsMap(
+                          switch (originalPosition.pollingMode()) {
+                            case TO_OFFSET, TO_TIMESTAMP, LATEST -> 0;
+                            // when doing forward polling we need to start from latest msg's offset + 1
+                            case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> 1;
+                            case TAILING -> throw new IllegalStateException();
+                          }
+                      )
+                  )
+              ),
+              filter,
+              limit
+          )
+      );
+    }
+  }
+
+}

+ 5 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardEmitter.java

@@ -18,18 +18,15 @@ public class ForwardEmitter extends RangePollingEmitter {
                         int messagesPerPage,
                         ConsumerRecordDeserializer deserializer,
                         Predicate<TopicMessageDTO> filter,
-                        PollingSettings pollingSettings) {
+                        PollingSettings pollingSettings,
+                        Cursor.Tracking cursor) {
     super(
         consumerSupplier,
         consumerPosition,
         messagesPerPage,
-        new MessagesProcessing(
-            deserializer,
-            filter,
-            true,
-            messagesPerPage
-        ),
-        pollingSettings
+        new MessagesProcessing(deserializer, filter, true, messagesPerPage),
+        pollingSettings,
+        cursor
     );
   }
 

+ 0 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java


+ 4 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java

@@ -1,7 +1,6 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import groovy.json.JsonSlurper;
 import java.util.function.Predicate;
@@ -22,23 +21,16 @@ public class MessageFilters {
   private MessageFilters() {
   }
 
-  public static Predicate<TopicMessageDTO> createMsgFilter(String query, MessageFilterTypeDTO type) {
-    switch (type) {
-      case STRING_CONTAINS:
-        return containsStringFilter(query);
-      case GROOVY_SCRIPT:
-        return groovyScriptFilter(query);
-      default:
-        throw new IllegalStateException("Unknown query type: " + type);
-    }
+  public static Predicate<TopicMessageDTO> noop() {
+    return e -> true;
   }
 
-  static Predicate<TopicMessageDTO> containsStringFilter(String string) {
+  public static Predicate<TopicMessageDTO> containsStringFilter(String string) {
     return msg -> StringUtils.contains(msg.getKey(), string)
         || StringUtils.contains(msg.getContent(), string);
   }
 
-  static Predicate<TopicMessageDTO> groovyScriptFilter(String script) {
+  public static Predicate<TopicMessageDTO> groovyScriptFilter(String script) {
     var engine = getGroovyEngine();
     var compiledScript = compileScript(engine, script);
     var jsonSlurper = new JsonSlurper();

+ 8 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java

@@ -39,7 +39,9 @@ class MessagesProcessing {
     return limit != null && sentMessages >= limit;
   }
 
-  void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> polled) {
+  void send(FluxSink<TopicMessageEventDTO> sink,
+            Iterable<ConsumerRecord<Bytes, Bytes>> polled,
+            @Nullable Cursor.Tracking cursor) {
     sortForSending(polled, ascendingSortBeforeSend)
         .forEach(rec -> {
           if (!limitReached() && !sink.isCancelled()) {
@@ -53,6 +55,9 @@ class MessagesProcessing {
                 );
                 sentMessages++;
               }
+              if (cursor != null) {
+                cursor.trackOffset(rec.topic(), rec.partition(), rec.offset());
+              }
             } catch (Exception e) {
               consumingStats.incFilterApplyError();
               log.trace("Error applying filter for message {}", topicMessage);
@@ -67,9 +72,9 @@ class MessagesProcessing {
     }
   }
 
-  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
+  void sendFinishEvents(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
     if (!sink.isCancelled()) {
-      consumingStats.sendFinishEvent(sink);
+      consumingStats.sendFinishEvent(sink, cursor);
     }
   }
 

+ 5 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
@@ -62,4 +63,8 @@ class OffsetsInfo {
     return cnt.getValue();
   }
 
+  public Set<TopicPartition> allTargetPartitions() {
+    return Sets.union(nonEmptyPartitions, emptyPartitions);
+  }
+
 }

+ 5 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.emitter;
 import java.time.Duration;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
@@ -32,6 +33,10 @@ public record PolledRecords(int count,
     return records.iterator();
   }
 
+  public Set<TopicPartition> partitions() {
+    return records.partitions();
+  }
+
   private static int calculatePolledRecSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
     int polledBytes = 0;
     for (ConsumerRecord<Bytes, Bytes> rec : recs) {

+ 9 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/RangePollingEmitter.java

@@ -17,6 +17,7 @@ import reactor.core.publisher.FluxSink;
 abstract class RangePollingEmitter extends AbstractEmitter {
 
   private final Supplier<EnhancedConsumer> consumerSupplier;
+  private final Cursor.Tracking cursor;
   protected final ConsumerPosition consumerPosition;
   protected final int messagesPerPage;
 
@@ -24,11 +25,13 @@ abstract class RangePollingEmitter extends AbstractEmitter {
                                 ConsumerPosition consumerPosition,
                                 int messagesPerPage,
                                 MessagesProcessing messagesProcessing,
-                                PollingSettings pollingSettings) {
+                                PollingSettings pollingSettings,
+                                Cursor.Tracking cursor) {
     super(messagesProcessing, pollingSettings);
     this.consumerPosition = consumerPosition;
     this.messagesPerPage = messagesPerPage;
     this.consumerSupplier = consumerSupplier;
+    this.cursor = cursor;
   }
 
   protected record FromToOffset(/*inclusive*/ long from, /*exclusive*/ long to) {
@@ -46,18 +49,20 @@ abstract class RangePollingEmitter extends AbstractEmitter {
     try (EnhancedConsumer consumer = consumerSupplier.get()) {
       sendPhase(sink, "Consumer created");
       var seekOperations = SeekOperations.create(consumer, consumerPosition);
+      cursor.initOffsets(seekOperations.getOffsetsForSeek());
+
       TreeMap<TopicPartition, FromToOffset> pollRange = nextPollingRange(new TreeMap<>(), seekOperations);
       log.debug("Starting from offsets {}", pollRange);
 
-      while (!sink.isCancelled() && !pollRange.isEmpty() && !sendLimitReached()) {
+      while (!sink.isCancelled() && !pollRange.isEmpty() && !isSendLimitReached()) {
         var polled = poll(consumer, sink, pollRange);
-        send(sink, polled);
+        send(sink, polled, cursor);
         pollRange = nextPollingRange(pollRange, seekOperations);
       }
       if (sink.isCancelled()) {
         log.debug("Polling finished due to sink cancellation");
       }
-      sendFinishStatsAndCompleteSink(sink);
+      sendFinishStatsAndCompleteSink(sink, pollRange.isEmpty() ? null : cursor);
       log.debug("Polling finished");
     } catch (InterruptException kafkaInterruptException) {
       log.debug("Polling finished due to thread interruption");

+ 46 - 41
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java

@@ -1,13 +1,13 @@
 package com.provectus.kafka.ui.emitter;
 
+import static com.provectus.kafka.ui.model.PollingModeDTO.TO_TIMESTAMP;
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.provectus.kafka.ui.model.ConsumerPosition;
-import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 import lombok.AccessLevel;
 import lombok.RequiredArgsConstructor;
 import org.apache.commons.lang3.mutable.MutableLong;
@@ -22,17 +22,11 @@ public class SeekOperations {
   private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
 
   public static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
-    OffsetsInfo offsetsInfo;
-    if (consumerPosition.getSeekTo() == null) {
-      offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getTopic());
-    } else {
-      offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getSeekTo().keySet());
-    }
-    return new SeekOperations(
-        consumer,
-        offsetsInfo,
-        getOffsetsForSeek(consumer, offsetsInfo, consumerPosition.getSeekType(), consumerPosition.getSeekTo())
-    );
+    OffsetsInfo offsetsInfo = consumerPosition.partitions().isEmpty()
+        ? new OffsetsInfo(consumer, consumerPosition.topic())
+        : new OffsetsInfo(consumer, consumerPosition.partitions());
+    var offsetsToSeek = getOffsetsForSeek(consumer, offsetsInfo, consumerPosition);
+    return new SeekOperations(consumer, offsetsInfo, offsetsToSeek);
   }
 
   public void assignAndSeekNonEmptyPartitions() {
@@ -75,27 +69,26 @@ public class SeekOperations {
   @VisibleForTesting
   static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
                                                      OffsetsInfo offsetsInfo,
-                                                     SeekTypeDTO seekType,
-                                                     @Nullable Map<TopicPartition, Long> seekTo) {
-    switch (seekType) {
-      case LATEST:
-        return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
-      case BEGINNING:
-        return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
-      case OFFSET:
-        Preconditions.checkNotNull(seekTo);
-        return fixOffsets(offsetsInfo, seekTo);
-      case TIMESTAMP:
-        Preconditions.checkNotNull(seekTo);
-        return offsetsForTimestamp(consumer, offsetsInfo, seekTo);
-      default:
-        throw new IllegalStateException();
-    }
+                                                     ConsumerPosition position) {
+    return switch (position.pollingMode()) {
+      case TAILING -> consumer.endOffsets(offsetsInfo.allTargetPartitions());
+      case LATEST -> consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
+      case EARLIEST -> consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
+      case FROM_OFFSET, TO_OFFSET -> fixOffsets(offsetsInfo, requireNonNull(position.offsets()));
+      case FROM_TIMESTAMP, TO_TIMESTAMP ->
+          offsetsForTimestamp(consumer, position.pollingMode(), offsetsInfo, requireNonNull(position.timestamp()));
+    };
   }
 
-  private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo, Map<TopicPartition, Long> offsets) {
-    offsets = new HashMap<>(offsets);
-    offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
+  private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo,
+                                                      ConsumerPosition.Offsets positionOffset) {
+    var offsets = new HashMap<TopicPartition, Long>();
+    if (positionOffset.offset() != null) {
+      offsetsInfo.getNonEmptyPartitions().forEach(tp -> offsets.put(tp, positionOffset.offset()));
+    } else {
+      offsets.putAll(requireNonNull(positionOffset.tpOffsets()));
+      offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
+    }
 
     Map<TopicPartition, Long> result = new HashMap<>();
     offsets.forEach((tp, targetOffset) -> {
@@ -112,13 +105,25 @@ public class SeekOperations {
     return result;
   }
 
-  private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer, OffsetsInfo offsetsInfo,
-                                                               Map<TopicPartition, Long> timestamps) {
-    timestamps = new HashMap<>(timestamps);
-    timestamps.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
+  private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer,
+                                                               PollingModeDTO pollingMode,
+                                                               OffsetsInfo offsetsInfo,
+                                                               Long timestamp) {
+    Map<TopicPartition, Long> timestamps = new HashMap<>();
+    offsetsInfo.getNonEmptyPartitions().forEach(tp -> timestamps.put(tp, timestamp));
 
-    return consumer.offsetsForTimes(timestamps).entrySet().stream()
-        .filter(e -> e.getValue() != null)
-        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+    Map<TopicPartition, Long> result = new HashMap<>();
+    consumer.offsetsForTimes(timestamps).forEach((tp, offsetAndTimestamp) -> {
+      if (offsetAndTimestamp == null) {
+        if (pollingMode == TO_TIMESTAMP && offsetsInfo.getNonEmptyPartitions().contains(tp)) {
+          // if no offset was returned this means that *all* timestamps are lower
+          // than target timestamp. Is case of TO_OFFSET mode we need to read from the ending of tp
+          result.put(tp, offsetsInfo.getEndOffsets().get(tp));
+        }
+      } else {
+        result.put(tp, offsetAndTimestamp.offset());
+      }
+    });
+    return result;
   }
 }

+ 1 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -35,7 +35,7 @@ public class TailingEmitter extends AbstractEmitter {
       while (!sink.isCancelled()) {
         sendPhase(sink, "Polling");
         var polled = poll(sink, consumer);
-        send(sink, polled);
+        send(sink, polled, null);
       }
       sink.complete();
       log.debug("Tailing finished");
@@ -55,5 +55,4 @@ public class TailingEmitter extends AbstractEmitter {
     consumer.assign(seekOffsets.keySet());
     seekOffsets.forEach(consumer::seek);
   }
-
 }

+ 65 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java

@@ -1,14 +1,72 @@
 package com.provectus.kafka.ui.model;
 
+import com.google.common.base.Preconditions;
+import com.provectus.kafka.ui.exception.ValidationException;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
-import lombok.Value;
 import org.apache.kafka.common.TopicPartition;
 
-@Value
-public class ConsumerPosition {
-  SeekTypeDTO seekType;
-  String topic;
-  @Nullable
-  Map<TopicPartition, Long> seekTo; // null if positioning should apply to all tps
+public record ConsumerPosition(PollingModeDTO pollingMode,
+                               String topic,
+                               List<TopicPartition> partitions, //all partitions if list is empty
+                               @Nullable Long timestamp,
+                               @Nullable Offsets offsets) {
+
+  public record Offsets(@Nullable Long offset, //should be applied to all partitions
+                        @Nullable Map<TopicPartition, Long> tpOffsets) {
+    public Offsets {
+      // only one of properties should be set
+      Preconditions.checkArgument((offset == null && tpOffsets != null) || (offset != null && tpOffsets == null));
+    }
+  }
+
+  public static ConsumerPosition create(PollingModeDTO pollingMode,
+                                        String topic,
+                                        @Nullable List<Integer> partitions,
+                                        @Nullable Long timestamp,
+                                        @Nullable Long offset) {
+    @Nullable var offsets = parseAndValidateOffsets(pollingMode, offset);
+
+    var topicPartitions = Optional.ofNullable(partitions).orElse(List.of())
+        .stream()
+        .map(p -> new TopicPartition(topic, p))
+        .collect(Collectors.toList());
+
+    // if offsets are specified - inferring partitions list from there
+    topicPartitions = (offsets != null && offsets.tpOffsets() != null)
+        ? List.copyOf(offsets.tpOffsets().keySet())
+        : topicPartitions;
+
+    return new ConsumerPosition(
+        pollingMode,
+        topic,
+        topicPartitions,
+        validateTimestamp(pollingMode, timestamp),
+        offsets
+    );
+  }
+
+  private static Long validateTimestamp(PollingModeDTO pollingMode, @Nullable Long ts) {
+    if (pollingMode == PollingModeDTO.FROM_TIMESTAMP || pollingMode == PollingModeDTO.TO_TIMESTAMP) {
+      if (ts == null) {
+        throw new ValidationException("timestamp not provided for " + pollingMode);
+      }
+    }
+    return ts;
+  }
+
+  private static Offsets parseAndValidateOffsets(PollingModeDTO pollingMode,
+                                                 @Nullable Long offset) {
+    if (pollingMode == PollingModeDTO.FROM_OFFSET || pollingMode == PollingModeDTO.TO_OFFSET) {
+      if (offset == null) {
+        throw new ValidationException("offsets not provided for " + pollingMode);
+      }
+      return new Offsets(offset, null);
+    }
+    return null;
+  }
+
 }

+ 109 - 44
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -1,8 +1,13 @@
 package com.provectus.kafka.ui.service;
 
+import com.google.common.base.Charsets;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.hash.Hashing;
 import com.google.common.util.concurrent.RateLimiter;
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.emitter.BackwardEmitter;
+import com.provectus.kafka.ui.emitter.Cursor;
 import com.provectus.kafka.ui.emitter.ForwardEmitter;
 import com.provectus.kafka.ui.emitter.MessageFilters;
 import com.provectus.kafka.ui.emitter.TailingEmitter;
@@ -11,12 +16,12 @@ import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
-import com.provectus.kafka.ui.model.SeekDirectionDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
 import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import java.time.Instant;
@@ -27,12 +32,12 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -50,8 +55,11 @@ import reactor.core.scheduler.Schedulers;
 @Slf4j
 public class MessagesService {
 
+  private static final long SALT_FOR_HASHING = ThreadLocalRandom.current().nextLong();
+
   private static final int DEFAULT_MAX_PAGE_SIZE = 500;
   private static final int DEFAULT_PAGE_SIZE = 100;
+
   // limiting UI messages rate to 20/sec in tailing mode
   private static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
 
@@ -61,6 +69,12 @@ public class MessagesService {
   private final int maxPageSize;
   private final int defaultPageSize;
 
+  private final Cache<String, Predicate<TopicMessageDTO>> registeredFilters = CacheBuilder.newBuilder()
+      .maximumSize(PollingCursorsStorage.MAX_SIZE)
+      .build();
+
+  private final PollingCursorsStorage cursorsStorage = new PollingCursorsStorage();
+
   public MessagesService(AdminClientService adminClientService,
                          DeserializationService deserializationService,
                          ConsumerGroupService consumerGroupService,
@@ -86,10 +100,7 @@ public class MessagesService {
   public static SmartFilterTestExecutionResultDTO execSmartFilterTest(SmartFilterTestExecutionDTO execData) {
     Predicate<TopicMessageDTO> predicate;
     try {
-      predicate = MessageFilters.createMsgFilter(
-          execData.getFilterCode(),
-          MessageFilterTypeDTO.GROOVY_SCRIPT
-      );
+      predicate = MessageFilters.groovyScriptFilter(execData.getFilterCode());
     } catch (Exception e) {
       log.info("Smart filter '{}' compilation error", execData.getFilterCode(), e);
       return new SmartFilterTestExecutionResultDTO()
@@ -197,67 +208,103 @@ public class MessagesService {
     return new KafkaProducer<>(properties);
   }
 
-  public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic,
+  public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
+                                                 String topic,
                                                  ConsumerPosition consumerPosition,
-                                                 @Nullable String query,
-                                                 MessageFilterTypeDTO filterQueryType,
-                                                 @Nullable Integer pageSize,
-                                                 SeekDirectionDTO seekDirection,
+                                                 @Nullable String containsStringFilter,
+                                                 @Nullable String filterId,
+                                                 @Nullable Integer limit,
                                                  @Nullable String keySerde,
                                                  @Nullable String valueSerde) {
+    return loadMessages(
+        cluster,
+        topic,
+        deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
+        consumerPosition,
+        getMsgFilter(containsStringFilter, filterId),
+        fixPageSize(limit)
+    );
+  }
+
+  public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic, String cursorId) {
+    Cursor cursor = cursorsStorage.getCursor(cursorId)
+        .orElseThrow(() -> new ValidationException("Next page cursor not found. Maybe it was evicted from cache."));
+    return loadMessages(
+        cluster,
+        topic,
+        cursor.deserializer(),
+        cursor.consumerPosition(),
+        cursor.filter(),
+        cursor.limit()
+    );
+  }
+
+  private Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
+                                                  String topic,
+                                                  ConsumerRecordDeserializer deserializer,
+                                                  ConsumerPosition consumerPosition,
+                                                  Predicate<TopicMessageDTO> filter,
+                                                  int limit) {
     return withExistingTopic(cluster, topic)
         .flux()
         .publishOn(Schedulers.boundedElastic())
-        .flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
-            filterQueryType, fixPageSize(pageSize), seekDirection, keySerde, valueSerde));
-  }
-
-  private int fixPageSize(@Nullable Integer pageSize) {
-    return Optional.ofNullable(pageSize)
-        .filter(ps -> ps > 0 && ps <= maxPageSize)
-        .orElse(defaultPageSize);
+        .flatMap(td -> loadMessagesImpl(cluster, deserializer, consumerPosition, filter, limit));
   }
 
   private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
-                                                      String topic,
+                                                      ConsumerRecordDeserializer deserializer,
                                                       ConsumerPosition consumerPosition,
-                                                      @Nullable String query,
-                                                      MessageFilterTypeDTO filterQueryType,
-                                                      int limit,
-                                                      SeekDirectionDTO seekDirection,
-                                                      @Nullable String keySerde,
-                                                      @Nullable String valueSerde) {
-
-    var deserializer = deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
-    var filter = getMsgFilter(query, filterQueryType);
-    var emitter = switch (seekDirection) {
-      case FORWARD -> new ForwardEmitter(
+                                                      Predicate<TopicMessageDTO> filter,
+                                                      int limit) {
+    var emitter = switch (consumerPosition.pollingMode()) {
+      case TO_OFFSET, TO_TIMESTAMP, LATEST -> new BackwardEmitter(
           () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()
+          consumerPosition,
+          limit,
+          deserializer,
+          filter,
+          cluster.getPollingSettings(),
+          cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit)
       );
-      case BACKWARD -> new BackwardEmitter(
+      case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> new ForwardEmitter(
           () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()
+          consumerPosition,
+          limit,
+          deserializer,
+          filter,
+          cluster.getPollingSettings(),
+          cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit)
       );
       case TAILING -> new TailingEmitter(
           () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition, deserializer, filter, cluster.getPollingSettings()
+          consumerPosition,
+          deserializer,
+          filter,
+          cluster.getPollingSettings()
       );
     };
     return Flux.create(emitter)
-        .map(throttleUiPublish(seekDirection));
+        .map(throttleUiPublish(consumerPosition.pollingMode()));
   }
 
-  private Predicate<TopicMessageDTO> getMsgFilter(String query,
-                                                  MessageFilterTypeDTO filterQueryType) {
-    if (StringUtils.isEmpty(query)) {
-      return evt -> true;
+  private Predicate<TopicMessageDTO> getMsgFilter(@Nullable String containsStrFilter,
+                                                  @Nullable String smartFilterId) {
+    Predicate<TopicMessageDTO> messageFilter = MessageFilters.noop();
+    if (containsStrFilter != null) {
+      messageFilter = messageFilter.and(MessageFilters.containsStringFilter(containsStrFilter));
     }
-    return MessageFilters.createMsgFilter(query, filterQueryType);
+    if (smartFilterId != null) {
+      var registered = registeredFilters.getIfPresent(smartFilterId);
+      if (registered == null) {
+        throw new ValidationException("No filter was registered with id " + smartFilterId);
+      }
+      messageFilter = messageFilter.and(registered);
+    }
+    return messageFilter;
   }
 
-  private <T> UnaryOperator<T> throttleUiPublish(SeekDirectionDTO seekDirection) {
-    if (seekDirection == SeekDirectionDTO.TAILING) {
+  private <T> UnaryOperator<T> throttleUiPublish(PollingModeDTO pollingMode) {
+    if (pollingMode == PollingModeDTO.TAILING) {
       RateLimiter rateLimiter = RateLimiter.create(TAILING_UI_MESSAGE_THROTTLE_RATE);
       return m -> {
         rateLimiter.acquire(1);
@@ -269,4 +316,22 @@ public class MessagesService {
     return UnaryOperator.identity();
   }
 
+  private int fixPageSize(@Nullable Integer pageSize) {
+    return Optional.ofNullable(pageSize)
+        .filter(ps -> ps > 0 && ps <= maxPageSize)
+        .orElse(defaultPageSize);
+  }
+
+  public String registerMessageFilter(String groovyCode) {
+    String saltedCode = groovyCode + SALT_FOR_HASHING;
+    String filterId = Hashing.sha256()
+        .hashString(saltedCode, Charsets.UTF_8)
+        .toString()
+        .substring(0, 8);
+    if (registeredFilters.getIfPresent(filterId) == null) {
+      registeredFilters.put(filterId, MessageFilters.groovyScriptFilter(groovyCode));
+    }
+    return filterId;
+  }
+
 }

+ 45 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/PollingCursorsStorage.java

@@ -0,0 +1,45 @@
+package com.provectus.kafka.ui.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.provectus.kafka.ui.emitter.Cursor;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
+import org.apache.commons.lang3.RandomStringUtils;
+
+public class PollingCursorsStorage {
+
+  public static final int MAX_SIZE = 10_000;
+
+  private final Cache<String, Cursor> cursorsCache = CacheBuilder.newBuilder()
+      .maximumSize(MAX_SIZE)
+      .build();
+
+
+  public Cursor.Tracking createNewCursor(ConsumerRecordDeserializer deserializer,
+                                         ConsumerPosition originalPosition,
+                                         Predicate<TopicMessageDTO> filter,
+                                         int limit) {
+    return new Cursor.Tracking(deserializer, originalPosition, filter, limit, this::register);
+  }
+
+  public Optional<Cursor> getCursor(String id) {
+    return Optional.ofNullable(cursorsCache.getIfPresent(id));
+  }
+
+  public String register(Cursor cursor) {
+    var id = RandomStringUtils.random(8, true, true);
+    cursorsCache.put(id, cursor);
+    return id;
+  }
+
+  @VisibleForTesting
+  public Map<String, Cursor> asMap() {
+    return cursorsCache.asMap();
+  }
+}

+ 4 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java

@@ -1,6 +1,6 @@
 package com.provectus.kafka.ui.service.analyze;
 
-import static com.provectus.kafka.ui.model.SeekTypeDTO.BEGINNING;
+import static com.provectus.kafka.ui.model.PollingModeDTO.EARLIEST;
 
 import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.emitter.SeekOperations;
@@ -14,6 +14,7 @@ import java.io.Closeable;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import lombok.RequiredArgsConstructor;
@@ -104,7 +105,8 @@ public class TopicAnalysisService {
         consumer.partitionsFor(topicId.topicName)
             .forEach(tp -> partitionStats.put(tp.partition(), new TopicAnalysisStats()));
 
-        var seekOperations = SeekOperations.create(consumer, new ConsumerPosition(BEGINNING, topicId.topicName, null));
+        var seekOperations =
+            SeekOperations.create(consumer, new ConsumerPosition(EARLIEST, topicId.topicName, List.of(), null, null));
         long summaryOffsetsRange = seekOperations.summaryOffsetsRange();
         seekOperations.assignAndSeekNonEmptyPartitions();
 

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java

@@ -56,7 +56,7 @@ public class KafkaConsumerTests extends AbstractIntegrationTest {
     }
 
     long count = webTestClient.get()
-        .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
+        .uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?m=EARLIEST", LOCAL, topicName)
         .accept(TEXT_EVENT_STREAM)
         .exchange()
         .expectStatus()

+ 195 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/CursorTest.java

@@ -0,0 +1,195 @@
+package com.provectus.kafka.ui.emitter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.AbstractIntegrationTest;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.PollingModeDTO;
+import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.producer.KafkaTestProducer;
+import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
+import com.provectus.kafka.ui.serdes.builtin.StringSerde;
+import com.provectus.kafka.ui.service.PollingCursorsStorage;
+import com.provectus.kafka.ui.util.ApplicationMetrics;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+class CursorTest extends AbstractIntegrationTest {
+
+  static final String TOPIC = CursorTest.class.getSimpleName() + "_" + UUID.randomUUID();
+  static final int MSGS_IN_PARTITION = 20;
+  static final int PAGE_SIZE = (MSGS_IN_PARTITION / 2) + 1; //to poll fill data set in 2 iterations
+
+  final PollingCursorsStorage cursorsStorage = new PollingCursorsStorage();
+
+  @BeforeAll
+  static void setup() {
+    createTopic(new NewTopic(TOPIC, 1, (short) 1));
+    try (var producer = KafkaTestProducer.forKafka(kafka)) {
+      for (int i = 0; i < MSGS_IN_PARTITION; i++) {
+        producer.send(new ProducerRecord<>(TOPIC, "msg_" + i));
+      }
+    }
+  }
+
+  @AfterAll
+  static void cleanup() {
+    deleteTopic(TOPIC);
+  }
+
+  @Test
+  void backwardEmitter() {
+    var consumerPosition = new ConsumerPosition(PollingModeDTO.LATEST, TOPIC, List.of(), null, null);
+    var emitter = createBackwardEmitter(consumerPosition);
+    emitMessages(emitter, PAGE_SIZE);
+    var cursor = assertCursor(
+        PollingModeDTO.TO_OFFSET,
+        offsets -> assertThat(offsets)
+            .hasSize(1)
+            .containsEntry(new TopicPartition(TOPIC, 0), 9L)
+    );
+
+    // polling remaining records using registered cursor
+    emitter = createBackwardEmitterWithCursor(cursor);
+    emitMessages(emitter, MSGS_IN_PARTITION - PAGE_SIZE);
+    //checking no new cursors registered
+    assertThat(cursorsStorage.asMap()).hasSize(1).containsValue(cursor);
+  }
+
+  @Test
+  void forwardEmitter() {
+    var consumerPosition = new ConsumerPosition(PollingModeDTO.EARLIEST, TOPIC, List.of(), null, null);
+    var emitter = createForwardEmitter(consumerPosition);
+    emitMessages(emitter, PAGE_SIZE);
+    var cursor = assertCursor(
+        PollingModeDTO.FROM_OFFSET,
+        offsets -> assertThat(offsets)
+            .hasSize(1)
+            .containsEntry(new TopicPartition(TOPIC, 0), 11L)
+    );
+
+    //polling remaining records using registered cursor
+    emitter = createForwardEmitterWithCursor(cursor);
+    emitMessages(emitter, MSGS_IN_PARTITION - PAGE_SIZE);
+    //checking no new cursors registered
+    assertThat(cursorsStorage.asMap()).hasSize(1).containsValue(cursor);
+  }
+
+  private Cursor assertCursor(PollingModeDTO expectedMode,
+                              Consumer<Map<TopicPartition, Long>> offsetsAssert) {
+    Cursor registeredCursor = cursorsStorage.asMap().values().stream().findFirst().orElse(null);
+    assertThat(registeredCursor).isNotNull();
+    assertThat(registeredCursor.limit()).isEqualTo(PAGE_SIZE);
+    assertThat(registeredCursor.deserializer()).isNotNull();
+    assertThat(registeredCursor.filter()).isNotNull();
+
+    var cursorPosition = registeredCursor.consumerPosition();
+    assertThat(cursorPosition).isNotNull();
+    assertThat(cursorPosition.topic()).isEqualTo(TOPIC);
+    assertThat(cursorPosition.partitions()).isEqualTo(List.of());
+    assertThat(cursorPosition.pollingMode()).isEqualTo(expectedMode);
+
+    offsetsAssert.accept(cursorPosition.offsets().tpOffsets());
+    return registeredCursor;
+  }
+
+  private void emitMessages(AbstractEmitter emitter, int expectedCnt) {
+    StepVerifier.create(
+            Flux.create(emitter)
+                .filter(e -> e.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
+                .map(e -> e.getMessage().getContent())
+        )
+        .expectNextCount(expectedCnt)
+        .verifyComplete();
+  }
+
+  private BackwardEmitter createBackwardEmitter(ConsumerPosition position) {
+    return new BackwardEmitter(
+        this::createConsumer,
+        position,
+        PAGE_SIZE,
+        createRecordsDeserializer(),
+        m -> true,
+        PollingSettings.createDefault(),
+        createCursor(position)
+    );
+  }
+
+  private BackwardEmitter createBackwardEmitterWithCursor(Cursor cursor) {
+    return new BackwardEmitter(
+        this::createConsumer,
+        cursor.consumerPosition(),
+        cursor.limit(),
+        cursor.deserializer(),
+        cursor.filter(),
+        PollingSettings.createDefault(),
+        createCursor(cursor.consumerPosition())
+    );
+  }
+
+  private ForwardEmitter createForwardEmitterWithCursor(Cursor cursor) {
+    return new ForwardEmitter(
+        this::createConsumer,
+        cursor.consumerPosition(),
+        cursor.limit(),
+        cursor.deserializer(),
+        cursor.filter(),
+        PollingSettings.createDefault(),
+        createCursor(cursor.consumerPosition())
+    );
+  }
+
+  private ForwardEmitter createForwardEmitter(ConsumerPosition position) {
+    return new ForwardEmitter(
+        this::createConsumer,
+        position,
+        PAGE_SIZE,
+        createRecordsDeserializer(),
+        m -> true,
+        PollingSettings.createDefault(),
+        createCursor(position)
+    );
+  }
+
+  private Cursor.Tracking createCursor(ConsumerPosition position) {
+    return cursorsStorage.createNewCursor(createRecordsDeserializer(), position, m -> true, PAGE_SIZE);
+  }
+
+  private EnhancedConsumer createConsumer() {
+    Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, PAGE_SIZE - 1); // to check multiple polls
+    return new EnhancedConsumer(props, PollingThrottler.noop(), ApplicationMetrics.noop());
+  }
+
+  private static ConsumerRecordDeserializer createRecordsDeserializer() {
+    Serde s = new StringSerde();
+    s.configure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty(), PropertyResolverImpl.empty());
+    return new ConsumerRecordDeserializer(
+        StringSerde.name(),
+        s.deserializer(null, Serde.Target.KEY),
+        StringSerde.name(),
+        s.deserializer(null, Serde.Target.VALUE),
+        StringSerde.name(),
+        s.deserializer(null, Serde.Target.KEY),
+        s.deserializer(null, Serde.Target.VALUE),
+        msg -> msg
+    );
+  }
+
+}

+ 35 - 14
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/SeekOperationsTest.java

@@ -1,8 +1,13 @@
 package com.provectus.kafka.ui.emitter;
 
+import static com.provectus.kafka.ui.model.PollingModeDTO.EARLIEST;
+import static com.provectus.kafka.ui.model.PollingModeDTO.LATEST;
+import static com.provectus.kafka.ui.model.PollingModeDTO.TAILING;
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.PollingModeDTO;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -14,6 +19,8 @@ import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 
 class SeekOperationsTest {
 
@@ -40,13 +47,22 @@ class SeekOperationsTest {
   @Nested
   class GetOffsetsForSeek {
 
+    @Test
+    void tailing() {
+      var offsets = SeekOperations.getOffsetsForSeek(
+          consumer,
+          new OffsetsInfo(consumer, topic),
+          new ConsumerPosition(TAILING, topic, List.of(), null, null)
+      );
+      assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp0, 0L, tp1, 10L, tp2, 20L, tp3, 30L));
+    }
+
     @Test
     void latest() {
       var offsets = SeekOperations.getOffsetsForSeek(
           consumer,
           new OffsetsInfo(consumer, topic),
-          SeekTypeDTO.LATEST,
-          null
+          new ConsumerPosition(LATEST, topic, List.of(), null, null)
       );
       assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 30L));
     }
@@ -56,33 +72,38 @@ class SeekOperationsTest {
       var offsets = SeekOperations.getOffsetsForSeek(
           consumer,
           new OffsetsInfo(consumer, topic),
-          SeekTypeDTO.BEGINNING,
-          null
+          new ConsumerPosition(EARLIEST, topic, List.of(), null, null)
       );
       assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 0L, tp3, 25L));
     }
 
-    @Test
-    void offsets() {
+    @ParameterizedTest
+    @CsvSource({"TO_OFFSET", "FROM_OFFSET"})
+    void offsets(PollingModeDTO mode) {
       var offsets = SeekOperations.getOffsetsForSeek(
           consumer,
           new OffsetsInfo(consumer, topic),
-          SeekTypeDTO.OFFSET,
-          Map.of(tp1, 10L, tp2, 10L, tp3, 26L)
+          new ConsumerPosition(
+              mode, topic, List.of(tp1, tp2, tp3), null,
+              new ConsumerPosition.Offsets(null, Map.of(tp1, 10L, tp2, 10L, tp3, 26L))
+          )
       );
       assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 10L, tp3, 26L));
     }
 
-    @Test
-    void offsetsWithBoundsFixing() {
+    @ParameterizedTest
+    @CsvSource({"TO_OFFSET", "FROM_OFFSET"})
+    void offsetsWithBoundsFixing(PollingModeDTO mode) {
       var offsets = SeekOperations.getOffsetsForSeek(
           consumer,
           new OffsetsInfo(consumer, topic),
-          SeekTypeDTO.OFFSET,
-          Map.of(tp1, 10L, tp2, 21L, tp3, 24L)
+          new ConsumerPosition(
+              mode, topic, List.of(tp1, tp2, tp3), null,
+              new ConsumerPosition.Offsets(null, Map.of(tp1, 10L, tp2, 21L, tp3, 24L))
+          )
       );
       assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 25L));
     }
   }
 
-}
+}

+ 6 - 8
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java

@@ -4,10 +4,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.model.ConsumerPosition;
-import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
-import com.provectus.kafka.ui.model.SeekDirectionDTO;
-import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.serdes.builtin.StringSerde;
 import com.provectus.kafka.ui.service.ClustersStorage;
 import com.provectus.kafka.ui.service.MessagesService;
 import java.time.Duration;
@@ -111,13 +110,12 @@ class TailingEmitterTest extends AbstractIntegrationTest {
 
     return applicationContext.getBean(MessagesService.class)
         .loadMessages(cluster, topicName,
-            new ConsumerPosition(SeekTypeDTO.LATEST, topic, null),
+            new ConsumerPosition(PollingModeDTO.TAILING, topic, List.of(), null, null),
             query,
-            MessageFilterTypeDTO.STRING_CONTAINS,
+            null,
             0,
-            SeekDirectionDTO.TAILING,
-            "String",
-            "String");
+            StringSerde.name(),
+            StringSerde.name());
   }
 
   private List<TopicMessageEventDTO> startTailing(String filterQuery) {

+ 90 - 24
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java

@@ -8,19 +8,24 @@ import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.SeekDirectionDTO;
-import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.springframework.beans.factory.annotation.Autowired;
 import reactor.core.publisher.Flux;
 import reactor.test.StepVerifier;
@@ -35,6 +40,8 @@ class MessagesServiceTest extends AbstractIntegrationTest {
 
   KafkaCluster cluster;
 
+  Set<String> createdTopics = new HashSet<>();
+
   @BeforeEach
   void init() {
     cluster = applicationContext
@@ -43,6 +50,11 @@ class MessagesServiceTest extends AbstractIntegrationTest {
         .get();
   }
 
+  @AfterEach
+  void deleteCreatedTopics() {
+    createdTopics.forEach(MessagesServiceTest::deleteTopic);
+  }
+
   @Test
   void deleteTopicMessagesReturnsExceptionWhenTopicNotFound() {
     StepVerifier.create(messagesService.deleteTopicMessages(cluster, NON_EXISTING_TOPIC, List.of()))
@@ -60,7 +72,9 @@ class MessagesServiceTest extends AbstractIntegrationTest {
   @Test
   void loadMessagesReturnsExceptionWhenTopicNotFound() {
     StepVerifier.create(messagesService
-            .loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1, null, "String", "String"))
+            .loadMessages(cluster, NON_EXISTING_TOPIC,
+                new ConsumerPosition(PollingModeDTO.TAILING, NON_EXISTING_TOPIC, List.of(), null, null),
+                null, null, 1, "String", "String"))
         .expectError(TopicNotFoundException.class)
         .verify();
   }
@@ -68,32 +82,84 @@ class MessagesServiceTest extends AbstractIntegrationTest {
   @Test
   void maskingAppliedOnConfiguredClusters() throws Exception {
     String testTopic = MASKED_TOPICS_PREFIX + UUID.randomUUID();
+    createTopicWithCleanup(new NewTopic(testTopic, 1, (short) 1));
+
     try (var producer = KafkaTestProducer.forKafka(kafka)) {
-      createTopic(new NewTopic(testTopic, 1, (short) 1));
       producer.send(testTopic, "message1");
       producer.send(testTopic, "message2").get();
+    }
+
+    Flux<TopicMessageDTO> msgsFlux = messagesService.loadMessages(
+            cluster,
+            testTopic,
+            new ConsumerPosition(PollingModeDTO.EARLIEST, testTopic, List.of(), null, null),
+            null,
+            null,
+            100,
+            StringSerde.name(),
+            StringSerde.name()
+        ).filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
+        .map(TopicMessageEventDTO::getMessage);
+
+    // both messages should be masked
+    StepVerifier.create(msgsFlux)
+        .expectNextMatches(msg -> msg.getContent().equals("***"))
+        .expectNextMatches(msg -> msg.getContent().equals("***"))
+        .verifyComplete();
+  }
 
-      Flux<TopicMessageDTO> msgsFlux = messagesService.loadMessages(
-          cluster,
-          testTopic,
-          new ConsumerPosition(SeekTypeDTO.BEGINNING, testTopic, null),
-          null,
-          null,
-          100,
-          SeekDirectionDTO.FORWARD,
-          StringSerde.name(),
-          StringSerde.name()
-      ).filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
-          .map(TopicMessageEventDTO::getMessage);
-
-      // both messages should be masked
-      StepVerifier.create(msgsFlux)
-          .expectNextMatches(msg -> msg.getContent().equals("***"))
-          .expectNextMatches(msg -> msg.getContent().equals("***"))
-          .verifyComplete();
-    } finally {
-      deleteTopic(testTopic);
+  @ParameterizedTest
+  @CsvSource({"EARLIEST", "LATEST"})
+  void cursorIsRegisteredAfterPollingIsDoneAndCanBeUsedForNextPagePolling(PollingModeDTO mode) {
+    String testTopic = MessagesServiceTest.class.getSimpleName() + UUID.randomUUID();
+    createTopicWithCleanup(new NewTopic(testTopic, 5, (short) 1));
+
+    int msgsToGenerate = 100;
+    int pageSize = (msgsToGenerate / 2) + 1;
+
+    try (var producer = KafkaTestProducer.forKafka(kafka)) {
+      for (int i = 0; i < msgsToGenerate; i++) {
+        producer.send(testTopic, "message_" + i);
+      }
     }
+
+    var cursorIdCatcher = new AtomicReference<String>();
+    Flux<String> msgsFlux = messagesService.loadMessages(
+            cluster, testTopic,
+            new ConsumerPosition(mode, testTopic, List.of(), null, null),
+            null, null, pageSize, StringSerde.name(), StringSerde.name())
+        .doOnNext(evt -> {
+          if (evt.getType() == TopicMessageEventDTO.TypeEnum.DONE) {
+            assertThat(evt.getCursor()).isNotNull();
+            cursorIdCatcher.set(evt.getCursor().getId());
+          }
+        })
+        .filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
+        .map(evt -> evt.getMessage().getContent());
+
+    StepVerifier.create(msgsFlux)
+        .expectNextCount(pageSize)
+        .verifyComplete();
+
+    assertThat(cursorIdCatcher.get()).isNotNull();
+
+    Flux<String> remainingMsgs = messagesService.loadMessages(cluster, testTopic, cursorIdCatcher.get())
+        .doOnNext(evt -> {
+          if (evt.getType() == TopicMessageEventDTO.TypeEnum.DONE) {
+            assertThat(evt.getCursor()).isNull();
+          }
+        })
+        .filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
+        .map(evt -> evt.getMessage().getContent());
+
+    StepVerifier.create(remainingMsgs)
+        .expectNextCount(msgsToGenerate - pageSize)
+        .verifyComplete();
+  }
+
+  private void createTopicWithCleanup(NewTopic newTopic) {
+    createTopic(newTopic);
+    createdTopics.add(newTopic.name());
   }
 
   @Test

+ 63 - 50
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -1,13 +1,16 @@
 package com.provectus.kafka.ui.service;
 
-import static com.provectus.kafka.ui.model.SeekTypeDTO.BEGINNING;
-import static com.provectus.kafka.ui.model.SeekTypeDTO.LATEST;
-import static com.provectus.kafka.ui.model.SeekTypeDTO.OFFSET;
-import static com.provectus.kafka.ui.model.SeekTypeDTO.TIMESTAMP;
+import static com.provectus.kafka.ui.model.PollingModeDTO.EARLIEST;
+import static com.provectus.kafka.ui.model.PollingModeDTO.FROM_OFFSET;
+import static com.provectus.kafka.ui.model.PollingModeDTO.FROM_TIMESTAMP;
+import static com.provectus.kafka.ui.model.PollingModeDTO.LATEST;
+import static com.provectus.kafka.ui.model.PollingModeDTO.TO_OFFSET;
+import static com.provectus.kafka.ui.model.PollingModeDTO.TO_TIMESTAMP;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.emitter.BackwardEmitter;
+import com.provectus.kafka.ui.emitter.Cursor;
 import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.emitter.ForwardEmitter;
 import com.provectus.kafka.ui.emitter.PollingSettings;
@@ -43,6 +46,7 @@ import org.apache.kafka.common.header.internals.RecordHeader;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
 import reactor.test.StepVerifier;
@@ -57,16 +61,18 @@ class RecordEmitterTest extends AbstractIntegrationTest {
   static final String EMPTY_TOPIC = TOPIC + "_empty";
   static final List<Record> SENT_RECORDS = new ArrayList<>();
   static final ConsumerRecordDeserializer RECORD_DESERIALIZER = createRecordsDeserializer();
+  static final Cursor.Tracking CURSOR_MOCK = Mockito.mock(Cursor.Tracking.class);
   static final Predicate<TopicMessageDTO> NOOP_FILTER = m -> true;
 
   @BeforeAll
   static void generateMsgs() throws Exception {
     createTopic(new NewTopic(TOPIC, PARTITIONS, (short) 1));
     createTopic(new NewTopic(EMPTY_TOPIC, PARTITIONS, (short) 1));
+    long startTs = System.currentTimeMillis();
     try (var producer = KafkaTestProducer.forKafka(kafka)) {
       for (int partition = 0; partition < PARTITIONS; partition++) {
         for (int i = 0; i < MSGS_PER_PARTITION; i++) {
-          long ts = System.currentTimeMillis() + i;
+          long ts = (startTs += 100);
           var value = "msg_" + partition + "_" + i;
           var metadata = producer.send(
               new ProducerRecord<>(
@@ -115,20 +121,22 @@ class RecordEmitterTest extends AbstractIntegrationTest {
   void pollNothingOnEmptyTopic() {
     var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
-        new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
+        new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null),
         100,
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
-        new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
+        new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null),
         100,
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     StepVerifier.create(Flux.create(forwardEmitter))
@@ -148,20 +156,22 @@ class RecordEmitterTest extends AbstractIntegrationTest {
   void pollFullTopicFromBeginning() {
     var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
-        new ConsumerPosition(BEGINNING, TOPIC, null),
+        new ConsumerPosition(EARLIEST, TOPIC, List.of(), null, null),
         PARTITIONS * MSGS_PER_PARTITION,
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
-        new ConsumerPosition(LATEST, TOPIC, null),
+        new ConsumerPosition(LATEST, TOPIC, List.of(), null, null),
         PARTITIONS * MSGS_PER_PARTITION,
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     List<String> expectedValues = SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList());
@@ -180,20 +190,24 @@ class RecordEmitterTest extends AbstractIntegrationTest {
 
     var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
-        new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
+        new ConsumerPosition(FROM_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
+            new ConsumerPosition.Offsets(null, targetOffsets)),
         PARTITIONS * MSGS_PER_PARTITION,
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
-        new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
+        new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
+            new ConsumerPosition.Offsets(null, targetOffsets)),
         PARTITIONS * MSGS_PER_PARTITION,
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     var expectedValues = SENT_RECORDS.stream()
@@ -213,50 +227,45 @@ class RecordEmitterTest extends AbstractIntegrationTest {
 
   @Test
   void pollWithTimestamps() {
-    Map<TopicPartition, Long> targetTimestamps = new HashMap<>();
-    final Map<TopicPartition, List<Record>> perPartition =
-        SENT_RECORDS.stream().collect(Collectors.groupingBy((r) -> r.tp));
-    for (int i = 0; i < PARTITIONS; i++) {
-      final List<Record> records = perPartition.get(new TopicPartition(TOPIC, i));
-      int randRecordIdx = ThreadLocalRandom.current().nextInt(records.size());
-      log.info("partition: {} position: {}", i, randRecordIdx);
-      targetTimestamps.put(
-          new TopicPartition(TOPIC, i),
-          records.get(randRecordIdx).getTimestamp()
-      );
-    }
+    var tsStats = SENT_RECORDS.stream().mapToLong(Record::getTimestamp).summaryStatistics();
+    //choosing ts in the middle
+    long targetTimestamp = tsStats.getMin() + ((tsStats.getMax() - tsStats.getMin()) / 2);
 
     var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
-        new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
+        new ConsumerPosition(FROM_TIMESTAMP, TOPIC, List.of(), targetTimestamp, null),
         PARTITIONS * MSGS_PER_PARTITION,
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
+    );
+
+    expectEmitter(
+        forwardEmitter,
+        SENT_RECORDS.stream()
+            .filter(r -> r.getTimestamp() >= targetTimestamp)
+            .map(Record::getValue)
+            .collect(Collectors.toList())
     );
 
     var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
-        new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
+        new ConsumerPosition(TO_TIMESTAMP, TOPIC, List.of(), targetTimestamp, null),
         PARTITIONS * MSGS_PER_PARTITION,
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
-    var expectedValues = SENT_RECORDS.stream()
-        .filter(r -> r.getTimestamp() >= targetTimestamps.get(r.getTp()))
-        .map(Record::getValue)
-        .collect(Collectors.toList());
-
-    expectEmitter(forwardEmitter, expectedValues);
-
-    expectedValues = SENT_RECORDS.stream()
-        .filter(r -> r.getTimestamp() < targetTimestamps.get(r.getTp()))
-        .map(Record::getValue)
-        .collect(Collectors.toList());
-
-    expectEmitter(backwardEmitter, expectedValues);
+    expectEmitter(
+        backwardEmitter,
+        SENT_RECORDS.stream()
+            .filter(r -> r.getTimestamp() < targetTimestamp)
+            .map(Record::getValue)
+            .collect(Collectors.toList())
+    );
   }
 
   @Test
@@ -269,11 +278,13 @@ class RecordEmitterTest extends AbstractIntegrationTest {
 
     var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
-        new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
+        new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
+            new ConsumerPosition.Offsets(null, targetOffsets)),
         numMessages,
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     var expectedValues = SENT_RECORDS.stream()
@@ -296,11 +307,13 @@ class RecordEmitterTest extends AbstractIntegrationTest {
 
     var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
-        new ConsumerPosition(OFFSET, TOPIC, offsets),
+        new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(offsets.keySet()), null,
+            new ConsumerPosition.Offsets(null, offsets)),
         100,
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     expectEmitter(backwardEmitter,

+ 3 - 8
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java

@@ -7,8 +7,7 @@ import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.SeekDirectionDTO;
-import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
@@ -20,6 +19,7 @@ import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.time.Duration;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
@@ -500,15 +500,10 @@ public class SendAndReadTests extends AbstractIntegrationTest {
         TopicMessageDTO polled = messagesService.loadMessages(
                 targetCluster,
                 topic,
-                new ConsumerPosition(
-                    SeekTypeDTO.BEGINNING,
-                    topic,
-                    Map.of(new TopicPartition(topic, 0), 0L)
-                ),
+                new ConsumerPosition(PollingModeDTO.EARLIEST, topic, List.of(), null, null),
                 null,
                 null,
                 1,
-                SeekDirectionDTO.FORWARD,
                 msgToSend.getKeySerde().get(),
                 msgToSend.getValueSerde().get()
             ).filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))

+ 143 - 1
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -763,6 +763,119 @@ paths:
         404:
           description: Not found
 
+  /api/clusters/{clusterName}/topics/{topicName}/smartfilters:
+    post:
+      tags:
+        - Messages
+      summary: registerFilter
+      operationId: registerFilter
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: topicName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/MessageFilterRegistration'
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/MessageFilterId'
+
+
+  /api/clusters/{clusterName}/topics/{topicName}/messages/v2:
+    get:
+      tags:
+        - Messages
+      summary: getTopicMessagesV2
+      operationId: getTopicMessagesV2
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: topicName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: mode
+          in: query
+          description: Messages polling mode
+          required: true
+          schema:
+            $ref: "#/components/schemas/PollingMode"
+        - name: partitions
+          in: query
+          schema:
+            type: array
+            description: List of target partitions (all partitions if not provided)
+            items:
+              type: integer
+        - name: limit
+          in: query
+          description: Max number of messages can be returned
+          schema:
+            type: integer
+        - name: stringFilter
+          in: query
+          description: query string to contains string filtration
+          schema:
+            type: string
+        - name: smartFilterId
+          in: query
+          description: filter id, that was registered beforehand
+          schema:
+            type: string
+        - name: offset
+          in: query
+          description: message offset to read from / to
+          schema:
+            type: integer
+            format: int64
+        - name: timestamp
+          in: query
+          description: timestamp (in ms) to read from / to
+          schema:
+            type: integer
+            format: int64
+        - name: keySerde
+          in: query
+          description: "Serde that should be used for deserialization. Will be chosen automatically if not set."
+          schema:
+            type: string
+        - name: valueSerde
+          in: query
+          description: "Serde that should be used for deserialization. Will be chosen automatically if not set."
+          schema:
+            type: string
+        - name: cursor
+          in: query
+          description: "id of the cursor for pagination"
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            text/event-stream:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/TopicMessageEvent'
+
   /api/clusters/{clusterName}/topics/{topicName}/consumer-groups:
     get:
       tags:
@@ -2731,13 +2844,14 @@ components:
             - MESSAGE
             - CONSUMING
             - DONE
-            - EMIT_THROTTLING
         message:
           $ref: "#/components/schemas/TopicMessage"
         phase:
           $ref: "#/components/schemas/TopicMessagePhase"
         consuming:
           $ref: "#/components/schemas/TopicMessageConsuming"
+        cursor:
+          $ref: "#/components/schemas/TopicMessageNextPageCursor"
 
     TopicMessagePhase:
       type: object
@@ -2767,6 +2881,11 @@ components:
         filterApplyErrors:
           type: integer
 
+    TopicMessageNextPageCursor:
+      type: object
+      properties:
+        id:
+          type: string
 
     TopicMessage:
       type: object
@@ -2839,6 +2958,29 @@ components:
         - TIMESTAMP
         - LATEST
 
+    MessageFilterRegistration:
+      type: object
+      properties:
+        filterCode:
+          type: string
+
+    MessageFilterId:
+      type: object
+      properties:
+        id:
+          type: string
+
+    PollingMode:
+      type: string
+      enum:
+        - FROM_OFFSET
+        - TO_OFFSET
+        - FROM_TIMESTAMP
+        - TO_TIMESTAMP
+        - LATEST
+        - EARLIEST
+        - TAILING
+
     MessageFilterType:
       type: string
       enum: