
merged with master

iliax 1 year ago
parent
commit
d2e6e6a509

+ 5 - 25
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -104,38 +104,18 @@ public class MessagesController extends AbstractController implements MessagesApi {
                                                                              String valueSerde,
                                                                              String cursor,
                                                                              ServerWebExchange exchange) {
-    var context = AccessContext.builder()
+    var contextBuilder = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(MESSAGES_READ)
-        .operationName("getTopicMessages")
-        .build();
+        .operationName("getTopicMessages");
 
     if (auditService.isAuditTopic(getCluster(clusterName), topicName)) {
       contextBuilder.auditActions(AuditAction.VIEW);
     }
 
-    seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
-    seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
-    filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
-
-    var positions = new ConsumerPosition(
-        seekType,
-        topicName,
-        parseSeekTo(topicName, seekType, seekTo)
-    );
-    Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> job = Mono.just(
-        ResponseEntity.ok(
-            messagesService.loadMessages(
-                getCluster(clusterName), topicName, positions, q, filterQueryType,
-                limit, seekDirection, keySerde, valueSerde)
-        )
-    );
+    var accessContext = contextBuilder.build();
 
-    var context = contextBuilder.build();
-    return validateAccess(context)
-        .then(job)
-        .doOnEach(sig -> audit(context, sig));
     Flux<TopicMessageEventDTO> messagesFlux;
     if (cursor != null) {
       messagesFlux = messagesService.loadMessages(getCluster(clusterName), topicName, cursor);
@@ -151,9 +131,9 @@ public class MessagesController extends AbstractController implements MessagesApi {
           valueSerde
       );
     }
-    return accessControlService.validateAccess(context)
+    return accessControlService.validateAccess(accessContext)
         .then(Mono.just(ResponseEntity.ok(messagesFlux)))
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> auditService.audit(accessContext, sig));
   }
 
   @Override
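Note: the fix here is twofold. The `AccessContext` is now built only after audit actions are conditionally added (the previous state called `.build()` immediately, so the later `contextBuilder.auditActions(...)` line referenced a variable that did not exist), and the leftover pre-merge `job`/`validateAccess` block is removed in favor of a single `accessContext` used for both validation and auditing. A minimal, runnable analogue of this build-once-after-conditional-mutation pattern, using Guava's `ImmutableList.Builder` (names here are illustrative, not from the commit):

```java
import com.google.common.collect.ImmutableList;
import java.util.List;

public class BuilderDeferralSketch {
  public static void main(String[] args) {
    boolean isAuditTopic = true; // stands in for auditService.isAuditTopic(...)

    ImmutableList.Builder<String> actions = ImmutableList.builder();
    actions.add("MESSAGES_READ");
    if (isAuditTopic) {
      actions.add("AUDIT_VIEW"); // conditional additions must happen before build()
    }

    List<String> actionList = actions.build(); // build exactly once, then reuse
    System.out.println(actionList);            // [MESSAGES_READ, AUDIT_VIEW]
  }
}
```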

+ 5 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import jakarta.annotation.Nullable;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
@@ -25,8 +26,10 @@ abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
     return messagesProcessing.limitReached();
   }
 
-  protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
-    messagesProcessing.send(sink, records);
+  protected void send(FluxSink<TopicMessageEventDTO> sink,
+                      Iterable<ConsumerRecord<Bytes, Bytes>> records,
+                      @Nullable Cursor.Tracking cursor) {
+    messagesProcessing.send(sink, records, cursor);
   }
 
   protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {

+ 5 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardEmitter.java

@@ -18,18 +18,15 @@ public class BackwardEmitter extends RangePollingEmitter {
                          int messagesPerPage,
                          ConsumerRecordDeserializer deserializer,
                          Predicate<TopicMessageDTO> filter,
-                         PollingSettings pollingSettings) {
+                         PollingSettings pollingSettings,
+                         Cursor.Tracking cursor) {
     super(
         consumerSupplier,
         consumerPosition,
         messagesPerPage,
-        new MessagesProcessing(
-            deserializer,
-            filter,
-            false,
-            messagesPerPage
-        ),
-        pollingSettings
+        new MessagesProcessing(deserializer, filter, false, messagesPerPage),
+        pollingSettings,
+        cursor
     );
   }
 

+ 2 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java

@@ -28,8 +28,7 @@ class ConsumingStats {
     filterApplyErrors++;
   }
 
-
-  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors, @Nullable Cursor.Tracking cursor) {
+  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
     sink.next(
         new TopicMessageEventDTO()
             .type(TopicMessageEventDTO.TypeEnum.DONE)
@@ -38,7 +37,7 @@ class ConsumingStats {
                     ? new TopicMessageNextPageCursorDTO().id(cursor.registerCursor())
                     : null
             )
-            .consuming(createConsumingStats(sink, filterApplyErrors))
+            .consuming(createConsumingStats())
     );
   }
 

+ 31 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/Cursor.java

@@ -1,5 +1,7 @@
 package com.provectus.kafka.ui.emitter;
 
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
@@ -20,32 +22,41 @@ public record Cursor(ConsumerRecordDeserializer deserializer,
     private final ConsumerPosition originalPosition;
     private final Predicate<TopicMessageDTO> filter;
     private final int limit;
-    private final Function<Cursor, String> cursorRegistry;
+    private final Function<Cursor, String> registerAction;
 
-    private final Map<TopicPartition, Long> trackingOffsets = new HashMap<>();
+    //topic -> partition -> offset
+    private final Table<String, Integer, Long> trackingOffsets = HashBasedTable.create();
 
     public Tracking(ConsumerRecordDeserializer deserializer,
                     ConsumerPosition originalPosition,
                     Predicate<TopicMessageDTO> filter,
                     int limit,
-                    Function<Cursor, String> cursorRegistry) {
+                    Function<Cursor, String> registerAction) {
       this.deserializer = deserializer;
       this.originalPosition = originalPosition;
       this.filter = filter;
       this.limit = limit;
-      this.cursorRegistry = cursorRegistry;
+      this.registerAction = registerAction;
     }
 
-    void trackOffset(TopicPartition tp, long offset) {
-      trackingOffsets.put(tp, offset);
+    void trackOffset(String topic, int partition, long offset) {
+      trackingOffsets.put(topic, partition, offset);
     }
 
-    void trackOffsets(Map<TopicPartition, Long> offsets) {
-      this.trackingOffsets.putAll(offsets);
+    void initOffsets(Map<TopicPartition, Long> initialSeekOffsets) {
+      initialSeekOffsets.forEach((tp, off) -> trackOffset(tp.topic(), tp.partition(), off));
+    }
+
+    private Map<TopicPartition, Long> getOffsetsMap(int offsetToAdd) {
+      Map<TopicPartition, Long> result = new HashMap<>();
+      trackingOffsets.rowMap()
+          .forEach((topic, partsMap) ->
+              partsMap.forEach((p, off) -> result.put(new TopicPartition(topic, p), off + offsetToAdd)));
+      return result;
     }
 
     String registerCursor() {
-      return cursorRegistry.apply(
+      return registerAction.apply(
           new Cursor(
               deserializer,
               new ConsumerPosition(
@@ -57,7 +68,17 @@ public record Cursor(ConsumerRecordDeserializer deserializer,
                   originalPosition.topic(),
                   originalPosition.partitions(),
                   null,
-                  new ConsumerPosition.Offsets(null, trackingOffsets)
+                  new ConsumerPosition.Offsets(
+                      null,
+                      getOffsetsMap(
+                          switch (originalPosition.pollingMode()) {
+                            case TO_OFFSET, TO_TIMESTAMP, LATEST -> 0;
+                            // when doing forward polling we need to start from latest msg's offset + 1
+                            case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> 1;
+                            case TAILING -> throw new IllegalStateException();
+                          }
+                      )
+                  )
               ),
               filter,
               limit
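The cursor now tracks offsets in a Guava `Table` keyed by topic and partition, and `registerCursor()` shifts them by one only for forward polling modes, so the next page resumes after the last record already scanned. A runnable sketch of that offset arithmetic (topic names and offsets here are invented):

```java
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class CursorOffsetsSketch {
  public static void main(String[] args) {
    // topic -> partition -> last polled offset, as in Cursor.Tracking
    Table<String, Integer, Long> trackingOffsets = HashBasedTable.create();
    trackingOffsets.put("orders", 0, 41L);
    trackingOffsets.put("orders", 1, 99L);

    // FROM_OFFSET / FROM_TIMESTAMP / EARLIEST resume at offset + 1;
    // TO_OFFSET / TO_TIMESTAMP / LATEST reuse the tracked offset as-is
    int offsetToAdd = 1;
    Map<TopicPartition, Long> nextPage = new HashMap<>();
    trackingOffsets.rowMap().forEach((topic, partitions) ->
        partitions.forEach((p, off) ->
            nextPage.put(new TopicPartition(topic, p), off + offsetToAdd)));

    System.out.println(nextPage); // e.g. {orders-0=42, orders-1=100}
  }
}
```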

+ 5 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardEmitter.java

@@ -18,18 +18,15 @@ public class ForwardEmitter extends RangePollingEmitter {
                         int messagesPerPage,
                         ConsumerRecordDeserializer deserializer,
                         Predicate<TopicMessageDTO> filter,
-                        PollingSettings pollingSettings) {
+                        PollingSettings pollingSettings,
+                        Cursor.Tracking cursor) {
     super(
         consumerSupplier,
         consumerPosition,
         messagesPerPage,
-        new MessagesProcessing(
-            deserializer,
-            filter,
-            true,
-            messagesPerPage
-        ),
-        pollingSettings
+        new MessagesProcessing(deserializer, filter, true, messagesPerPage),
+        pollingSettings,
+        cursor
     );
   }
 

+ 6 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java

@@ -39,7 +39,9 @@ class MessagesProcessing {
     return limit != null && sentMessages >= limit;
   }
 
-  void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> polled) {
+  void send(FluxSink<TopicMessageEventDTO> sink,
+            Iterable<ConsumerRecord<Bytes, Bytes>> polled,
+            @Nullable Cursor.Tracking cursor) {
     sortForSending(polled, ascendingSortBeforeSend)
         .forEach(rec -> {
           if (!limitReached() && !sink.isCancelled()) {
@@ -53,6 +55,9 @@ class MessagesProcessing {
                 );
                 sentMessages++;
               }
+              if (cursor != null) {
+                cursor.trackOffset(rec.topic(), rec.partition(), rec.offset());
+              }
             } catch (Exception e) {
               consumingStats.incFilterApplyError();
               log.trace("Error applying filter for message {}", topicMessage);
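Note that `trackOffset` runs for every polled record inside the non-cancelled branch, whether or not the filter matched, so a registered cursor resumes after everything already scanned rather than only after emitted messages; `TailingEmitter` passes `null` since live tailing has no next page. A small runnable sketch of that behavior (records and filter are invented):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class TrackWhileFilteringSketch {
  public static void main(String[] args) {
    List<ConsumerRecord<String, String>> polled = List.of(
        new ConsumerRecord<>("t", 0, 5L, "k1", "match"),
        new ConsumerRecord<>("t", 0, 6L, "k2", "skip"));
    Predicate<String> filter = "match"::equals;

    Map<String, Long> tracked = new HashMap<>(); // "topic-partition" -> offset
    int sent = 0;
    for (var rec : polled) {
      if (filter.test(rec.value())) {
        sent++;                                  // only matching records are emitted...
      }
      tracked.put(rec.topic() + "-" + rec.partition(), rec.offset()); // ...but every offset is tracked
    }
    System.out.println(sent + " sent, resume after " + tracked); // 1 sent, resume after {t-0=6}
  }
}
```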

+ 9 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/RangePollingEmitter.java

@@ -17,6 +17,7 @@ import reactor.core.publisher.FluxSink;
 abstract class RangePollingEmitter extends AbstractEmitter {
 
   private final Supplier<EnhancedConsumer> consumerSupplier;
+  private final Cursor.Tracking cursor;
   protected final ConsumerPosition consumerPosition;
   protected final int messagesPerPage;
 
@@ -24,11 +25,13 @@ abstract class RangePollingEmitter extends AbstractEmitter {
                                 ConsumerPosition consumerPosition,
                                 int messagesPerPage,
                                 MessagesProcessing messagesProcessing,
-                                PollingSettings pollingSettings) {
+                                PollingSettings pollingSettings,
+                                Cursor.Tracking cursor) {
     super(messagesProcessing, pollingSettings);
     this.consumerPosition = consumerPosition;
     this.messagesPerPage = messagesPerPage;
     this.consumerSupplier = consumerSupplier;
+    this.cursor = cursor;
   }
 
   protected record FromToOffset(/*inclusive*/ long from, /*exclusive*/ long to) {
@@ -46,18 +49,20 @@ abstract class RangePollingEmitter extends AbstractEmitter {
     try (EnhancedConsumer consumer = consumerSupplier.get()) {
       sendPhase(sink, "Consumer created");
       var seekOperations = SeekOperations.create(consumer, consumerPosition);
+      cursor.initOffsets(seekOperations.getOffsetsForSeek());
+
       TreeMap<TopicPartition, FromToOffset> pollRange = nextPollingRange(new TreeMap<>(), seekOperations);
       log.debug("Starting from offsets {}", pollRange);
 
-      while (!sink.isCancelled() && !pollRange.isEmpty() && !sendLimitReached()) {
+      while (!sink.isCancelled() && !pollRange.isEmpty() && !isSendLimitReached()) {
         var polled = poll(consumer, sink, pollRange);
-        send(sink, polled);
+        send(sink, polled, cursor);
         pollRange = nextPollingRange(pollRange, seekOperations);
       }
       if (sink.isCancelled()) {
         log.debug("Polling finished due to sink cancellation");
       }
-      sendFinishStatsAndCompleteSink(sink);
+      sendFinishStatsAndCompleteSink(sink, pollRange.isEmpty() ? null : cursor);
       log.debug("Polling finished");
     } catch (InterruptException kafkaInterruptException) {
       log.debug("Polling finished due to thread interruption");
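The finish event carries a cursor only when `pollRange` is non-empty, i.e. when unread data remains after the send limit was hit; an exhausted range passes `null`, so no next-page id is issued. A compact runnable model of that control flow (all names are stand-ins, not the real emitter API):

```java
import java.util.Map;
import java.util.TreeMap;

public class PollingLoopSketch {
  public static void main(String[] args) {
    // partition -> batch size, standing in for the real TopicPartition -> FromToOffset ranges
    TreeMap<Integer, Integer> pollRange = new TreeMap<>(Map.of(0, 3, 1, 2));
    int sent = 0;
    int limit = 3;

    while (!pollRange.isEmpty() && sent < limit) {   // mirrors !isSendLimitReached()
      sent += pollRange.pollFirstEntry().getValue(); // "poll" one partition's batch
    }

    // mirrors: sendFinishStatsAndCompleteSink(sink, pollRange.isEmpty() ? null : cursor)
    String nextPageCursor = pollRange.isEmpty() ? null : "cursor-id";
    System.out.println(sent + " sent, next-page cursor: " + nextPageCursor);
  }
}
```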

+ 3 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java

@@ -21,7 +21,7 @@ public class SeekOperations {
   private final OffsetsInfo offsetsInfo;
   private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
 
-  static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
+  public static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
     OffsetsInfo offsetsInfo = consumerPosition.partitions().isEmpty()
         ? new OffsetsInfo(consumer, consumerPosition.topic())
         : new OffsetsInfo(consumer, consumerPosition.partitions());
@@ -29,7 +29,7 @@ public class SeekOperations {
     return new SeekOperations(consumer, offsetsInfo, offsetsToSeek);
   }
 
-  void assignAndSeek() {
+  public void assignAndSeekNonEmptyPartitions() {
     consumer.assign(offsetsForSeek.keySet());
     offsetsForSeek.forEach(consumer::seek);
   }
@@ -86,8 +86,7 @@ public class SeekOperations {
     if (positionOffset.offset() != null) {
       offsetsInfo.getNonEmptyPartitions().forEach(tp -> offsets.put(tp, positionOffset.offset()));
     } else {
-      requireNonNull(positionOffset.tpOffsets());
-      offsets.putAll(positionOffset.tpOffsets());
+      offsets.putAll(requireNonNull(positionOffset.tpOffsets()));
       offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
     }
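`create` and the renamed `assignAndSeekNonEmptyPartitions` become public so callers outside the emitter package (see TopicAnalysisService below) can reuse them; inlining `requireNonNull` is behavior-preserving, just tighter. A runnable sketch of the retain-only-non-empty-partitions step (data is invented):

```java
import static java.util.Objects.requireNonNull;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public class SeekOffsetsSketch {
  public static void main(String[] args) {
    var tp0 = new TopicPartition("t", 0);
    var tp1 = new TopicPartition("t", 1); // pretend this partition holds no records

    Map<TopicPartition, Long> tpOffsets = Map.of(tp0, 10L, tp1, 20L);
    Set<TopicPartition> nonEmptyPartitions = Set.of(tp0);

    Map<TopicPartition, Long> offsets = new HashMap<>();
    offsets.putAll(requireNonNull(tpOffsets));       // fail fast on a null offsets map
    offsets.keySet().retainAll(nonEmptyPartitions);  // seek only partitions with data

    System.out.println(offsets); // {t-0=10}
  }
}
```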
 

+ 4 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -1,8 +1,11 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.util.HashMap;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.errors.InterruptException;
@@ -32,7 +35,7 @@ public class TailingEmitter extends AbstractEmitter {
       while (!sink.isCancelled()) {
         sendPhase(sink, "Polling");
         var polled = poll(sink, consumer);
-        send(sink, polled);
+        send(sink, polled, null);
       }
       sink.complete();
       log.debug("Tailing finished");

+ 36 - 18
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -6,23 +6,21 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.hash.Hashing;
 import com.google.common.util.concurrent.RateLimiter;
 import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.emitter.BackwardEmitter;
 import com.provectus.kafka.ui.emitter.Cursor;
+import com.provectus.kafka.ui.emitter.ForwardEmitter;
 import com.provectus.kafka.ui.emitter.MessageFilters;
-import com.provectus.kafka.ui.emitter.MessagesProcessing;
 import com.provectus.kafka.ui.emitter.TailingEmitter;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
 import com.provectus.kafka.ui.model.PollingModeDTO;
-import com.provectus.kafka.ui.model.SeekDirectionDTO;
 import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
 import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
@@ -250,48 +248,50 @@ public class MessagesService {
     return withExistingTopic(cluster, topic)
         .flux()
         .publishOn(Schedulers.boundedElastic())
-        .flatMap(td -> loadMessagesImpl(cluster, topic, deserializer, consumerPosition, filter, limit));
+        .flatMap(td -> loadMessagesImpl(cluster, deserializer, consumerPosition, filter, limit));
   }
 
   private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
-                                                      String topic,
                                                       ConsumerRecordDeserializer deserializer,
                                                       ConsumerPosition consumerPosition,
                                                       Predicate<TopicMessageDTO> filter,
                                                       int limit) {
-    var deserializer = deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
-    var filter = getMsgFilter(query, filterQueryType);
     var emitter = switch (consumerPosition.pollingMode()) {
       case TO_OFFSET, TO_TIMESTAMP, LATEST -> new BackwardEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
           limit,
-          processing,
+          deserializer,
+          filter,
           cluster.getPollingSettings(),
-          new Cursor.Tracking(deserializer, consumerPosition, filter, limit, cursorsStorage::register)
+          cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit)
       );
       case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> new ForwardEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
-          processing,
+          limit,
+          deserializer,
+          filter,
           cluster.getPollingSettings(),
-          new Cursor.Tracking(deserializer, consumerPosition, filter, limit, cursorsStorage::register)
+          cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit)
       );
       case TAILING -> new TailingEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
-          processing,
+          deserializer,
+          filter,
           cluster.getPollingSettings()
       );
     };
     return Flux.create(emitter)
-        .map(throttleUiPublish(seekDirection));
+        .map(throttleUiPublish(consumerPosition.pollingMode()));
   }
 
-  private Predicate<TopicMessageDTO> getMsgFilter(String query,
-                                                  MessageFilterTypeDTO filterQueryType) {
-    if (StringUtils.isEmpty(query)) {
-      return evt -> true;
+  private Predicate<TopicMessageDTO> getMsgFilter(@Nullable String containsStrFilter,
+                                                  @Nullable String smartFilterId) {
+    Predicate<TopicMessageDTO> messageFilter = MessageFilters.noop();
+    if (containsStrFilter != null) {
+      messageFilter = messageFilter.and(MessageFilters.containsStringFilter(containsStrFilter));
     }
     if (smartFilterId != null) {
       var registered = registeredFilters.getIfPresent(smartFilterId);
@@ -316,4 +316,22 @@ public class MessagesService {
     return UnaryOperator.identity();
   }
 
+  private int fixPageSize(@Nullable Integer pageSize) {
+    return Optional.ofNullable(pageSize)
+        .filter(ps -> ps > 0 && ps <= maxPageSize)
+        .orElse(defaultPageSize);
+  }
+
+  public String registerMessageFilter(String groovyCode) {
+    String saltedCode = groovyCode + SALT_FOR_HASHING;
+    String filterId = Hashing.sha256()
+        .hashString(saltedCode, Charsets.UTF_8)
+        .toString()
+        .substring(0, 8);
+    if (registeredFilters.getIfPresent(filterId) == null) {
+      registeredFilters.put(filterId, MessageFilters.groovyScriptFilter(groovyCode));
+    }
+    return filterId;
+  }
+
 }
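`registerMessageFilter` derives a stable eight-character id from a SHA-256 hash of the salted Groovy source, so re-registering the same script returns the same id without growing the cache. A self-contained sketch of that id scheme (the salt value below is invented; the real constant is not shown in this diff):

```java
import com.google.common.base.Charsets;
import com.google.common.hash.Hashing;

public class FilterIdSketch {
  private static final String SALT_FOR_HASHING = "example-salt"; // assumption, real value not in the diff

  static String filterId(String groovyCode) {
    return Hashing.sha256()
        .hashString(groovyCode + SALT_FOR_HASHING, Charsets.UTF_8)
        .toString()
        .substring(0, 8); // short, deterministic id for the bounded filter cache
  }

  public static void main(String[] args) {
    String code = "value.contains('error')";
    System.out.println(filterId(code).equals(filterId(code))); // true: same script, same id
  }
}
```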

+ 12 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/PollingCursorsStorage.java

@@ -4,8 +4,12 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.provectus.kafka.ui.emitter.Cursor;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Predicate;
 import org.apache.commons.lang3.RandomStringUtils;
 
 public class PollingCursorsStorage {
@@ -16,6 +20,14 @@ public class PollingCursorsStorage {
       .maximumSize(MAX_SIZE)
       .build();
 
+
+  public Cursor.Tracking createNewCursor(ConsumerRecordDeserializer deserializer,
+                                         ConsumerPosition originalPosition,
+                                         Predicate<TopicMessageDTO> filter,
+                                         int limit) {
+    return new Cursor.Tracking(deserializer, originalPosition, filter, limit, this::register);
+  }
+
   public Optional<Cursor> getCursor(String id) {
     return Optional.ofNullable(cursorsCache.getIfPresent(id));
   }
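`createNewCursor` centralizes `Cursor.Tracking` construction and binds `this::register` as the registration callback, so emitters never touch the cursor cache directly. A minimal runnable analogue of the id-issuing register step (the id format is a guess based on the `RandomStringUtils` import):

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.commons.lang3.RandomStringUtils;

public class CursorsStorageSketch {
  private final Cache<String, String> cursorsCache = CacheBuilder.newBuilder()
      .maximumSize(10_000) // bounded, like MAX_SIZE in PollingCursorsStorage
      .build();

  String register(String cursorState) {
    String id = RandomStringUtils.randomAlphanumeric(8); // assumed id shape
    cursorsCache.put(id, cursorState);
    return id;
  }

  public static void main(String[] args) {
    var storage = new CursorsStorageSketch();
    System.out.println("registered cursor " + storage.register("serialized-cursor-state"));
  }
}
```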

+ 4 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java

@@ -1,6 +1,6 @@
 package com.provectus.kafka.ui.service.analyze;
 
-import static com.provectus.kafka.ui.model.SeekTypeDTO.BEGINNING;
+import static com.provectus.kafka.ui.model.PollingModeDTO.EARLIEST;
 
 import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.emitter.SeekOperations;
@@ -14,6 +14,7 @@ import java.io.Closeable;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import lombok.RequiredArgsConstructor;
@@ -104,7 +105,8 @@ public class TopicAnalysisService {
         consumer.partitionsFor(topicId.topicName)
             .forEach(tp -> partitionStats.put(tp.partition(), new TopicAnalysisStats()));
 
-        var seekOperations = SeekOperations.create(consumer, new ConsumerPosition(BEGINNING, topicId.topicName, null));
+        var seekOperations =
+            SeekOperations.create(consumer, new ConsumerPosition(EARLIEST, topicId.topicName, List.of(), null, null));
         long summaryOffsetsRange = seekOperations.summaryOffsetsRange();
         seekOperations.assignAndSeekNonEmptyPartitions();
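The call sites in this commit imply a new five-argument `ConsumerPosition`. A hedged reconstruction of its shape, inferred from `new ConsumerPosition(EARLIEST, topicId.topicName, List.of(), null, null)` here and the `ConsumerPosition.Offsets(null, targetOffsets)` usages in the tests; the `pollingMode()`, `topic()`, `partitions()`, and `Offsets.tpOffsets()` accessors all appear in hunks above, while the `timestamp` component name is a pure guess:

```java
import com.provectus.kafka.ui.model.PollingModeDTO;
import jakarta.annotation.Nullable;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Inferred sketch only; the actual record lives in com.provectus.kafka.ui.model.
public record ConsumerPosition(PollingModeDTO pollingMode,
                               String topic,
                               List<TopicPartition> partitions,
                               @Nullable Long timestamp,
                               @Nullable Offsets offsets) {

  public record Offsets(@Nullable Long offset,                           // one offset for all partitions
                        @Nullable Map<TopicPartition, Long> tpOffsets) { // or per-partition offsets
  }
}
```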
 

+ 21 - 20
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/CursorTest.java

@@ -117,56 +117,56 @@ class CursorTest extends AbstractIntegrationTest {
         .verifyComplete();
   }
 
-  private BackwardRecordEmitter createBackwardEmitter(ConsumerPosition position) {
-    return new BackwardRecordEmitter(
+  private BackwardEmitter createBackwardEmitter(ConsumerPosition position) {
+    return new BackwardEmitter(
         this::createConsumer,
         position,
         PAGE_SIZE,
-        new MessagesProcessing(createRecordsDeserializer(), m -> true, PAGE_SIZE),
+        createRecordsDeserializer(),
+        m -> true,
         PollingSettings.createDefault(),
         createCursor(position)
     );
   }
 
-  private BackwardRecordEmitter createBackwardEmitterWithCursor(Cursor cursor) {
-    return new BackwardRecordEmitter(
+  private BackwardEmitter createBackwardEmitterWithCursor(Cursor cursor) {
+    return new BackwardEmitter(
         this::createConsumer,
         cursor.consumerPosition(),
         cursor.limit(),
-        new MessagesProcessing(cursor.deserializer(), cursor.filter(), PAGE_SIZE),
+        cursor.deserializer(),
+        cursor.filter(),
         PollingSettings.createDefault(),
         createCursor(cursor.consumerPosition())
     );
   }
 
-  private ForwardRecordEmitter createForwardEmitterWithCursor(Cursor cursor) {
-    return new ForwardRecordEmitter(
+  private ForwardEmitter createForwardEmitterWithCursor(Cursor cursor) {
+    return new ForwardEmitter(
         this::createConsumer,
         cursor.consumerPosition(),
-        new MessagesProcessing(cursor.deserializer(), cursor.filter(), PAGE_SIZE),
+        cursor.limit(),
+        cursor.deserializer(),
+        cursor.filter(),
         PollingSettings.createDefault(),
         createCursor(cursor.consumerPosition())
     );
   }
 
-  private ForwardRecordEmitter createForwardEmitter(ConsumerPosition position) {
-    return new ForwardRecordEmitter(
+  private ForwardEmitter createForwardEmitter(ConsumerPosition position) {
+    return new ForwardEmitter(
         this::createConsumer,
         position,
-        new MessagesProcessing(createRecordsDeserializer(), m -> true, PAGE_SIZE),
+        PAGE_SIZE,
+        createRecordsDeserializer(),
+        m -> true,
         PollingSettings.createDefault(),
         createCursor(position)
     );
   }
 
   private Cursor.Tracking createCursor(ConsumerPosition position) {
-    return new Cursor.Tracking(
-        createRecordsDeserializer(),
-        position,
-        m -> true,
-        PAGE_SIZE,
-        cursorsStorage::register
-    );
+    return cursorsStorage.createNewCursor(createRecordsDeserializer(), position, m -> true, PAGE_SIZE);
   }
 
   private EnhancedConsumer createConsumer() {
@@ -187,7 +187,8 @@ class CursorTest extends AbstractIntegrationTest {
         s.deserializer(null, Serde.Target.VALUE),
         StringSerde.name(),
         s.deserializer(null, Serde.Target.KEY),
-        s.deserializer(null, Serde.Target.VALUE)
+        s.deserializer(null, Serde.Target.VALUE),
+        msg -> msg
     );
   }
 

+ 25 - 42
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -10,6 +10,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.emitter.BackwardEmitter;
+import com.provectus.kafka.ui.emitter.Cursor;
 import com.provectus.kafka.ui.emitter.EnhancedConsumer;
 import com.provectus.kafka.ui.emitter.ForwardEmitter;
 import com.provectus.kafka.ui.emitter.PollingSettings;
@@ -120,13 +121,10 @@ class RecordEmitterTest extends AbstractIntegrationTest {
   void pollNothingOnEmptyTopic() {
     var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
-        new ConsumerPosition(EARLIEST, EMPTY_TOPIC, null),
+        new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null),
         100,
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
-        new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null),
-        createMessagesProcessing(),
         PollingSettings.createDefault(),
         CURSOR_MOCK
     );
@@ -135,12 +133,10 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null),
         100,
-        createMessagesProcessing(),
-        PollingSettings.createDefault(),
-        CURSOR_MOCK,
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     StepVerifier.create(Flux.create(forwardEmitter))
@@ -161,7 +157,9 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
         new ConsumerPosition(EARLIEST, TOPIC, List.of(), null, null),
-        createMessagesProcessing(),
+        PARTITIONS * MSGS_PER_PARTITION,
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault(),
         CURSOR_MOCK
     );
@@ -170,12 +168,10 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         new ConsumerPosition(LATEST, TOPIC, List.of(), null, null),
         PARTITIONS * MSGS_PER_PARTITION,
-        createMessagesProcessing(),
-        PollingSettings.createDefault(),
-        CURSOR_MOCK
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     List<String> expectedValues = SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList());
@@ -195,28 +191,23 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
         new ConsumerPosition(FROM_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
-            new Offsets(null, targetOffsets)),
-        createMessagesProcessing(),
-        PollingSettings.createDefault(),
-        CURSOR_MOCK
-        new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
+            new ConsumerPosition.Offsets(null, targetOffsets)),
         PARTITIONS * MSGS_PER_PARTITION,
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
-            new Offsets(null, targetOffsets)),
+            new ConsumerPosition.Offsets(null, targetOffsets)),
         PARTITIONS * MSGS_PER_PARTITION,
-        createMessagesProcessing(),
-        PollingSettings.createDefault(),
-        CURSOR_MOCK
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     var expectedValues = SENT_RECORDS.stream()
@@ -243,7 +234,9 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var forwardEmitter = new ForwardEmitter(
         this::createConsumer,
         new ConsumerPosition(FROM_TIMESTAMP, TOPIC, List.of(), targetTimestamp, null),
-        createMessagesProcessing(),
+        PARTITIONS * MSGS_PER_PARTITION,
+        RECORD_DESERIALIZER,
+        NOOP_FILTER,
         PollingSettings.createDefault(),
         CURSOR_MOCK
     );
@@ -254,23 +247,16 @@ class RecordEmitterTest extends AbstractIntegrationTest {
             .filter(r -> r.getTimestamp() >= targetTimestamp)
             .map(Record::getValue)
             .collect(Collectors.toList())
-        new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
-        PARTITIONS * MSGS_PER_PARTITION,
-        RECORD_DESERIALIZER,
-        NOOP_FILTER,
-        PollingSettings.createDefault()
     );
 
     var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(TO_TIMESTAMP, TOPIC, List.of(), targetTimestamp, null),
         PARTITIONS * MSGS_PER_PARTITION,
-        createMessagesProcessing(),
-        PollingSettings.createDefault(),
-        CURSOR_MOCK
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     expectEmitter(
@@ -293,14 +279,12 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
         new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
-            new Offsets(null, targetOffsets)),
+            new ConsumerPosition.Offsets(null, targetOffsets)),
         numMessages,
-        createMessagesProcessing(),
-        PollingSettings.createDefault(),
-        CURSOR_MOCK
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
+        PollingSettings.createDefault(),
+        CURSOR_MOCK
     );
 
     var expectedValues = SENT_RECORDS.stream()
@@ -323,12 +307,11 @@ class RecordEmitterTest extends AbstractIntegrationTest {
 
     var backwardEmitter = new BackwardEmitter(
         this::createConsumer,
-        new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(offsets.keySet()), null, new Offsets(null, offsets)),
+        new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(offsets.keySet()), null,
+            new ConsumerPosition.Offsets(null, offsets)),
         100,
         RECORD_DESERIALIZER,
         NOOP_FILTER,
-        PollingSettings.createDefault()
-        createMessagesProcessing(),
         PollingSettings.createDefault(),
         CURSOR_MOCK
     );