소스 검색

api updates

iliax 2 년 전
부모
커밋
ebae773043
17개의 변경된 파일465개의 추가작업 그리고 354개의 파일을 삭제
  1. 40 38
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
  2. 17 31
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java
  3. 16 8
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java
  4. 31 17
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java
  5. 69 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/Cursor.java
  6. 18 12
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java
  7. 35 65
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java
  8. 10 18
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java
  9. 6 31
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java
  10. 7 11
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/ConsumerRecordDeserializer.java
  11. 131 85
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
  12. 25 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/PollingCursorsStorage.java
  13. 1 1
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java
  14. 2 2
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java
  15. 38 20
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java
  16. 1 1
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java
  17. 18 14
      kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

+ 40 - 38
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -25,8 +25,8 @@ import com.provectus.kafka.ui.service.MessagesService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.List;
 import java.util.Optional;
-import javax.annotation.Nullable;
 import javax.validation.Valid;
+import javax.validation.ValidationException;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.ResponseEntity;
@@ -41,9 +41,6 @@ import reactor.core.scheduler.Schedulers;
 @Slf4j
 public class MessagesController extends AbstractController implements MessagesApi {
 
-  private static final int MAX_LOAD_RECORD_LIMIT = 100;
-  private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;
-
   private final MessagesService messagesService;
   private final DeserializationService deserializationService;
   private final AccessControlService accessControlService;
@@ -81,7 +78,45 @@ public class MessagesController extends AbstractController implements MessagesAp
                                                                            String keySerde,
                                                                            String valueSerde,
                                                                            ServerWebExchange exchange) {
-    throw new IllegalStateException();
+    throw new ValidationException("Not supported");
+  }
+
+
+  @Override
+  public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
+                                                                             PollingModeDTO mode,
+                                                                             List<Integer> partitions,
+                                                                             Integer limit,
+                                                                             String stringFilter,
+                                                                             String smartFilterId,
+                                                                             Long offset,
+                                                                             Long timestamp,
+                                                                             String keySerde,
+                                                                             String valueSerde,
+                                                                             String cursor,
+                                                                             ServerWebExchange exchange) {
+    final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+        .cluster(clusterName)
+        .topic(topicName)
+        .topicActions(MESSAGES_READ)
+        .build());
+
+    Flux<TopicMessageEventDTO> messagesFlux;
+    if (cursor != null) {
+      messagesFlux = messagesService.loadMessages(getCluster(clusterName), topicName, cursor);
+    } else {
+      messagesFlux = messagesService.loadMessages(
+          getCluster(clusterName),
+          topicName,
+          ConsumerPosition.create(mode, topicName, partitions, timestamp, offset),
+          stringFilter,
+          smartFilterId,
+          limit,
+          keySerde,
+          valueSerde
+      );
+    }
+    return validateAccess.then(Mono.just(ResponseEntity.ok(messagesFlux)));
   }
 
   @Override
@@ -129,39 +164,6 @@ public class MessagesController extends AbstractController implements MessagesAp
     );
   }
 
-
-  @Override
-  public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
-                                                                             PollingModeDTO mode,
-                                                                             @Nullable List<Integer> partitions,
-                                                                             @Nullable Integer limit,
-                                                                             @Nullable String query,
-                                                                             @Nullable String filterId,
-                                                                             @Nullable String offsetString,
-                                                                             @Nullable Long ts,
-                                                                             @Nullable String keySerde,
-                                                                             @Nullable String valueSerde,
-                                                                             ServerWebExchange exchange) {
-    final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
-        .cluster(clusterName)
-        .topic(topicName)
-        .topicActions(MESSAGES_READ)
-        .build());
-
-    ConsumerPosition consumerPosition = ConsumerPosition.create(mode, topicName, partitions, ts, offsetString);
-
-    int recordsLimit =
-        Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
-
-    return validateAccess.then(
-        Mono.just(
-            ResponseEntity.ok(
-                messagesService.loadMessagesV2(
-                    getCluster(clusterName), topicName, consumerPosition,
-                    query, filterId, recordsLimit, keySerde, valueSerde))));
-  }
-
-
   @Override
   public Mono<ResponseEntity<MessageFilterIdDTO>> registerFilter(String clusterName,
                                                                  String topicName,

+ 17 - 31
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -1,11 +1,9 @@
 package com.provectus.kafka.ui.emitter;
 
-import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
-import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.time.Duration;
 import java.time.Instant;
+import javax.annotation.Nullable;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -14,13 +12,12 @@ import reactor.core.publisher.FluxSink;
 
 public abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private final ConsumerRecordDeserializer recordDeserializer;
-  private final ConsumingStats consumingStats = new ConsumingStats();
+  private final MessagesProcessing messagesProcessing;
   private final PollingThrottler throttler;
   protected final PollingSettings pollingSettings;
 
-  protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer, PollingSettings pollingSettings) {
-    this.recordDeserializer = recordDeserializer;
+  protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
+    this.messagesProcessing = messagesProcessing;
     this.pollingSettings = pollingSettings;
     this.throttler = pollingSettings.getPollingThrottler();
   }
@@ -40,39 +37,28 @@ public abstract class AbstractEmitter implements java.util.function.Consumer<Flu
     return records;
   }
 
+  protected boolean isSendLimitReached() {
+    return messagesProcessing.limitReached();
+  }
+
   protected void sendMessage(FluxSink<TopicMessageEventDTO> sink,
-                                                       ConsumerRecord<Bytes, Bytes> msg) {
-    final TopicMessageDTO topicMessage = recordDeserializer.deserialize(msg);
-    sink.next(
-        new TopicMessageEventDTO()
-            .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
-            .message(topicMessage)
-    );
+                             ConsumerRecord<Bytes, Bytes> msg) {
+    messagesProcessing.sendMsg(sink, msg);
   }
 
   protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
-    sink.next(
-        new TopicMessageEventDTO()
-            .type(TopicMessageEventDTO.TypeEnum.PHASE)
-            .phase(new TopicMessagePhaseDTO().name(name))
-    );
+    messagesProcessing.sendPhase(sink, name);
   }
 
   protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
