
merged with master

iliax 1 year ago
parent
commit
6a62fb87c6

+ 34 - 19
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -16,13 +16,15 @@ import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.SeekDirectionDTO;
 import com.provectus.kafka.ui.model.SeekTypeDTO;
 import com.provectus.kafka.ui.model.SerdeUsageDTO;
+import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
+import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicSerdeSuggestionDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
+import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
 import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
 import com.provectus.kafka.ui.service.DeserializationService;
 import com.provectus.kafka.ui.service.MessagesService;
-import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.List;
 import java.util.Optional;
 import javax.validation.Valid;
@@ -43,26 +45,33 @@ public class MessagesController extends AbstractController implements MessagesAp
 
   private final MessagesService messagesService;
   private final DeserializationService deserializationService;
-  private final AccessControlService accessControlService;
 
   @Override
   public Mono<ResponseEntity<Void>> deleteTopicMessages(
       String clusterName, String topicName, @Valid List<Integer> partitions,
       ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(MESSAGES_DELETE)
-        .build());
+        .build();
 
-    return validateAccess.then(
+    return validateAccess(context).<ResponseEntity<Void>>then(
        messagesService.deleteTopicMessages(
            getCluster(clusterName),
            topicName,
            Optional.ofNullable(partitions).orElse(List.of())
        ).thenReturn(ResponseEntity.ok().build())
-    );
+    ).doOnEach(sig -> audit(context, sig));
+  }
+
+  @Override
+  public Mono<ResponseEntity<SmartFilterTestExecutionResultDTO>> executeSmartFilterTest(
+      Mono<SmartFilterTestExecutionDTO> smartFilterTestExecutionDto, ServerWebExchange exchange) {
+    return smartFilterTestExecutionDto
+        .map(MessagesService::execSmartFilterTest)
+        .map(ResponseEntity::ok);
   }
 
   @Deprecated
@@ -95,12 +104,17 @@ public class MessagesController extends AbstractController implements MessagesAp
                                                                              String valueSerde,
                                                                              String cursor,
                                                                              ServerWebExchange exchange) {
-    var context = AccessContext.builder()
+    var contextBuilder = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(MESSAGES_READ)
-        .operationName("getTopicMessages")
-        .build();
+        .operationName("getTopicMessages");
+
+    if (auditService.isAuditTopic(getCluster(clusterName), topicName)) {
+      contextBuilder.auditActions(AuditAction.VIEW);
+    }
+
+    var accessContext = contextBuilder.build();
 
     Flux<TopicMessageEventDTO> messagesFlux;
     if (cursor != null) {
@@ -117,9 +131,9 @@ public class MessagesController extends AbstractController implements MessagesAp
           valueSerde
       );
     }
-    return accessControlService.validateAccess(context)
+    return accessControlService.validateAccess(accessContext)
         .then(Mono.just(ResponseEntity.ok(messagesFlux)))
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> auditService.audit(accessContext, sig));
   }
 
   @Override
@@ -127,17 +141,18 @@ public class MessagesController extends AbstractController implements MessagesAp
       String clusterName, String topicName, @Valid Mono<CreateTopicMessageDTO> createTopicMessage,
       ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(MESSAGES_PRODUCE)
-        .build());
+        .operationName("sendTopicMessages")
+        .build();
 
-    return validateAccess.then(
+    return validateAccess(context).then(
         createTopicMessage.flatMap(msg ->
             messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
         ).map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> audit(context, sig));
   }
 
   @Override
@@ -145,12 +160,12 @@ public class MessagesController extends AbstractController implements MessagesAp
                                                                 String topicName,
                                                                 SerdeUsageDTO use,
                                                                 ServerWebExchange exchange) {
-
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(TopicAction.VIEW)
-        .build());
+        .operationName("getSerdes")
+        .build();
 
     TopicSerdeSuggestionDTO dto = new TopicSerdeSuggestionDTO()
         .key(use == SerdeUsageDTO.SERIALIZE
@@ -160,7 +175,7 @@ public class MessagesController extends AbstractController implements MessagesAp
             ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
             : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE));
 
-    return validateAccess.then(
+    return validateAccess(context).then(
         Mono.just(dto)
             .subscribeOn(Schedulers.boundedElastic())
             .map(ResponseEntity::ok)
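
The pattern repeated across these handlers: validateAccess(...) and audit(...) are now called as helpers presumably inherited from AbstractController, replacing the injected AccessControlService. A minimal sketch of the resulting handler shape, with doWork() standing in for the real handler body (both names hypothetical):

    var context = AccessContext.builder()
        .cluster(clusterName)
        .topic(topicName)
        .topicActions(MESSAGES_READ)
        .operationName("someOperation")         // hypothetical name, used for audit entries
        .build();
    return validateAccess(context)              // presumably errors if access is denied
        .then(doWork())                         // hypothetical handler logic
        .doOnEach(sig -> audit(context, sig));  // audits both success and error signals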

+ 11 - 28
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -2,39 +2,23 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import jakarta.annotation.Nullable;
-import java.time.Duration;
-import java.time.Instant;
-import javax.annotation.Nullable;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
-public abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
+abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
   private final MessagesProcessing messagesProcessing;
-  private final PollingThrottler throttler;
-  protected final PollingSettings pollingSettings;
+  private final PollingSettings pollingSettings;
 
   protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
     this.messagesProcessing = messagesProcessing;
     this.pollingSettings = pollingSettings;
-    this.throttler = pollingSettings.getPollingThrottler();
   }
 
-  protected ConsumerRecords<Bytes, Bytes> poll(
-      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
-    return poll(sink, consumer, pollingSettings.getPollTimeout());
-  }
-
-  protected ConsumerRecords<Bytes, Bytes> poll(
-      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer, Duration timeout) {
-    Instant start = Instant.now();
-    ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
-    Instant finish = Instant.now();
-    int polledBytes = sendConsuming(sink, records, Duration.between(start, finish).toMillis());
-    throttler.throttleAfterPoll(polledBytes);
+  protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
+    var records = consumer.pollEnhanced(pollingSettings.getPollTimeout());
+    sendConsuming(sink, records);
     return records;
   }
 
@@ -42,19 +26,18 @@ public abstract class AbstractEmitter implements java.util.function.Consumer<Flu
     return messagesProcessing.limitReached();
   }
 
-  protected void sendMessage(FluxSink<TopicMessageEventDTO> sink,
-                             ConsumerRecord<Bytes, Bytes> msg) {
-    messagesProcessing.sendMsg(sink, msg);
+  protected void send(FluxSink<TopicMessageEventDTO> sink,
+                      Iterable<ConsumerRecord<Bytes, Bytes>> records,
+                      @Nullable Cursor.Tracking cursor) {
+    messagesProcessing.send(sink, records, cursor);
   }
 
   protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
     messagesProcessing.sendPhase(sink, name);
   }
 
-  protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
-                              ConsumerRecords<Bytes, Bytes> records,
-                              long elapsed) {
-    return messagesProcessing.sentConsumingInfo(sink, records, elapsed);
+  protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords records) {
+    messagesProcessing.sentConsumingInfo(sink, records);
   }
 
  // cursor is null if target partitions were fully polled (no need to do paging)
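
PolledRecords is used here but not defined in this diff. Judging from its call sites (count(), bytes(), elapsed(), and being handed to send(...) as an Iterable of consumer records), it is plausibly a small value type along these lines (a hypothetical reconstruction, not the actual class):

    // Hypothetical shape, inferred from usages in this commit.
    record PolledRecords(int count, int bytes, Duration elapsed, ConsumerRecords<Bytes, Bytes> records)
        implements Iterable<ConsumerRecord<Bytes, Bytes>> {
      @Override
      public Iterator<ConsumerRecord<Bytes, Bytes>> iterator() {
        return records.iterator();
      }
    }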
   // cursor is null if target partitions were fully polled (no, need to do paging)

+ 5 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardEmitter.java

@@ -18,18 +18,15 @@ public class BackwardEmitter extends RangePollingEmitter {
                          int messagesPerPage,
                          ConsumerRecordDeserializer deserializer,
                          Predicate<TopicMessageDTO> filter,
-                         PollingSettings pollingSettings) {
+                         PollingSettings pollingSettings,
+                         Cursor.Tracking cursor) {
     super(
         consumerSupplier,
         consumerPosition,
         messagesPerPage,
-        new MessagesProcessing(
-            deserializer,
-            filter,
-            false,
-            messagesPerPage
-        ),
-        pollingSettings
+        new MessagesProcessing(deserializer, filter, false, messagesPerPage),
+        pollingSettings,
+        cursor
     );
   }
 

+ 17 - 24
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java

@@ -3,10 +3,7 @@ package com.provectus.kafka.ui.emitter;
 import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessageNextPageCursorDTO;
-import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
 import javax.annotation.Nullable;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 class ConsumingStats {
@@ -14,27 +11,24 @@ class ConsumingStats {
   private long bytes = 0;
   private int records = 0;
   private long elapsed = 0;
+  private int filterApplyErrors = 0;
 
-  /**
-   * returns bytes polled.
-   */
-  int sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
-                       ConsumerRecords<Bytes, Bytes> polledRecords,
-                       long elapsed,
-                       int filterApplyErrors) {
-    int polledBytes = ConsumerRecordsUtil.calculatePolledSize(polledRecords);
-    bytes += polledBytes;
-    this.records += polledRecords.count();
-    this.elapsed += elapsed;
+  void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
+    bytes += polledRecords.bytes();
+    records += polledRecords.count();
+    elapsed += polledRecords.elapsed().toMillis();
     sink.next(
         new TopicMessageEventDTO()
             .type(TopicMessageEventDTO.TypeEnum.CONSUMING)
-            .consuming(createConsumingStats(sink, filterApplyErrors))
+            .consuming(createConsumingStats())
     );
-    return polledBytes;
   }
 
-  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors, @Nullable Cursor.Tracking cursor) {
+  void incFilterApplyError() {
+    filterApplyErrors++;
+  }
+
+  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
     sink.next(
         new TopicMessageEventDTO()
             .type(TopicMessageEventDTO.TypeEnum.DONE)
@@ -43,17 +37,16 @@ class ConsumingStats {
                     ? new TopicMessageNextPageCursorDTO().id(cursor.registerCursor())
                     : null
             )
-            .consuming(createConsumingStats(sink, filterApplyErrors))
+            .consuming(createConsumingStats())
     );
   }
 
-  private TopicMessageConsumingDTO createConsumingStats(FluxSink<TopicMessageEventDTO> sink,
-                                                        int filterApplyErrors) {
+  private TopicMessageConsumingDTO createConsumingStats() {
     return new TopicMessageConsumingDTO()
-        .bytesConsumed(this.bytes)
-        .elapsedMs(this.elapsed)
-        .isCancelled(sink.isCancelled())
+        .bytesConsumed(bytes)
+        .elapsedMs(elapsed)
+        .isCancelled(false)
         .filterApplyErrors(filterApplyErrors)
-        .messagesConsumed(this.records);
+        .messagesConsumed(records);
   }
 }
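
With these changes, the event stream a client receives for one page looks roughly like this (CONSUMING, MESSAGE and DONE are the enum values visible in this diff; the phase event type is assumed):

    PHASE      e.g. "Consumer created" (via sendPhase)
    CONSUMING  cumulative bytes/records/elapsed stats after each poll (sendConsumingEvt)
    MESSAGE    one event per record that passed the filter
    DONE       final stats; the next-page cursor id is set only when another page exists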

+ 31 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/Cursor.java

@@ -1,5 +1,7 @@
 package com.provectus.kafka.ui.emitter;
 
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
@@ -20,32 +22,41 @@ public record Cursor(ConsumerRecordDeserializer deserializer,
     private final ConsumerPosition originalPosition;
     private final Predicate<TopicMessageDTO> filter;
     private final int limit;
-    private final Function<Cursor, String> cursorRegistry;
+    private final Function<Cursor, String> registerAction;
 
-    private final Map<TopicPartition, Long> trackingOffsets = new HashMap<>();
+    //topic -> partition -> offset
+    private final Table<String, Integer, Long> trackingOffsets = HashBasedTable.create();
 
     public Tracking(ConsumerRecordDeserializer deserializer,
                     ConsumerPosition originalPosition,
                     Predicate<TopicMessageDTO> filter,
                     int limit,
-                    Function<Cursor, String> cursorRegistry) {
+                    Function<Cursor, String> registerAction) {
       this.deserializer = deserializer;
       this.originalPosition = originalPosition;
       this.filter = filter;
       this.limit = limit;
-      this.cursorRegistry = cursorRegistry;
+      this.registerAction = registerAction;
     }
 
-    void trackOffset(TopicPartition tp, long offset) {
-      trackingOffsets.put(tp, offset);
+    void trackOffset(String topic, int partition, long offset) {
+      trackingOffsets.put(topic, partition, offset);
     }
 
-    void trackOffsets(Map<TopicPartition, Long> offsets) {
-      this.trackingOffsets.putAll(offsets);
+    void initOffsets(Map<TopicPartition, Long> initialSeekOffsets) {
+      initialSeekOffsets.forEach((tp, off) -> trackOffset(tp.topic(), tp.partition(), off));
+    }
+
+    private Map<TopicPartition, Long> getOffsetsMap(int offsetToAdd) {
+      Map<TopicPartition, Long> result = new HashMap<>();
+      trackingOffsets.rowMap()
+          .forEach((topic, partsMap) ->
+              partsMap.forEach((p, off) -> result.put(new TopicPartition(topic, p), off + offsetToAdd)));
+      return result;
     }
 
     String registerCursor() {
-      return cursorRegistry.apply(
+      return registerAction.apply(
           new Cursor(
               deserializer,
               new ConsumerPosition(
@@ -57,7 +68,17 @@ public record Cursor(ConsumerRecordDeserializer deserializer,
                   originalPosition.topic(),
                   originalPosition.partitions(),
                   null,
-                  new ConsumerPosition.Offsets(null, trackingOffsets)
+                  new ConsumerPosition.Offsets(
+                      null,
+                      getOffsetsMap(
+                          switch (originalPosition.pollingMode()) {
+                            case TO_OFFSET, TO_TIMESTAMP, LATEST -> 0;
+                            // when doing forward polling we need to start from latest msg's offset + 1
+                            case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> 1;
+                            case TAILING -> throw new IllegalStateException();
+                          }
+                      )
+                  )
               ),
               filter,
               limit
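
A worked example of the offset arithmetic (hypothetical values): suppose a forward page (FROM_OFFSET/FROM_TIMESTAMP/EARLIEST) last sent offset 41 from partition 0 of topic "t", so trackingOffsets holds ("t", 0) -> 41. registerCursor() then builds the next page's position from

    getOffsetsMap(1)   // {t-0 -> 42}: resume after the last message already sent

whereas backward modes (TO_OFFSET/TO_TIMESTAMP/LATEST) use getOffsetsMap(0), reusing the tracked offset as the boundary for the next backward page. TAILING never pages, so reaching that branch is treated as a bug (hence the IllegalStateException).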

+ 5 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardEmitter.java

@@ -18,18 +18,15 @@ public class ForwardEmitter extends RangePollingEmitter {
                         int messagesPerPage,
                         ConsumerRecordDeserializer deserializer,
                         Predicate<TopicMessageDTO> filter,
-                        PollingSettings pollingSettings) {
+                        PollingSettings pollingSettings,
+                        Cursor.Tracking cursor) {
     super(
         consumerSupplier,
         consumerPosition,
         messagesPerPage,
-        new MessagesProcessing(
-            deserializer,
-            filter,
-            true,
-            messagesPerPage
-        ),
-        pollingSettings
+        new MessagesProcessing(deserializer, filter, true, messagesPerPage),
+        pollingSettings,
+        cursor
     );
   }
 

+ 69 - 34
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java

@@ -1,71 +1,80 @@
 package com.provectus.kafka.ui.emitter;
 
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.function.Predicate;
 import javax.annotation.Nullable;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
-public class MessagesProcessing {
+@RequiredArgsConstructor
+class MessagesProcessing {
 
   private final ConsumingStats consumingStats = new ConsumingStats();
   private long sentMessages = 0;
-  private int filterApplyErrors = 0;
 
   private final ConsumerRecordDeserializer deserializer;
   private final Predicate<TopicMessageDTO> filter;
+  private final boolean ascendingSortBeforeSend;
   private final @Nullable Integer limit;
 
-  public MessagesProcessing(ConsumerRecordDeserializer deserializer,
-                            Predicate<TopicMessageDTO> filter,
-                            @Nullable Integer limit) {
-    this.deserializer = deserializer;
-    this.filter = filter;
-    this.limit = limit;
-  }
-
   boolean limitReached() {
     return limit != null && sentMessages >= limit;
   }
 
-  void sendMsg(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
-    if (!sink.isCancelled() && !limitReached()) {
-      TopicMessageDTO topicMessage = deserializer.deserialize(rec);
-      try {
-        if (filter.test(topicMessage)) {
-          sink.next(
-              new TopicMessageEventDTO()
-                  .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
-                  .message(topicMessage)
-          );
-          sentMessages++;
-        }
-      } catch (Exception e) {
-        filterApplyErrors++;
-        log.trace("Error applying filter for message {}", topicMessage);
-      }
-    }
+  void send(FluxSink<TopicMessageEventDTO> sink,
+            Iterable<ConsumerRecord<Bytes, Bytes>> polled,
+            @Nullable Cursor.Tracking cursor) {
+    sortForSending(polled, ascendingSortBeforeSend)
+        .forEach(rec -> {
+          if (!limitReached() && !sink.isCancelled()) {
+            TopicMessageDTO topicMessage = deserializer.deserialize(rec);
+            try {
+              if (filter.test(topicMessage)) {
+                sink.next(
+                    new TopicMessageEventDTO()
+                        .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
+                        .message(topicMessage)
+                );
+                sentMessages++;
+              }
+              if (cursor != null) {
+                cursor.trackOffset(rec.topic(), rec.partition(), rec.offset());
+              }
+            } catch (Exception e) {
+              consumingStats.incFilterApplyError();
+              log.trace("Error applying filter for message {}", topicMessage);
+            }
+          }
+        });
   }
 
-  int sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink,
-                        ConsumerRecords<Bytes, Bytes> polledRecords,
-                        long elapsed) {
+  void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
     if (!sink.isCancelled()) {
-      return consumingStats.sendConsumingEvt(sink, polledRecords, elapsed, filterApplyErrors);
+      consumingStats.sendConsumingEvt(sink, polledRecords);
     }
-    return 0;
   }
 
   void sendFinishEvents(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
     if (!sink.isCancelled()) {
-      consumingStats.sendFinishEvent(sink, filterApplyErrors, cursor);
+      consumingStats.sendFinishEvent(sink, cursor);
     }
   }
 
@@ -79,4 +88,30 @@ public class MessagesProcessing {
     }
   }
 
+  /*
+   * Sorting by timestamps, while keeping records within the same partition ordered by offset.
+   */
+  @VisibleForTesting
+  static Iterable<ConsumerRecord<Bytes, Bytes>> sortForSending(Iterable<ConsumerRecord<Bytes, Bytes>> records,
+                                                               boolean asc) {
+    Comparator<ConsumerRecord> offsetComparator = asc
+        ? Comparator.comparingLong(ConsumerRecord::offset)
+        : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::offset).reversed();
+
+    // partition -> sorted by offsets records
+    Map<Integer, List<ConsumerRecord<Bytes, Bytes>>> perPartition = Streams.stream(records)
+        .collect(
+            groupingBy(
+                ConsumerRecord::partition,
+                TreeMap::new,
+                collectingAndThen(toList(), lst -> lst.stream().sorted(offsetComparator).toList())));
+
+    Comparator<ConsumerRecord> tsComparator = asc
+        ? Comparator.comparing(ConsumerRecord::timestamp)
+        : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::timestamp).reversed();
+
+    // merge-sorting records from partitions one by one using timestamp comparator
+    return Iterables.mergeSorted(perPartition.values(), tsComparator);
+  }
+
 }
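
Guava's Iterables.mergeSorted does the final interleaving: each inner iterable must already be sorted by the merge comparator, which the per-partition offset sort provides as long as timestamps grow with offsets within a partition. A minimal standalone illustration of the merge behavior (natural integer order standing in for the timestamp comparator):

    List<List<Integer>> perPartition = List.of(List.of(1, 4, 9), List.of(2, 3, 10));
    Iterable<Integer> merged = Iterables.mergeSorted(perPartition, Comparator.naturalOrder());
    // iterates 1, 2, 3, 4, 9, 10 lazily, without concatenating the lists first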

+ 9 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/RangePollingEmitter.java

@@ -17,6 +17,7 @@ import reactor.core.publisher.FluxSink;
 abstract class RangePollingEmitter extends AbstractEmitter {
 
   private final Supplier<EnhancedConsumer> consumerSupplier;
+  private final Cursor.Tracking cursor;
   protected final ConsumerPosition consumerPosition;
   protected final int messagesPerPage;
 
@@ -24,11 +25,13 @@ abstract class RangePollingEmitter extends AbstractEmitter {
                                 ConsumerPosition consumerPosition,
                                 int messagesPerPage,
                                 MessagesProcessing messagesProcessing,
-                                PollingSettings pollingSettings) {
+                                PollingSettings pollingSettings,
+                                Cursor.Tracking cursor) {
     super(messagesProcessing, pollingSettings);
     this.consumerPosition = consumerPosition;
     this.messagesPerPage = messagesPerPage;
     this.consumerSupplier = consumerSupplier;
+    this.cursor = cursor;
   }
 
   protected record FromToOffset(/*inclusive*/ long from, /*exclusive*/ long to) {
@@ -46,18 +49,20 @@ abstract class RangePollingEmitter extends AbstractEmitter {
     try (EnhancedConsumer consumer = consumerSupplier.get()) {
       sendPhase(sink, "Consumer created");
       var seekOperations = SeekOperations.create(consumer, consumerPosition);
+      cursor.initOffsets(seekOperations.getOffsetsForSeek());
+
       TreeMap<TopicPartition, FromToOffset> pollRange = nextPollingRange(new TreeMap<>(), seekOperations);
       log.debug("Starting from offsets {}", pollRange);
 
-      while (!sink.isCancelled() && !pollRange.isEmpty() && !sendLimitReached()) {
+      while (!sink.isCancelled() && !pollRange.isEmpty() && !isSendLimitReached()) {
         var polled = poll(consumer, sink, pollRange);
-        send(sink, polled);
+        send(sink, polled, cursor);
         pollRange = nextPollingRange(pollRange, seekOperations);
       }
       if (sink.isCancelled()) {
         log.debug("Polling finished due to sink cancellation");
       }
-      sendFinishStatsAndCompleteSink(sink);
+      sendFinishStatsAndCompleteSink(sink, pollRange.isEmpty() ? null : cursor);
       log.debug("Polling finished");
     } catch (InterruptException kafkaInterruptException) {
       log.debug("Polling finished due to thread interruption");

+ 3 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java

@@ -20,7 +20,7 @@ class SeekOperations {
   private final OffsetsInfo offsetsInfo;
   private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
 
-  static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
+  public static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
     OffsetsInfo offsetsInfo = consumerPosition.partitions().isEmpty()
         ? new OffsetsInfo(consumer, consumerPosition.topic())
         : new OffsetsInfo(consumer, consumerPosition.partitions());
@@ -28,7 +28,7 @@ class SeekOperations {
     return new SeekOperations(consumer, offsetsInfo, offsetsToSeek);
   }
 
-  void assignAndSeek() {
+  public void assignAndSeekNonEmptyPartitions() {
     consumer.assign(offsetsForSeek.keySet());
     offsetsForSeek.forEach(consumer::seek);
   }
@@ -69,8 +69,7 @@ class SeekOperations {
     if (positionOffset.offset() != null) {
       offsetsInfo.getNonEmptyPartitions().forEach(tp -> offsets.put(tp, positionOffset.offset()));
     } else {
-      requireNonNull(positionOffset.tpOffsets());
-      offsets.putAll(positionOffset.tpOffsets());
+      offsets.putAll(requireNonNull(positionOffset.tpOffsets()));
       offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
     }
 

+ 17 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -1,24 +1,28 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import java.util.HashMap;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.errors.InterruptException;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
-public class TailingEmitter extends AbstractEmitter
-    implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
+public class TailingEmitter extends AbstractEmitter {
 
   private final Supplier<EnhancedConsumer> consumerSupplier;
   private final ConsumerPosition consumerPosition;
 
   public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
                         ConsumerPosition consumerPosition,
-                        MessagesProcessing messagesProcessing,
+                        ConsumerRecordDeserializer deserializer,
+                        Predicate<TopicMessageDTO> filter,
                         PollingSettings pollingSettings) {
-    super(messagesProcessing, pollingSettings);
+    super(new MessagesProcessing(deserializer, filter, false, null), pollingSettings);
     this.consumerSupplier = consumerSupplier;
     this.consumerPosition = consumerPosition;
   }
@@ -27,12 +31,11 @@ public class TailingEmitter extends AbstractEmitter
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
     log.debug("Starting tailing polling for {}", consumerPosition);
     try (EnhancedConsumer consumer = consumerSupplier.get()) {
-      SeekOperations.create(consumer, consumerPosition)
-          .assignAndSeek();
+      assignAndSeek(consumer);
       while (!sink.isCancelled()) {
         sendPhase(sink, "Polling");
         var polled = poll(sink, consumer);
-        polled.forEach(r -> sendMessage(sink, r));
+        send(sink, polled, null);
       }
       sink.complete();
       log.debug("Tailing finished");
@@ -45,4 +48,11 @@ public class TailingEmitter extends AbstractEmitter
     }
   }
 
+  private void assignAndSeek(EnhancedConsumer consumer) {
+    var seekOperations = SeekOperations.create(consumer, consumerPosition);
+    var seekOffsets = new HashMap<>(seekOperations.getEndOffsets()); // defaulting offsets to topic end
+    seekOffsets.putAll(seekOperations.getOffsetsForSeek()); // this will only set non-empty partitions
+    consumer.assign(seekOffsets.keySet());
+    seekOffsets.forEach(consumer::seek);
+  }
 }
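
Why the new assignAndSeek defaults to end offsets (illustrative numbers): for a topic with an empty partition p0 (end offset 0) and a non-empty p1 (end offset 50),

    var seekOffsets = new HashMap<>(seekOperations.getEndOffsets());  // {p0=0, p1=50}
    seekOffsets.putAll(seekOperations.getOffsetsForSeek());           // non-empty only: {p1=50}
    // merged: {p0=0, p1=50} -> assign every partition, positioned at the log end

so the tailing consumer is subscribed to all partitions, including currently empty ones, yet emits only records produced after it starts.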

+ 32 - 54
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -6,25 +6,21 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.hash.Hashing;
 import com.google.common.util.concurrent.RateLimiter;
 import com.provectus.kafka.ui.config.ClustersProperties;
-import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.BackwardEmitter;
 import com.provectus.kafka.ui.emitter.Cursor;
-import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.ForwardEmitter;
 import com.provectus.kafka.ui.emitter.MessageFilters;
-import com.provectus.kafka.ui.emitter.MessagesProcessing;
 import com.provectus.kafka.ui.emitter.TailingEmitter;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
 import com.provectus.kafka.ui.model.PollingModeDTO;
-import com.provectus.kafka.ui.model.SeekDirectionDTO;
 import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
 import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
@@ -252,81 +248,45 @@ public class MessagesService {
     return withExistingTopic(cluster, topic)
         .flux()
         .publishOn(Schedulers.boundedElastic())
-        .flatMap(td -> loadMessagesImpl(cluster, topic, deserializer, consumerPosition, filter, limit));
+        .flatMap(td -> loadMessagesImpl(cluster, deserializer, consumerPosition, filter, limit));
   }
 
   private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
-                                                      String topic,
                                                       ConsumerRecordDeserializer deserializer,
                                                       ConsumerPosition consumerPosition,
                                                       Predicate<TopicMessageDTO> filter,
                                                       int limit) {
-    var processing = new MessagesProcessing(
-        deserializer,
-        filter,
-        consumerPosition.pollingMode() == PollingModeDTO.TAILING ? null : limit
-    );
-
     var emitter = switch (consumerPosition.pollingMode()) {
-      case TO_OFFSET, TO_TIMESTAMP, LATEST -> new BackwardRecordEmitter(
+      case TO_OFFSET, TO_TIMESTAMP, LATEST -> new BackwardEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
           limit,
-          processing,
+          deserializer,
+          filter,
           cluster.getPollingSettings(),
-          new Cursor.Tracking(deserializer, consumerPosition, filter, limit, cursorsStorage::register)
+          cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit)
       );
-      case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> new ForwardRecordEmitter(
+      case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> new ForwardEmitter(
          () -> consumerGroupService.createConsumer(cluster),
          consumerPosition,
-          processing,
+          limit,
+          deserializer,
+          filter,
          cluster.getPollingSettings(),
-          new Cursor.Tracking(deserializer, consumerPosition, filter, limit, cursorsStorage::register)
+          cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit)
      );
      case TAILING -> new TailingEmitter(
          () -> consumerGroupService.createConsumer(cluster),
          consumerPosition,
-          processing,
+          deserializer,
+          filter,
          cluster.getPollingSettings()
      );
    };
    return Flux.create(emitter)
-        .map(getDataMasker(cluster, topic))
        .map(throttleUiPublish(consumerPosition.pollingMode()));
  }
 
-  private int fixPageSize(@Nullable Integer pageSize) {
-    return Optional.ofNullable(pageSize)
-        .filter(ps -> ps > 0 && ps <= maxPageSize)
-        .orElse(defaultPageSize);
-  }
-
-  public String registerMessageFilter(String groovyCode) {
-    String saltedCode = groovyCode + SALT_FOR_HASHING;
-    String filterId = Hashing.sha256()
-        .hashString(saltedCode, Charsets.UTF_8)
-        .toString()
-        .substring(0, 8);
-    if (registeredFilters.getIfPresent(filterId) == null) {
-      registeredFilters.put(filterId, MessageFilters.groovyScriptFilter(groovyCode));
-    }
-    return filterId;
-  }
-
-  private UnaryOperator<TopicMessageEventDTO> getDataMasker(KafkaCluster cluster, String topicName) {
-    var keyMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.KEY);
-    var valMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.VALUE);
-    return evt -> {
-      if (evt.getType() != TopicMessageEventDTO.TypeEnum.MESSAGE) {
-        return evt;
-      }
-      return evt.message(
-          evt.getMessage()
-              .key(keyMasker.apply(evt.getMessage().getKey()))
-              .content(valMasker.apply(evt.getMessage().getContent())));
-    };
-  }
-
   private Predicate<TopicMessageDTO> getMsgFilter(@Nullable String containsStrFilter,
                                                   @Nullable String smartFilterId) {
     Predicate<TopicMessageDTO> messageFilter = MessageFilters.noop();
@@ -356,4 +316,22 @@ public class MessagesService {
     return UnaryOperator.identity();
   }
 
+  private int fixPageSize(@Nullable Integer pageSize) {
+    return Optional.ofNullable(pageSize)
+        .filter(ps -> ps > 0 && ps <= maxPageSize)
+        .orElse(defaultPageSize);
+  }
+
+  public String registerMessageFilter(String groovyCode) {
+    String saltedCode = groovyCode + SALT_FOR_HASHING;
+    String filterId = Hashing.sha256()
+        .hashString(saltedCode, Charsets.UTF_8)
+        .toString()
+        .substring(0, 8);
+    if (registeredFilters.getIfPresent(filterId) == null) {
+      registeredFilters.put(filterId, MessageFilters.groovyScriptFilter(groovyCode));
+    }
+    return filterId;
+  }
+
 }
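
registerMessageFilter (moved, not changed) derives the filter id deterministically: the first 8 hex chars of sha256(groovyCode + SALT_FOR_HASHING). Registering the same script twice is therefore idempotent (hypothetical script shown):

    String id1 = messagesService.registerMessageFilter("msg.key != null");
    String id2 = messagesService.registerMessageFilter("msg.key != null");
    // id1.equals(id2): same code hashes to the same id, and the Guava cache
    // keeps the already-compiled Groovy filter, so it is not recompiled.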

+ 12 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/PollingCursorsStorage.java

@@ -4,8 +4,12 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.provectus.kafka.ui.emitter.Cursor;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Predicate;
 import org.apache.commons.lang3.RandomStringUtils;
 
 public class PollingCursorsStorage {
@@ -16,6 +20,14 @@ public class PollingCursorsStorage {
       .maximumSize(MAX_SIZE)
       .build();
 
+
+  public Cursor.Tracking createNewCursor(ConsumerRecordDeserializer deserializer,
+                                         ConsumerPosition originalPosition,
+                                         Predicate<TopicMessageDTO> filter,
+                                         int limit) {
+    return new Cursor.Tracking(deserializer, originalPosition, filter, limit, this::register);
+  }
+
   public Optional<Cursor> getCursor(String id) {
     return Optional.ofNullable(cursorsCache.getIfPresent(id));
   }

+ 4 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java

@@ -1,6 +1,6 @@
 package com.provectus.kafka.ui.service.analyze;
 
-import static com.provectus.kafka.ui.model.SeekTypeDTO.BEGINNING;
+import static com.provectus.kafka.ui.model.PollingModeDTO.EARLIEST;
 
 import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.emitter.SeekOperations;
@@ -14,6 +14,7 @@ import java.io.Closeable;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import lombok.RequiredArgsConstructor;
@@ -104,7 +105,8 @@ public class TopicAnalysisService {
         consumer.partitionsFor(topicId.topicName)
             .forEach(tp -> partitionStats.put(tp.partition(), new TopicAnalysisStats()));
 
-        var seekOperations = SeekOperations.create(consumer, new ConsumerPosition(BEGINNING, topicId.topicName, null));
+        var seekOperations =
+            SeekOperations.create(consumer, new ConsumerPosition(EARLIEST, topicId.topicName, List.of(), null, null));
         long summaryOffsetsRange = seekOperations.summaryOffsetsRange();
         seekOperations.assignAndSeekNonEmptyPartitions();
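
The ConsumerPosition constructor grew from (seekType, topic, offsets) to five arguments. From the call sites in this diff they appear to be (pollingMode, topic, partitions, timestamp, offsets); this is an inference from usage, the record itself is not shown in this commit:

    // EARLIEST over all partitions, no timestamp, no explicit offsets (assumed meaning)
    new ConsumerPosition(EARLIEST, topicId.topicName, List.of(), null, null)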

+ 21 - 20
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/CursorTest.java

@@ -117,56 +117,56 @@ class CursorTest extends AbstractIntegrationTest {
         .verifyComplete();
         .verifyComplete();
   }
   }
 
 
-  private BackwardRecordEmitter createBackwardEmitter(ConsumerPosition position) {
-    return new BackwardRecordEmitter(
+  private BackwardEmitter createBackwardEmitter(ConsumerPosition position) {
+    return new BackwardEmitter(
         this::createConsumer,
         this::createConsumer,
         position,
         position,
         PAGE_SIZE,
         PAGE_SIZE,
-        new MessagesProcessing(createRecordsDeserializer(), m -> true, PAGE_SIZE),
+        createRecordsDeserializer(),
+        m -> true,
         PollingSettings.createDefault(),
         PollingSettings.createDefault(),
         createCursor(position)
         createCursor(position)
     );
     );
   }
   }
 
 
-  private BackwardRecordEmitter createBackwardEmitterWithCursor(Cursor cursor) {
-    return new BackwardRecordEmitter(
+  private BackwardEmitter createBackwardEmitterWithCursor(Cursor cursor) {
+    return new BackwardEmitter(
         this::createConsumer,
         this::createConsumer,
         cursor.consumerPosition(),
         cursor.consumerPosition(),
         cursor.limit(),
         cursor.limit(),
-        new MessagesProcessing(cursor.deserializer(), cursor.filter(), PAGE_SIZE),
+        cursor.deserializer(),
+        cursor.filter(),
         PollingSettings.createDefault(),
         PollingSettings.createDefault(),
         createCursor(cursor.consumerPosition())
         createCursor(cursor.consumerPosition())
     );
     );
   }
   }
 
 
-  private ForwardRecordEmitter createForwardEmitterWithCursor(Cursor cursor) {
-    return new ForwardRecordEmitter(
+  private ForwardEmitter createForwardEmitterWithCursor(Cursor cursor) {
+    return new ForwardEmitter(
         this::createConsumer,
         this::createConsumer,
         cursor.consumerPosition(),
         cursor.consumerPosition(),
-        new MessagesProcessing(cursor.deserializer(), cursor.filter(), PAGE_SIZE),
+        cursor.limit(),
+        cursor.deserializer(),
+        cursor.filter(),
         PollingSettings.createDefault(),
         PollingSettings.createDefault(),
         createCursor(cursor.consumerPosition())
         createCursor(cursor.consumerPosition())
     );
     );
   }
   }
 
 
-  private ForwardRecordEmitter createForwardEmitter(ConsumerPosition position) {
-    return new ForwardRecordEmitter(
+  private ForwardEmitter createForwardEmitter(ConsumerPosition position) {
+    return new ForwardEmitter(
         this::createConsumer,
         this::createConsumer,
         position,
         position,
-        new MessagesProcessing(createRecordsDeserializer(), m -> true, PAGE_SIZE),
+        PAGE_SIZE,
+        createRecordsDeserializer(),
+        m -> true,
         PollingSettings.createDefault(),
         createCursor(position)
     );
   }
 
   private Cursor.Tracking createCursor(ConsumerPosition position) {
-    return new Cursor.Tracking(
-        createRecordsDeserializer(),
-        position,
-        m -> true,
-        PAGE_SIZE,
-        cursorsStorage::register
-    );
+    return cursorsStorage.createNewCursor(createRecordsDeserializer(), position, m -> true, PAGE_SIZE);
   }
 
   private EnhancedConsumer createConsumer() {
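Note: createCursor now delegates to a factory on cursorsStorage instead of constructing Cursor.Tracking directly. A hypothetical sketch of what createNewCursor likely does (its body is not part of this diff) — it supplies the registration callback itself, so callers no longer pass cursorsStorage::register:

    // Hypothetical factory shape, inferred from the removed constructor call above.
    Cursor.Tracking createNewCursor(ConsumerRecordDeserializer deserializer,
                                    ConsumerPosition position,
                                    Predicate<TopicMessageDTO> filter,
                                    int pageSize) {
      return new Cursor.Tracking(deserializer, position, filter, pageSize, this::register);
    }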
@@ -187,7 +187,8 @@ class CursorTest extends AbstractIntegrationTest {
         s.deserializer(null, Serde.Target.VALUE),
         StringSerde.name(),
         s.deserializer(null, Serde.Target.KEY),
-        s.deserializer(null, Serde.Target.VALUE)
+        s.deserializer(null, Serde.Target.VALUE),
+        msg -> msg
     );
   }
 

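Summary of the CursorTest changes: BackwardRecordEmitter/ForwardRecordEmitter are renamed to BackwardEmitter/ForwardEmitter, and the MessagesProcessing wrapper is flattened into plain constructor arguments (limit, deserializer, filter). A hedged sketch of the resulting call shape, with the argument order as it appears in the hunks above (parameter types are assumptions):

    // Sketch: names and order taken from the diff; comments describe the apparent roles.
    BackwardEmitter emitter = new BackwardEmitter(
        this::createConsumer,            // supplier of a consumer
        position,                        // ConsumerPosition to read from
        PAGE_SIZE,                       // max number of messages to emit
        createRecordsDeserializer(),     // previously wrapped in MessagesProcessing
        m -> true,                       // filter predicate, previously wrapped in MessagesProcessing
        PollingSettings.createDefault(),
        createCursor(position)           // Cursor.Tracking for pagination
    );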
+ 54 - 42
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -9,19 +9,21 @@ import static com.provectus.kafka.ui.model.PollingModeDTO.TO_TIMESTAMP;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
-import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.BackwardEmitter;
 import com.provectus.kafka.ui.emitter.Cursor;
-import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
-import com.provectus.kafka.ui.emitter.MessagesProcessing;
+import com.provectus.kafka.ui.emitter.EnhancedConsumer;
+import com.provectus.kafka.ui.emitter.ForwardEmitter;
 import com.provectus.kafka.ui.emitter.PollingSettings;
+import com.provectus.kafka.ui.emitter.PollingThrottler;
 import com.provectus.kafka.ui.model.ConsumerPosition;
-import com.provectus.kafka.ui.model.ConsumerPosition.Offsets;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
 import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
+import com.provectus.kafka.ui.util.ApplicationMetrics;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -32,17 +34,15 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.internals.RecordHeader;
-import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -62,6 +62,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
   static final List<Record> SENT_RECORDS = new ArrayList<>();
   static final ConsumerRecordDeserializer RECORD_DESERIALIZER = createRecordsDeserializer();
   static final Cursor.Tracking CURSOR_MOCK = Mockito.mock(Cursor.Tracking.class);
+  static final Predicate<TopicMessageDTO> NOOP_FILTER = m -> true;
 
   @BeforeAll
   static void generateMsgs() throws Exception {
@@ -98,6 +99,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
   static void cleanup() {
     deleteTopic(TOPIC);
     deleteTopic(EMPTY_TOPIC);
+    SENT_RECORDS.clear();
   }
 
   private static ConsumerRecordDeserializer createRecordsDeserializer() {
@@ -110,29 +112,29 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         s.deserializer(null, Serde.Target.VALUE),
         StringSerde.name(),
         s.deserializer(null, Serde.Target.KEY),
-        s.deserializer(null, Serde.Target.VALUE)
+        s.deserializer(null, Serde.Target.VALUE),
+        msg -> msg
     );
   }
 
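Note: the trailing msg -> msg argument added to the ConsumerRecordDeserializer constructor (here and in CursorTest) appears to be a per-message post-processing hook applied after deserialization; the tests pass the identity function. Assuming the parameter is a UnaryOperator<TopicMessageDTO> (the declared type is not visible in this diff):

    // Assumption: identity post-processor; a real caller could mask or enrich messages here.
    UnaryOperator<TopicMessageDTO> postProcessing = msg -> msg;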
-  private MessagesProcessing createMessagesProcessing() {
-    return new MessagesProcessing(RECORD_DESERIALIZER, msg -> true, null);
-  }
-
   @Test
   void pollNothingOnEmptyTopic() {
-    var forwardEmitter = new ForwardRecordEmitter(
+    var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
         new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null),
-        createMessagesProcessing(),
+        100,
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault(),
         CURSOR_MOCK
     );
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null),
         100,
-        createMessagesProcessing(),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault(),
         CURSOR_MOCK
     );
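For orientation: both emitters built above are reactive sources, and the assertions presumably drain them via Flux.create (the wiring below is an assumption, not shown in this hunk). On the empty topic no MESSAGE events should appear, only service events, assuming the emitter completes its FluxSink once polling finishes:

    // Assumed drain pattern for the emitters constructed above.
    StepVerifier.create(
            Flux.create(forwardEmitter)
                .filter(e -> e.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE))
        .expectComplete()
        .verify();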
@@ -152,19 +154,22 @@ class RecordEmitterTest extends AbstractIntegrationTest {
 
   @Test
   void pollFullTopicFromBeginning() {
-    var forwardEmitter = new ForwardRecordEmitter(
+    var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
         new ConsumerPosition(EARLIEST, TOPIC, List.of(), null, null),
-        createMessagesProcessing(),
+        PARTITIONS * MSGS_PER_PARTITION,
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault(),
         CURSOR_MOCK
     );
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(LATEST, TOPIC, List.of(), null, null),
         PARTITIONS * MSGS_PER_PARTITION,
-        createMessagesProcessing(),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault(),
         CURSOR_MOCK
     );
@@ -183,21 +188,24 @@ class RecordEmitterTest extends AbstractIntegrationTest {
       targetOffsets.put(new TopicPartition(TOPIC, i), offset);
     }
 
-    var forwardEmitter = new ForwardRecordEmitter(
+    var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
         new ConsumerPosition(FROM_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
-            new Offsets(null, targetOffsets)),
-        createMessagesProcessing(),
+            new ConsumerPosition.Offsets(null, targetOffsets)),
+        PARTITIONS * MSGS_PER_PARTITION,
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault(),
         CURSOR_MOCK
     );
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
-            new Offsets(null, targetOffsets)),
+            new ConsumerPosition.Offsets(null, targetOffsets)),
         PARTITIONS * MSGS_PER_PARTITION,
-        createMessagesProcessing(),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault(),
         CURSOR_MOCK
     );
@@ -223,10 +231,12 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     //choosing ts in the middle
     long targetTimestamp = tsStats.getMin() + ((tsStats.getMax() - tsStats.getMin()) / 2);
 
-    var forwardEmitter = new ForwardRecordEmitter(
+    var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
         new ConsumerPosition(FROM_TIMESTAMP, TOPIC, List.of(), targetTimestamp, null),
-        createMessagesProcessing(),
+        PARTITIONS * MSGS_PER_PARTITION,
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault(),
         CURSOR_MOCK
     );
@@ -239,11 +249,12 @@ class RecordEmitterTest extends AbstractIntegrationTest {
             .collect(Collectors.toList())
     );
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(TO_TIMESTAMP, TOPIC, List.of(), targetTimestamp, null),
         PARTITIONS * MSGS_PER_PARTITION,
-        createMessagesProcessing(),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault(),
         CURSOR_MOCK
     );
@@ -265,12 +276,13 @@ class RecordEmitterTest extends AbstractIntegrationTest {
       targetOffsets.put(new TopicPartition(TOPIC, i), (long) MSGS_PER_PARTITION);
     }
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
-            new Offsets(null, targetOffsets)),
+            new ConsumerPosition.Offsets(null, targetOffsets)),
         numMessages,
-        createMessagesProcessing(),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault(),
         CURSOR_MOCK
     );
@@ -293,11 +305,13 @@ class RecordEmitterTest extends AbstractIntegrationTest {
       offsets.put(new TopicPartition(TOPIC, i), 0L);
     }
 
-    var backwardEmitter = new BackwardRecordEmitter(
+    var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
-        new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(offsets.keySet()), null, new Offsets(null, offsets)),
+        new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(offsets.keySet()), null,
+            new ConsumerPosition.Offsets(null, offsets)),
         100,
-        createMessagesProcessing(),
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault(),
         CURSOR_MOCK
     );
@@ -338,22 +352,20 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     assertionsConsumer.accept(step.expectComplete().verifyThenAssertThat());
   }
 
-  private KafkaConsumer<Bytes, Bytes> createConsumer() {
+  private EnhancedConsumer createConsumer() {
     return createConsumer(Map.of());
   }
 
-  private KafkaConsumer<Bytes, Bytes> createConsumer(Map<String, Object> properties) {
+  private EnhancedConsumer createConsumer(Map<String, Object> properties) {
     final Map<String, ? extends Serializable> map = Map.of(
         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
         ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
-        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19, // to check multiple polls
-        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class,
-        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class
+        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19 // to check multiple polls
     );
     Properties props = new Properties();
     props.putAll(map);
     props.putAll(properties);
-    return new KafkaConsumer<>(props);
+    return new EnhancedConsumer(props, PollingThrottler.noop(), ApplicationMetrics.noop());
   }
 
   @Value
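Note on the final hunk: the raw KafkaConsumer<Bytes, Bytes> is replaced by the project's EnhancedConsumer, which takes a PollingThrottler and ApplicationMetrics and presumably installs the Bytes deserializers itself (the explicit KEY/VALUE_DESERIALIZER_CLASS_CONFIG entries are dropped). A hedged construction sketch mirroring the change:

    // Sketch: no-op throttling/metrics as in the test; MAX_POLL_RECORDS kept small
    // so reading the whole topic requires several polls.
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19);
    var consumer = new EnhancedConsumer(props, PollingThrottler.noop(), ApplicationMetrics.noop());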