-                               ConsumerRecords<Bytes, Bytes> records,
-                               long elapsed) {
-    return consumingStats.sendConsumingEvt(sink, records, elapsed, getFilterApplyErrors(sink));
+                              ConsumerRecords<Bytes, Bytes> records,
+                              long elapsed) {
+    return messagesProcessing.sentConsumingInfo(sink, records, elapsed);
   }
 
-  protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
-    consumingStats.sendFinishEvent(sink, getFilterApplyErrors(sink));
+  // cursor is null if target partitions were fully polled (no, need to do paging)
+  protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
+    messagesProcessing.sendFinishEvents(sink, cursor);
     sink.complete();
   }
-
-  protected Number getFilterApplyErrors(FluxSink<?> sink) {
-    return sink.contextView()
-        .<MessageFilterStats>getOrEmpty(MessageFilterStats.class)
-        .<Number>map(MessageFilterStats::getFilterApplyErrors)
-        .orElse(0);
-  }
 }

+ 16 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -24,17 +23,20 @@ public class BackwardRecordEmitter extends AbstractEmitter {
   private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
   private final ConsumerPosition consumerPosition;
   private final int messagesPerPage;
+  private final Cursor.Tracking cursor;
 
   public BackwardRecordEmitter(
       Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
       ConsumerPosition consumerPosition,
       int messagesPerPage,
-      ConsumerRecordDeserializer recordDeserializer,
-      PollingSettings pollingSettings) {
-    super(recordDeserializer, pollingSettings);
+      MessagesProcessing messagesProcessing,
+      PollingSettings pollingSettings,
+      Cursor.Tracking cursor) {
+    super(messagesProcessing, pollingSettings);
     this.consumerPosition = consumerPosition;
     this.messagesPerPage = messagesPerPage;
     this.consumerSupplier = consumerSupplier;
+    this.cursor = cursor;
   }
 
   @Override
@@ -46,11 +48,12 @@ public class BackwardRecordEmitter extends AbstractEmitter {
       var seekOperations = SeekOperations.create(consumer, consumerPosition);
       var readUntilOffsets = new TreeMap<TopicPartition, Long>(Comparator.comparingInt(TopicPartition::partition));
       readUntilOffsets.putAll(seekOperations.getOffsetsForSeek());
+      cursor.trackOffsets(readUntilOffsets);
 
       int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readUntilOffsets.size());
       log.debug("'Until' offsets for polling: {}", readUntilOffsets);
 
-      while (!sink.isCancelled() && !readUntilOffsets.isEmpty()) {
+      while (!sink.isCancelled() && !readUntilOffsets.isEmpty() && !isSendLimitReached()) {
         new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> {
           if (sink.isCancelled()) {
             return; //fast return in case of sink cancellation
@@ -59,8 +62,6 @@ public class BackwardRecordEmitter extends AbstractEmitter {
           long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition);
 
           partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink)
-              .stream()
-              .filter(r -> !sink.isCancelled())
               .forEach(r -> sendMessage(sink, r));
 
           if (beginOffset == readFromOffset) {
@@ -77,7 +78,12 @@ public class BackwardRecordEmitter extends AbstractEmitter {
           log.debug("sink is cancelled after partitions poll iteration");
         }
       }
-      sendFinishStatsAndCompleteSink(sink);
+      sendFinishStatsAndCompleteSink(
+          sink,
+          readUntilOffsets.isEmpty()
+              ? null
+              : cursor
+      );
       log.debug("Polling finished");
     } catch (InterruptException kafkaInterruptException) {
       log.debug("Polling finished due to thread interruption");
@@ -97,6 +103,7 @@ public class BackwardRecordEmitter extends AbstractEmitter {
   ) {
     consumer.assign(Collections.singleton(tp));
     consumer.seek(tp, fromOffset);
+    cursor.trackOffset(tp, fromOffset);
     sendPhase(sink, String.format("Polling partition: %s from offset %s", tp, fromOffset));
     int desiredMsgsToPoll = (int) (toOffset - fromOffset);
 
@@ -104,6 +111,7 @@ public class BackwardRecordEmitter extends AbstractEmitter {
 
     EmptyPollsCounter emptyPolls  = pollingSettings.createEmptyPollsCounter();
     while (!sink.isCancelled()
+        && !isSendLimitReached()
         && recordsToSend.size() < desiredMsgsToPoll
         && !emptyPolls.noDataEmptyPollsReached()) {
       var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());

+ 31 - 17
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java

@@ -2,6 +2,11 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.model.TopicMessageNextPageCursorDTO;
+import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
+import javax.annotation.Nullable;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 class ConsumingStats {
@@ -9,37 +14,46 @@ class ConsumingStats {
   private long bytes = 0;
   private int records = 0;
   private long elapsed = 0;
-  private int filterApplyErrors = 0;
 
-  void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
-    bytes += polledRecords.bytes();
-    records += polledRecords.count();
-    elapsed += polledRecords.elapsed().toMillis();
+  /**
+   * returns bytes polled.
+   */
+  int sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
+                       ConsumerRecords<Bytes, Bytes> polledRecords,
+                       long elapsed,
+                       int filterApplyErrors) {
+    int polledBytes = ConsumerRecordsUtil.calculatePolledSize(polledRecords);
+    bytes += polledBytes;
+    this.records += polledRecords.count();
+    this.elapsed += elapsed;
     sink.next(
         new TopicMessageEventDTO()
             .type(TopicMessageEventDTO.TypeEnum.CONSUMING)
-            .consuming(createConsumingStats())
+            .consuming(createConsumingStats(sink, filterApplyErrors))
     );
+    return polledBytes;
   }
 
-  void incFilterApplyError() {
-    filterApplyErrors++;
-  }
-
-  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
+  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors, @Nullable Cursor.Tracking cursor) {
     sink.next(
         new TopicMessageEventDTO()
             .type(TopicMessageEventDTO.TypeEnum.DONE)
-            .consuming(createConsumingStats())
+            .cursor(
+                cursor != null
+                    ? new TopicMessageNextPageCursorDTO().id(cursor.registerCursor())
+                    : null
+            )
+            .consuming(createConsumingStats(sink, filterApplyErrors))
     );
   }
 
-  private TopicMessageConsumingDTO createConsumingStats() {
+  private TopicMessageConsumingDTO createConsumingStats(FluxSink<TopicMessageEventDTO> sink,
+                                                        int filterApplyErrors) {
     return new TopicMessageConsumingDTO()
-        .bytesConsumed(bytes)
-        .elapsedMs(elapsed)
-        .isCancelled(false)
+        .bytesConsumed(this.bytes)
+        .elapsedMs(this.elapsed)
+        .isCancelled(sink.isCancelled())
         .filterApplyErrors(filterApplyErrors)
-        .messagesConsumed(records);
+        .messagesConsumed(this.records);
   }
 }

+ 69 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/Cursor.java

@@ -0,0 +1,69 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.PollingModeDTO;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.kafka.common.TopicPartition;
+
+public record Cursor(ConsumerRecordDeserializer deserializer,
+                     ConsumerPosition consumerPosition,
+                     Predicate<TopicMessageDTO> filter,
+                     int limit) {
+
+  public static class Tracking {
+    private final ConsumerRecordDeserializer deserializer;
+    private final ConsumerPosition originalPosition;
+    private final Predicate<TopicMessageDTO> filter;
+    private final int limit;
+    private final Function<Cursor, String> cursorRegistry;
+
+    private final Map<TopicPartition, Long> trackingOffsets = new HashMap<>();
+
+    public Tracking(ConsumerRecordDeserializer deserializer,
+                    ConsumerPosition originalPosition,
+                    Predicate<TopicMessageDTO> filter,
+                    int limit,
+                    Function<Cursor, String> cursorRegistry) {
+      this.deserializer = deserializer;
+      this.originalPosition = originalPosition;
+      this.filter = filter;
+      this.limit = limit;
+      this.cursorRegistry = cursorRegistry;
+    }
+
+    void trackOffset(TopicPartition tp, long offset) {
+      trackingOffsets.put(tp, offset);
+    }
+
+    void trackOffsets(Map<TopicPartition, Long> offsets) {
+      this.trackingOffsets.putAll(offsets);
+    }
+
+    String registerCursor() {
+      return cursorRegistry.apply(
+          new Cursor(
+              deserializer,
+              new ConsumerPosition(
+                  switch (originalPosition.pollingMode()) {
+                    case TO_OFFSET, TO_TIMESTAMP, LATEST -> PollingModeDTO.TO_OFFSET;
+                    case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> PollingModeDTO.FROM_OFFSET;
+                    case TAILING -> throw new IllegalStateException();
+                  },
+                  originalPosition.topic(),
+                  originalPosition.partitions(),
+                  null,
+                  new ConsumerPosition.Offsets(null, trackingOffsets)
+              ),
+              filter,
+              limit
+          )
+      );
+    }
+  }
+
+}

+ 18 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -2,9 +2,9 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -13,20 +13,22 @@ import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
-public class ForwardRecordEmitter
-    extends AbstractEmitter {
+public class ForwardRecordEmitter extends AbstractEmitter {
 
   private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
   private final ConsumerPosition position;
+  private final Cursor.Tracking cursor;
 
   public ForwardRecordEmitter(
       Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
       ConsumerPosition position,
-      ConsumerRecordDeserializer recordDeserializer,
-      PollingSettings pollingSettings) {
-    super(recordDeserializer, pollingSettings);
+      MessagesProcessing messagesProcessing,
+      PollingSettings pollingSettings,
+      Cursor.Tracking cursor) {
+    super(messagesProcessing, pollingSettings);
     this.position = position;
     this.consumerSupplier = consumerSupplier;
+    this.cursor = cursor;
   }
 
   @Override
@@ -36,27 +38,26 @@ public class ForwardRecordEmitter
       sendPhase(sink, "Assigning partitions");
       var seekOperations = SeekOperations.create(consumer, position);
       seekOperations.assignAndSeek();
+      cursor.trackOffsets(seekOperations.getOffsetsForSeek());
 
       EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
       while (!sink.isCancelled()
+          && !isSendLimitReached()
           && !seekOperations.assignedPartitionsFullyPolled()
           && !emptyPolls.noDataEmptyPollsReached()) {
 
         sendPhase(sink, "Polling");
         ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
         emptyPolls.count(records);
+        trackOffsetsAfterPoll(consumer);
 
         log.debug("{} records polled", records.count());
 
         for (ConsumerRecord<Bytes, Bytes> msg : records) {
-          if (!sink.isCancelled()) {
-            sendMessage(sink, msg);
-          } else {
-            break;
-          }
+          sendMessage(sink, msg);
         }
       }
-      sendFinishStatsAndCompleteSink(sink);
+      sendFinishStatsAndCompleteSink(sink, seekOperations.assignedPartitionsFullyPolled() ? null : cursor);
       log.debug("Polling finished");
     } catch (InterruptException kafkaInterruptException) {
       log.debug("Polling finished due to thread interruption");
@@ -66,4 +67,9 @@ public class ForwardRecordEmitter
       sink.error(e);
     }
   }
+
+  private void trackOffsetsAfterPoll(Consumer<Bytes, Bytes> consumer) {
+    consumer.assignment().forEach(tp -> cursor.trackOffset(tp, consumer.position(tp)));
+  }
+
 }

+ 35 - 65
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java

@@ -1,75 +1,71 @@
 package com.provectus.kafka.ui.emitter;
 
-import static java.util.stream.Collectors.collectingAndThen;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.toList;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Streams;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 import java.util.function.Predicate;
 import javax.annotation.Nullable;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
-@RequiredArgsConstructor
-class MessagesProcessing {
+public class MessagesProcessing {
 
   private final ConsumingStats consumingStats = new ConsumingStats();
   private long sentMessages = 0;
+  private int filterApplyErrors = 0;
 
   private final ConsumerRecordDeserializer deserializer;
   private final Predicate<TopicMessageDTO> filter;
-  private final boolean ascendingSortBeforeSend;
   private final @Nullable Integer limit;
 
+  public MessagesProcessing(ConsumerRecordDeserializer deserializer,
+                            Predicate<TopicMessageDTO> filter,
+                            @Nullable Integer limit) {
+    this.deserializer = deserializer;
+    this.filter = filter;
+    this.limit = limit;
+  }
+
   boolean limitReached() {
     return limit != null && sentMessages >= limit;
   }
 
-  void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> polled) {
-    sortForSending(polled, ascendingSortBeforeSend)
-        .forEach(rec -> {
-          if (!limitReached() && !sink.isCancelled()) {
-            TopicMessageDTO topicMessage = deserializer.deserialize(rec);
-            try {
-              if (filter.test(topicMessage)) {
-                sink.next(
-                    new TopicMessageEventDTO()
-                        .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
-                        .message(topicMessage)
-                );
-                sentMessages++;
-              }
-            } catch (Exception e) {
-              consumingStats.incFilterApplyError();
-              log.trace("Error applying filter for message {}", topicMessage);
-            }
-          }
-        });
+  void sendMsg(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
+    if (!sink.isCancelled() && !limitReached()) {
+      TopicMessageDTO topicMessage = deserializer.deserialize(rec);
+      try {
+        if (filter.test(topicMessage)) {
+          sink.next(
+              new TopicMessageEventDTO()
+                  .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
+                  .message(topicMessage)
+          );
+          sentMessages++;
+        }
+      } catch (Exception e) {
+        filterApplyErrors++;
+        log.trace("Error applying filter for message {}", topicMessage);
+      }
+    }
   }
 
-  void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
+  int sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink,
+                        ConsumerRecords<Bytes, Bytes> polledRecords,
+                        long elapsed) {
     if (!sink.isCancelled()) {
-      consumingStats.sendConsumingEvt(sink, polledRecords);
+      return consumingStats.sendConsumingEvt(sink, polledRecords, elapsed, filterApplyErrors);
     }
+    return 0;
   }
 
-  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
+  void sendFinishEvents(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
     if (!sink.isCancelled()) {
-      consumingStats.sendFinishEvent(sink);
+      consumingStats.sendFinishEvent(sink, filterApplyErrors, cursor);
     }
   }
 
@@ -83,30 +79,4 @@ class MessagesProcessing {
     }
   }
 
-  /*
-   * Sorting by timestamps, BUT requesting that records within same partitions should be ordered by offsets.
-   */
-  @VisibleForTesting
-  static Iterable<ConsumerRecord<Bytes, Bytes>> sortForSending(Iterable<ConsumerRecord<Bytes, Bytes>> records,
-                                                               boolean asc) {
-    Comparator<ConsumerRecord> offsetComparator = asc
-        ? Comparator.comparingLong(ConsumerRecord::offset)
-        : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::offset).reversed();
-
-    // partition -> sorted by offsets records
-    Map<Integer, List<ConsumerRecord<Bytes, Bytes>>> perPartition = Streams.stream(records)
-        .collect(
-            groupingBy(
-                ConsumerRecord::partition,
-                TreeMap::new,
-                collectingAndThen(toList(), lst -> lst.stream().sorted(offsetComparator).toList())));
-
-    Comparator<ConsumerRecord> tsComparator = asc
-        ? Comparator.comparing(ConsumerRecord::timestamp)
-        : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::timestamp).reversed();
-
-    // merge-sorting records from partitions one by one using timestamp comparator
-    return Iterables.mergeSorted(perPartition.values(), tsComparator);
-  }
-
 }

+ 10 - 18
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java

@@ -1,9 +1,9 @@
 package com.provectus.kafka.ui.emitter;
 
 import static com.provectus.kafka.ui.model.PollingModeDTO.TO_TIMESTAMP;
+import static java.util.Objects.requireNonNull;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.PollingModeDTO;
 import java.util.HashMap;
@@ -53,22 +53,14 @@ class SeekOperations {
   static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
                                                      OffsetsInfo offsetsInfo,
                                                      ConsumerPosition position) {
-    switch (position.pollingMode()) {
-      case TAILING:
-        return consumer.endOffsets(offsetsInfo.allTargetPartitions());
-      case LATEST:
-        return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
-      case EARLIEST:
-        return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
-      case FROM_OFFSET, TO_OFFSET:
-        Preconditions.checkNotNull(position.offsets());
-        return fixOffsets(offsetsInfo, position.offsets());
-      case FROM_TIMESTAMP, TO_TIMESTAMP:
-        Preconditions.checkNotNull(position.timestamp());
-        return offsetsForTimestamp(consumer, position.pollingMode(), offsetsInfo, position.timestamp());
-      default:
-        throw new IllegalStateException();
-    }
+    return switch (position.pollingMode()) {
+      case TAILING -> consumer.endOffsets(offsetsInfo.allTargetPartitions());
+      case LATEST -> consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
+      case EARLIEST -> consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
+      case FROM_OFFSET, TO_OFFSET -> fixOffsets(offsetsInfo, requireNonNull(position.offsets()));
+      case FROM_TIMESTAMP, TO_TIMESTAMP ->
+          offsetsForTimestamp(consumer, position.pollingMode(), offsetsInfo, requireNonNull(position.timestamp()));
+    };
   }
 
   private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo,
@@ -77,7 +69,7 @@ class SeekOperations {
     if (positionOffset.offset() != null) {
       offsetsInfo.getNonEmptyPartitions().forEach(tp -> offsets.put(tp, positionOffset.offset()));
     } else {
-      Preconditions.checkNotNull(positionOffset.tpOffsets());
+      requireNonNull(positionOffset.tpOffsets());
       offsets.putAll(positionOffset.tpOffsets());
       offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
     }

+ 6 - 31
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java

@@ -1,18 +1,12 @@
 package com.provectus.kafka.ui.model;
 
-import static java.util.stream.Collectors.toMap;
-
 import com.provectus.kafka.ui.exception.ValidationException;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import javax.annotation.Nullable;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.common.TopicPartition;
-import org.springframework.util.StringUtils;
-
 
 public record ConsumerPosition(PollingModeDTO pollingMode,
                                String topic,
@@ -29,8 +23,8 @@ public record ConsumerPosition(PollingModeDTO pollingMode,
                                         String topic,
                                         @Nullable List<Integer> partitions,
                                         @Nullable Long timestamp,
-                                        @Nullable String offsetsStr) {
-    @Nullable var offsets = parseAndValidateOffsets(pollingMode, topic, offsetsStr);
+                                        @Nullable Long offset) {
+    @Nullable var offsets = parseAndValidateOffsets(pollingMode, offset);
 
     var topicPartitions = Optional.ofNullable(partitions).orElse(List.of())
         .stream()
@@ -61,33 +55,14 @@ public record ConsumerPosition(PollingModeDTO pollingMode,
   }
 
   private static Offsets parseAndValidateOffsets(PollingModeDTO pollingMode,
-                                                 String topic,
-                                                 @Nullable String offsetsStr) {
-    Offsets offsets = null;
+                                                 @Nullable Long offset) {
     if (pollingMode == PollingModeDTO.FROM_OFFSET || pollingMode == PollingModeDTO.TO_OFFSET) {
-      if (!StringUtils.hasText(offsetsStr)) {
+      if (offset == null) {
         throw new ValidationException("offsets not provided for " + pollingMode);
       }
-      if (!offsetsStr.contains(":")) {
-        offsets = new Offsets(Long.parseLong(offsetsStr), null);
-      } else {
-        Map<TopicPartition, Long> tpOffsets = Stream.of(offsetsStr.split(","))
-            .map(p -> {
-              String[] split = p.split(":");
-              if (split.length != 2) {
-                throw new IllegalArgumentException(
-                    "Wrong seekTo argument format. See API docs for details");
-              }
-              return Pair.of(
-                  new TopicPartition(topic, Integer.parseInt(split[0])),
-                  Long.parseLong(split[1])
-              );
-            })
-            .collect(toMap(Pair::getKey, Pair::getValue));
-        offsets = new Offsets(null, tpOffsets);
-      }
+      return new Offsets(offset, null);
     }
-    return offsets;
+    return null;
   }
 
 }

+ 7 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/ConsumerRecordDeserializer.java

@@ -1,7 +1,6 @@
 package com.provectus.kafka.ui.serdes;
 
 import com.provectus.kafka.ui.model.TopicMessageDTO;
-import com.provectus.kafka.ui.model.TopicMessageDTO.TimestampTypeEnum;
 import com.provectus.kafka.ui.serde.api.Serde;
 import java.time.Instant;
 import java.time.OffsetDateTime;
@@ -9,7 +8,6 @@ import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.function.UnaryOperator;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -34,8 +32,6 @@ public class ConsumerRecordDeserializer {
   private final Serde.Deserializer fallbackKeyDeserializer;
   private final Serde.Deserializer fallbackValueDeserializer;
 
-  private final UnaryOperator<TopicMessageDTO> masker;
-
   public TopicMessageDTO deserialize(ConsumerRecord<Bytes, Bytes> rec) {
     var message = new TopicMessageDTO();
     fillKey(message, rec);
@@ -51,14 +47,14 @@ public class ConsumerRecordDeserializer {
     message.setValueSize(getValueSize(rec));
     message.setHeadersSize(getHeadersSize(rec));
 
-    return masker.apply(message);
+    return message;
   }
 
-  private static TimestampTypeEnum mapToTimestampType(TimestampType timestampType) {
+  private static TopicMessageDTO.TimestampTypeEnum mapToTimestampType(TimestampType timestampType) {
     return switch (timestampType) {
-      case CREATE_TIME -> TimestampTypeEnum.CREATE_TIME;
-      case LOG_APPEND_TIME -> TimestampTypeEnum.LOG_APPEND_TIME;
-      case NO_TIMESTAMP_TYPE -> TimestampTypeEnum.NO_TIMESTAMP_TYPE;
+      case CREATE_TIME -> TopicMessageDTO.TimestampTypeEnum.CREATE_TIME;
+      case LOG_APPEND_TIME -> TopicMessageDTO.TimestampTypeEnum.LOG_APPEND_TIME;
+      case NO_TIMESTAMP_TYPE -> TopicMessageDTO.TimestampTypeEnum.NO_TIMESTAMP_TYPE;
     };
   }
 
@@ -122,11 +118,11 @@ public class ConsumerRecordDeserializer {
   }
 
   private static Long getKeySize(ConsumerRecord<Bytes, Bytes> consumerRecord) {
-    return consumerRecord.key() != null ? (long) consumerRecord.serializedKeySize() : null;
+    return consumerRecord.key() != null ? (long) consumerRecord.key().get().length : null;
   }
 
   private static Long getValueSize(ConsumerRecord<Bytes, Bytes> consumerRecord) {
-    return consumerRecord.value() != null ? (long) consumerRecord.serializedValueSize() : null;
+    return consumerRecord.value() != null ? (long) consumerRecord.value().get().length : null;
   }
 
   private static int headerSize(Header header) {

+ 131 - 85
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -1,11 +1,16 @@
 package com.provectus.kafka.ui.service;
 
+import com.google.common.base.Charsets;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.hash.Hashing;
 import com.google.common.util.concurrent.RateLimiter;
+import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.Cursor;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
-import com.provectus.kafka.ui.emitter.MessageFilterStats;
 import com.provectus.kafka.ui.emitter.MessageFilters;
-import com.provectus.kafka.ui.emitter.ResultSizeLimiter;
+import com.provectus.kafka.ui.emitter.MessagesProcessing;
 import com.provectus.kafka.ui.emitter.TailingEmitter;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
@@ -21,16 +26,15 @@ import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -45,18 +49,43 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 @Service
-@RequiredArgsConstructor
 @Slf4j
 public class MessagesService {
 
+  private static final long SALT_FOR_HASHING = ThreadLocalRandom.current().nextLong();
+
+  private static final int DEFAULT_MAX_PAGE_SIZE = 500;
+  private static final int DEFAULT_PAGE_SIZE = 100;
   // limiting UI messages rate to 20/sec in tailing mode
-  public static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
+  private static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
 
   private final AdminClientService adminClientService;
   private final DeserializationService deserializationService;
   private final ConsumerGroupService consumerGroupService;
+  private final int maxPageSize;
+  private final int defaultPageSize;
+
+  private final Cache<String, Predicate<TopicMessageDTO>> registeredFilters = CacheBuilder.newBuilder()
+      .maximumSize(5_000)
+      .build();
+
+  private final PollingCursorsStorage cursorsStorage = new PollingCursorsStorage();
 
-  private final Map<String, Predicate<TopicMessageDTO>> registeredFilters = new ConcurrentHashMap<>();
+  public MessagesService(AdminClientService adminClientService,
+                         DeserializationService deserializationService,
+                         ConsumerGroupService consumerGroupService,
+                         ClustersProperties properties) {
+    this.adminClientService = adminClientService;
+    this.deserializationService = deserializationService;
+    this.consumerGroupService = consumerGroupService;
+
+    var pollingProps = Optional.ofNullable(properties.getPolling())
+        .orElseGet(ClustersProperties.PollingProperties::new);
+    this.maxPageSize = Optional.ofNullable(pollingProps.getMaxPageSize())
+        .orElse(DEFAULT_MAX_PAGE_SIZE);
+    this.defaultPageSize = Optional.ofNullable(pollingProps.getDefaultPageSize())
+        .orElse(DEFAULT_PAGE_SIZE);
+  }
 
   private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) {
     return adminClientService.get(cluster)
@@ -138,118 +167,135 @@ public class MessagesService {
     }
   }
 
-  public Flux<TopicMessageEventDTO> loadMessagesV2(KafkaCluster cluster,
-                                                   String topic,
-                                                   ConsumerPosition position,
-                                                   @Nullable String query,
-                                                   @Nullable String filterId,
-                                                   int limit,
-                                                   @Nullable String keySerde,
-                                                   @Nullable String valueSerde) {
+  private int fixPageSize(@Nullable Integer pageSize) {
+    return Optional.ofNullable(pageSize)
+        .filter(ps -> ps > 0 && ps <= maxPageSize)
+        .orElse(defaultPageSize);
+  }
+
+  private UnaryOperator<TopicMessageEventDTO> getDataMasker(KafkaCluster cluster, String topicName) {
+    var keyMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.KEY);
+    var valMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.VALUE);
+    return evt -> {
+      if (evt.getType() != TopicMessageEventDTO.TypeEnum.MESSAGE) {
+        return evt;
+      }
+      return evt.message(
+          evt.getMessage()
+              .key(keyMasker.apply(evt.getMessage().getKey()))
+              .content(valMasker.apply(evt.getMessage().getContent())));
+    };
+  }
+
+  public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
+                                                  String topic,
+                                                  ConsumerPosition consumerPosition,
+                                                  @Nullable String containsStringFilter,
+                                                  @Nullable String filterId,
+                                                  @Nullable Integer limit,
+                                                  @Nullable String keySerde,
+                                                  @Nullable String valueSerde) {
+    return loadMessages(
+        cluster,
+        topic,
+        deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
+        consumerPosition,
+        getMsgFilter(containsStringFilter, filterId),
+        fixPageSize(limit)
+    );
+  }
+
+  public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic, String cursorId) {
+    Cursor cursor = cursorsStorage.getCursor(cursorId)
+        .orElseThrow(() -> new ValidationException("Next page cursor not found. Maybe it was evicted from cache."));
+    return loadMessages(
+        cluster,
+        topic,
+        cursor.deserializer(),
+        cursor.consumerPosition(),
+        cursor.filter(),
+        cursor.limit()
+    );
+  }
+
+  private Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
+                                                  String topic,
+                                                  ConsumerRecordDeserializer deserializer,
+                                                  ConsumerPosition consumerPosition,
+                                                  Predicate<TopicMessageDTO> filter,
+                                                  int limit) {
     return withExistingTopic(cluster, topic)
         .flux()
         .publishOn(Schedulers.boundedElastic())
-        .flatMap(td -> loadMessagesImplV2(cluster, topic, position, query, filterId, limit, keySerde, valueSerde));
+        .flatMap(td -> loadMessagesImpl(cluster, topic, deserializer, consumerPosition, filter, fixPageSize(limit)));
   }
 
-  private Flux<TopicMessageEventDTO> loadMessagesImplV2(KafkaCluster cluster,
-                                                        String topic,
-                                                        ConsumerPosition consumerPosition,
-                                                        @Nullable String query,
-                                                        @Nullable String filterId,
-                                                        int limit,
-                                                        @Nullable String keySerde,
-                                                        @Nullable String valueSerde) {
-
-    ConsumerRecordDeserializer recordDeserializer =
-        deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
+  private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
+                                                      String topic,
+                                                      ConsumerRecordDeserializer deserializer,
+                                                      ConsumerPosition consumerPosition,
+                                                      Predicate<TopicMessageDTO> filter,
+                                                      int limit) {
+    var processing = new MessagesProcessing(
+        deserializer,
+        filter,
+        consumerPosition.pollingMode() == PollingModeDTO.TAILING ? null : limit
+    );
 
     var emitter = switch (consumerPosition.pollingMode()) {
       case TO_OFFSET, TO_TIMESTAMP, LATEST -> new BackwardRecordEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
           limit,
-          recordDeserializer,
-          cluster.getPollingSettings()
+          processing,
+          cluster.getPollingSettings(),
+          new Cursor.Tracking(deserializer, consumerPosition, filter, limit, cursorsStorage::register)
       );
       case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> new ForwardRecordEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
-          recordDeserializer,
-          cluster.getPollingSettings()
+          processing,
+          cluster.getPollingSettings(),
+          new Cursor.Tracking(deserializer, consumerPosition, filter, limit, cursorsStorage::register)
       );
       case TAILING -> new TailingEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
-          recordDeserializer,
+          processing,
           cluster.getPollingSettings()
       );
     };
-
-    MessageFilterStats filterStats = new MessageFilterStats();
     return Flux.create(emitter)
-        .contextWrite(ctx -> ctx.put(MessageFilterStats.class, filterStats))
-        .filter(getMsgFilter(query, filterId, filterStats))
         .map(getDataMasker(cluster, topic))
-        .takeWhile(createTakeWhilePredicate(consumerPosition.pollingMode(), limit))
         .map(throttleUiPublish(consumerPosition.pollingMode()));
   }
 
-  private Predicate<TopicMessageEventDTO> createTakeWhilePredicate(
-      PollingModeDTO pollingMode, int limit) {
-    return pollingMode == PollingModeDTO.TAILING
-        ? evt -> true // no limit for tailing
-        : new ResultSizeLimiter(limit);
-  }
-
-  private UnaryOperator<TopicMessageEventDTO> getDataMasker(KafkaCluster cluster, String topicName) {
-    var keyMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.KEY);
-    var valMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.VALUE);
-    return evt -> {
-      if (evt.getType() != TopicMessageEventDTO.TypeEnum.MESSAGE) {
-        return evt;
-      }
-      return evt.message(
-        evt.getMessage()
-            .key(keyMasker.apply(evt.getMessage().getKey()))
-            .content(valMasker.apply(evt.getMessage().getContent())));
-    };
-  }
-
   public String registerMessageFilter(String groovyCode) {
-    var filter = MessageFilters.groovyScriptFilter(groovyCode);
-    var id = RandomStringUtils.random(10, true, true);
-    registeredFilters.put(id, filter);
-    return id;
+    String saltedCode = groovyCode + SALT_FOR_HASHING;
+    String filterId = Hashing.sha256()
+        .hashString(saltedCode, Charsets.UTF_8)
+        .toString()
+        .substring(0, 8);
+    if (registeredFilters.getIfPresent(filterId) == null) {
+      registeredFilters.put(filterId, MessageFilters.groovyScriptFilter(groovyCode));
+    }
+    return filterId;
   }
 
-  private Predicate<TopicMessageEventDTO> getMsgFilter(@Nullable String containsStrFilter,
-                                                       @Nullable String filterId,
-                                                       MessageFilterStats filterStats) {
+  private Predicate<TopicMessageDTO> getMsgFilter(@Nullable String containsStrFilter,
+                                                  @Nullable String smartFilterId) {
     Predicate<TopicMessageDTO> messageFilter = MessageFilters.noop();
     if (containsStrFilter != null) {
-      messageFilter = MessageFilters.containsStringFilter(containsStrFilter);
+      messageFilter = messageFilter.and(MessageFilters.containsStringFilter(containsStrFilter));
     }
-    if (filterId != null) {
-      messageFilter = registeredFilters.get(filterId);
-      if (messageFilter == null) {
-        throw new ValidationException("No filter was registered with id " + filterId);
+    if (smartFilterId != null) {
+      var registered = registeredFilters.getIfPresent(smartFilterId);
+      if (registered == null) {
+        throw new ValidationException("No filter was registered with id " + smartFilterId);
       }
+      messageFilter = messageFilter.and(registered);
     }
-    Predicate<TopicMessageDTO> finalMessageFilter = messageFilter;
-    return evt -> {
-      // we only apply filter for message events
-      if (evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) {
-        try {
-          return finalMessageFilter.test(evt.getMessage());
-        } catch (Exception e) {
-          filterStats.incrementApplyErrors();
-          log.trace("Error applying filter for message {}", evt.getMessage());
-          return false;
-        }
-      }
-      return true;
-    };
+    return messageFilter;
   }
 
   private <T> UnaryOperator<T> throttleUiPublish(PollingModeDTO pollingMode) {

+ 25 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/PollingCursorsStorage.java

@@ -0,0 +1,25 @@
+package com.provectus.kafka.ui.service;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.provectus.kafka.ui.emitter.Cursor;
+import java.util.Optional;
+import org.apache.commons.lang3.RandomStringUtils;
+
+public class PollingCursorsStorage {
+
+  private final Cache<String, Cursor> cursorsCache = CacheBuilder.newBuilder()
+      .maximumSize(10_000)
+      .build();
+
+  public Optional<Cursor> getCursor(String id) {
+    return Optional.ofNullable(cursorsCache.getIfPresent(id));
+  }
+
+  public String register(Cursor cursor) {
+    var id = RandomStringUtils.random(8, true, true);
+    cursorsCache.put(id, cursor);
+    return id;
+  }
+
+}

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java

@@ -109,7 +109,7 @@ class TailingEmitterTest extends AbstractIntegrationTest {
         .get();
 
     return applicationContext.getBean(MessagesService.class)
-        .loadMessagesV2(cluster, topicName,
+        .loadMessages(cluster, topicName,
             new ConsumerPosition(PollingModeDTO.TAILING, topic, List.of(), null, null),
             query,
             null,

+ 2 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java

@@ -54,7 +54,7 @@ class MessagesServiceTest extends AbstractIntegrationTest {
   @Test
   void loadMessagesReturnsExceptionWhenTopicNotFound() {
     StepVerifier.create(messagesService
-            .loadMessagesV2(cluster, NON_EXISTING_TOPIC,
+            .loadMessages(cluster, NON_EXISTING_TOPIC,
                 new ConsumerPosition(PollingModeDTO.TAILING, NON_EXISTING_TOPIC, List.of(), null, null),
                 null, null, 1, "String", "String"))
         .expectError(TopicNotFoundException.class)
@@ -69,7 +69,7 @@ class MessagesServiceTest extends AbstractIntegrationTest {
       producer.send(testTopic, "message1");
       producer.send(testTopic, "message2").get();
 
-      Flux<TopicMessageDTO> msgsFlux = messagesService.loadMessagesV2(
+      Flux<TopicMessageDTO> msgsFlux = messagesService.loadMessages(
           cluster,
           testTopic,
           new ConsumerPosition(PollingModeDTO.EARLIEST, testTopic, List.of(), null, null),

+ 38 - 20
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -10,7 +10,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.Cursor;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.MessagesProcessing;
 import com.provectus.kafka.ui.emitter.PollingSettings;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.ConsumerPosition.Offsets;
@@ -44,6 +46,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
 import reactor.test.StepVerifier;
@@ -58,6 +61,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
   static final String EMPTY_TOPIC = TOPIC + "_empty";
   static final List<Record> SENT_RECORDS = new ArrayList<>();
   static final ConsumerRecordDeserializer RECORD_DESERIALIZER = createRecordsDeserializer();
+  static final Cursor.Tracking CURSOR_MOCK = Mockito.mock(Cursor.Tracking.class);
 
   @BeforeAll
   static void generateMsgs() throws Exception {
@@ -110,21 +114,27 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     );
   }
 
+  private MessagesProcessing createMessagesProcessing() {
+    return new MessagesProcessing(RECORD_DESERIALIZER, msg -> true, null);
+  }
+
   @Test
   void pollNothingOnEmptyTopic() {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null),
-        RECORD_DESERIALIZER,
-        PollingSettings.createDefault()
+        createMessagesProcessing(),
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null),
         100,
-        RECORD_DESERIALIZER,
-        PollingSettings.createDefault()
+        createMessagesProcessing(),
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     StepVerifier.create(Flux.create(forwardEmitter))
@@ -145,16 +155,18 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(EARLIEST, TOPIC, List.of(), null, null),
-        RECORD_DESERIALIZER,
-        PollingSettings.createDefault()
+        createMessagesProcessing(),
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(LATEST, TOPIC, List.of(), null, null),
         PARTITIONS * MSGS_PER_PARTITION,
-        RECORD_DESERIALIZER,
-        PollingSettings.createDefault()
+        createMessagesProcessing(),
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     List<String> expectedValues = SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList());
@@ -175,8 +187,9 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         new ConsumerPosition(FROM_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
             new Offsets(null, targetOffsets)),
-        RECORD_DESERIALIZER,
-        PollingSettings.createDefault()
+        createMessagesProcessing(),
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     var backwardEmitter = new BackwardRecordEmitter(
@@ -184,8 +197,9 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
             new Offsets(null, targetOffsets)),
         PARTITIONS * MSGS_PER_PARTITION,
-        RECORD_DESERIALIZER,
-        PollingSettings.createDefault()
+        createMessagesProcessing(),
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     var expectedValues = SENT_RECORDS.stream()
@@ -212,8 +226,9 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(FROM_TIMESTAMP, TOPIC, List.of(), targetTimestamp, null),
-        RECORD_DESERIALIZER,
-        PollingSettings.createDefault()
+        createMessagesProcessing(),
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     expectEmitter(
@@ -228,8 +243,9 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         new ConsumerPosition(TO_TIMESTAMP, TOPIC, List.of(), targetTimestamp, null),
         PARTITIONS * MSGS_PER_PARTITION,
-        RECORD_DESERIALIZER,
-        PollingSettings.createDefault()
+        createMessagesProcessing(),
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     expectEmitter(
@@ -254,8 +270,9 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
             new Offsets(null, targetOffsets)),
         numMessages,
-        RECORD_DESERIALIZER,
-        PollingSettings.createDefault()
+        createMessagesProcessing(),
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     var expectedValues = SENT_RECORDS.stream()
@@ -280,8 +297,9 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(offsets.keySet()), null, new Offsets(null, offsets)),
         100,
-        RECORD_DESERIALIZER,
-        PollingSettings.createDefault()
+        createMessagesProcessing(),
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     expectEmitter(backwardEmitter,

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java

@@ -497,7 +497,7 @@ public class SendAndReadTests extends AbstractIntegrationTest {
       String topic = createTopicAndCreateSchemas();
       try {
         messagesService.sendMessage(targetCluster, topic, msgToSend).block();
-        TopicMessageDTO polled = messagesService.loadMessagesV2(
+        TopicMessageDTO polled = messagesService.loadMessages(
                 targetCluster,
                 topic,
                 new ConsumerPosition(PollingModeDTO.EARLIEST, topic, List.of(), null, null),

+ 18 - 14
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -792,55 +792,61 @@ paths:
           required: true
           schema:
             type: string
-        - name: m
+        - name: mode
           in: query
           description: Messages polling mode
           required: true
           schema:
             $ref: "#/components/schemas/PollingMode"
-        - name: p
+        - name: partitions
           in: query
           schema:
             type: array
             description: List of target partitions (all partitions if not provided)
             items:
               type: integer
-        - name: lim
+        - name: limit
           in: query
           description: Max number of messages can be returned
           schema:
             type: integer
-        - name: q
+        - name: stringFilter
           in: query
           description: query string to contains string filtration
           schema:
             type: string
-        - name: fid
+        - name: smartFilterId
           in: query
           description: filter id, that was registered beforehand
           schema:
             type: string
-        - name: offs
+        - name: offset
           in: query
-          description: partition offsets to read from / to. Format is "p1:offset1,p2:offset2,...".
+          description: message offset to read from / to
           schema:
-            type: string
-        - name: ts
+            type: integer
+            format: int64
+        - name: timestamp
           in: query
           description: timestamp (in ms) to read from / to
           schema:
             type: integer
             format: int64
-        - name: ks
+        - name: keySerde
           in: query
           description: "Serde that should be used for deserialization. Will be chosen automatically if not set."
           schema:
             type: string
-        - name: vs
+        - name: valueSerde
           in: query
           description: "Serde that should be used for deserialization. Will be chosen automatically if not set."
           schema:
             type: string
+        - name: cursor
+          in: query
+          description: "id of the cursor for pagination"
+          schema:
+            type: string
       responses:
         200:
           description: OK
@@ -2608,10 +2614,8 @@ components:
     TopicMessageNextPageCursor:
       type: object
       properties:
-        offsetsString:
+        id:
           type: string
-        pollingMode:
-          $ref: "#/components/schemas/PollingMode"
 
     TopicMessage:
       type: object