iliax 2 年之前
父節點
當前提交
e4d16b16c6

+ 4 - 9
documentation/compose/kafka-ui.yaml

@@ -20,11 +20,6 @@ services:
       KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
-      KAFKA_CLUSTERS_1_NAME: secondLocal
-      KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092
-      KAFKA_CLUSTERS_1_METRICS_PORT: 9998
-      KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry1:8085
-      DYNAMIC_CONFIG_ENABLED: 'true'
 
   kafka0:
     image: confluentinc/cp-kafka:7.2.1
@@ -45,7 +40,7 @@ services:
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
       KAFKA_PROCESS_ROLES: 'broker,controller'
       KAFKA_NODE_ID: 1
-      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093'
+      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093,2@kafka1:29093'
       KAFKA_LISTENERS: 'PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092'
       KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
       KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
@@ -62,7 +57,7 @@ services:
       - "9093:9092"
       - "9998:9998"
     environment:
-      KAFKA_BROKER_ID: 1
+      KAFKA_BROKER_ID: 2
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092'
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
@@ -72,8 +67,8 @@ services:
       KAFKA_JMX_PORT: 9998
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9998
       KAFKA_PROCESS_ROLES: 'broker,controller'
-      KAFKA_NODE_ID: 1
-      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093'
+      KAFKA_NODE_ID: 2
+      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@kafka1:29093,1@kafka0:29093'
       KAFKA_LISTENERS: 'PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:29093,PLAINTEXT_HOST://0.0.0.0:9092'
       KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
       KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'

+ 69 - 35
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -11,19 +11,20 @@ import com.provectus.kafka.ui.api.MessagesApi;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
+import com.provectus.kafka.ui.model.MessageFilterIdDTO;
+import com.provectus.kafka.ui.model.MessageFilterRegistrationDTO;
 import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.SeekDirectionDTO;
 import com.provectus.kafka.ui.model.SeekTypeDTO;
 import com.provectus.kafka.ui.model.SerdeUsageDTO;
-import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
-import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicSerdeSuggestionDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
-import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
 import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
 import com.provectus.kafka.ui.service.DeserializationService;
 import com.provectus.kafka.ui.service.MessagesService;
+import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -45,37 +46,34 @@ import reactor.core.scheduler.Schedulers;
 @Slf4j
 public class MessagesController extends AbstractController implements MessagesApi {
 
+  private static final int MAX_LOAD_RECORD_LIMIT = 100;
+  private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;
+
   private final MessagesService messagesService;
   private final DeserializationService deserializationService;
+  private final AccessControlService accessControlService;
 
   @Override
   public Mono<ResponseEntity<Void>> deleteTopicMessages(
       String clusterName, String topicName, @Valid List<Integer> partitions,
       ServerWebExchange exchange) {
 
-    var context = AccessContext.builder()
+    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(MESSAGES_DELETE)
-        .build();
+        .build());
 
-    return validateAccess(context).<ResponseEntity<Void>>then(
+    return validateAccess.then(
         messagesService.deleteTopicMessages(
             getCluster(clusterName),
             topicName,
             Optional.ofNullable(partitions).orElse(List.of())
         ).thenReturn(ResponseEntity.ok().build())
-    ).doOnEach(sig -> audit(context, sig));
-  }
-
-  @Override
-  public Mono<ResponseEntity<SmartFilterTestExecutionResultDTO>> executeSmartFilterTest(
-      Mono<SmartFilterTestExecutionDTO> smartFilterTestExecutionDto, ServerWebExchange exchange) {
-    return smartFilterTestExecutionDto
-        .map(MessagesService::execSmartFilterTest)
-        .map(ResponseEntity::ok);
+    );
   }
 
+  @Deprecated
   @Override
   public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
                                                                            String topicName,
@@ -88,19 +86,17 @@ public class MessagesController extends AbstractController implements MessagesAp
                                                                            String keySerde,
                                                                            String valueSerde,
                                                                            ServerWebExchange exchange) {
-    var contextBuilder = AccessContext.builder()
+    final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(MESSAGES_READ)
-        .operationName("getTopicMessages");
-
-    if (auditService.isAuditTopic(getCluster(clusterName), topicName)) {
-      contextBuilder.auditActions(AuditAction.VIEW);
-    }
+        .build());
 
     seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
     seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
     filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
+    int recordsLimit =
+        Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
 
     var positions = new ConsumerPosition(
         seekType,
@@ -111,14 +107,11 @@ public class MessagesController extends AbstractController implements MessagesAp
         ResponseEntity.ok(
             messagesService.loadMessages(
                 getCluster(clusterName), topicName, positions, q, filterQueryType,
-                limit, seekDirection, keySerde, valueSerde)
+                recordsLimit, seekDirection, keySerde, valueSerde)
         )
     );
 
-    var context = contextBuilder.build();
-    return validateAccess(context)
-        .then(job)
-        .doOnEach(sig -> audit(context, sig));
+    return validateAccess.then(job);
   }
 
   @Override
@@ -126,18 +119,17 @@ public class MessagesController extends AbstractController implements MessagesAp
       String clusterName, String topicName, @Valid Mono<CreateTopicMessageDTO> createTopicMessage,
       ServerWebExchange exchange) {
 
-    var context = AccessContext.builder()
+    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(MESSAGES_PRODUCE)
-        .operationName("sendTopicMessages")
-        .build();
+        .build());
 
-    return validateAccess(context).then(
+    return validateAccess.then(
         createTopicMessage.flatMap(msg ->
             messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
         ).map(ResponseEntity::ok)
-    ).doOnEach(sig -> audit(context, sig));
+    );
   }
 
   /**
@@ -173,12 +165,12 @@ public class MessagesController extends AbstractController implements MessagesAp
                                                                  String topicName,
                                                                  SerdeUsageDTO use,
                                                                  ServerWebExchange exchange) {
-    var context = AccessContext.builder()
+
+    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(TopicAction.VIEW)
-        .operationName("getSerdes")
-        .build();
+        .build());
 
     TopicSerdeSuggestionDTO dto = new TopicSerdeSuggestionDTO()
         .key(use == SerdeUsageDTO.SERIALIZE
@@ -188,7 +180,7 @@ public class MessagesController extends AbstractController implements MessagesAp
             ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
             : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE));
 
-    return validateAccess(context).then(
+    return validateAccess.then(
         Mono.just(dto)
             .subscribeOn(Schedulers.boundedElastic())
             .map(ResponseEntity::ok)
@@ -196,6 +188,48 @@ public class MessagesController extends AbstractController implements MessagesAp
   }
 
 
+  @Override
+  public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
+                                                                             PollingModeDTO mode,
+                                                                             @Nullable List<Integer> partitions,
+                                                                             @Nullable Integer limit,
+                                                                             @Nullable String query,
+                                                                             @Nullable String filterId,
+                                                                             @Nullable String offsetString,
+                                                                             @Nullable Long ts,
+                                                                             @Nullable String ks,
+                                                                             @Nullable String vs,
+                                                                             ServerWebExchange exchange) {
+    final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+        .cluster(clusterName)
+        .topic(topicName)
+        .topicActions(MESSAGES_READ)
+        .build());
+
+    int recordsLimit =
+        Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
+
+    return validateAccess.then(
+        Mono.just(
+            ResponseEntity.ok(
+                messagesService.loadMessagesV2(
+                    getCluster(clusterName), topicName, positions, q, filterQueryType,
+                    recordsLimit, seekDirection, keySerde, valueSerde)
+            )
+        )
+    );
+  }
 
+   interface PollingMode {
+    static PollingMode create(PollingModeDTO mode, @Nullable String offsetString, @Nullable Long timestamp) {
+      return null;
+    }
+  }
 
+  @Override
+  public Mono<ResponseEntity<Flux<MessageFilterIdDTO>>> registerFilter(String clusterName, String topicName,
+                                                                       Mono<MessageFilterRegistrationDTO> messageFilterRegistrationDTO,
+                                                                       ServerWebExchange exchange) {
+    return null;
+  }
 }

+ 143 - 99
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -1,9 +1,9 @@
 package com.provectus.kafka.ui.service;
 
 import com.google.common.util.concurrent.RateLimiter;
-import com.provectus.kafka.ui.config.ClustersProperties;
-import com.provectus.kafka.ui.emitter.BackwardEmitter;
-import com.provectus.kafka.ui.emitter.ForwardEmitter;
+import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.MessageFilterStats;
 import com.provectus.kafka.ui.emitter.MessageFilters;
 import com.provectus.kafka.ui.emitter.TailingEmitter;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
@@ -12,25 +12,23 @@ import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.SeekDirectionDTO;
-import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
-import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
-import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.serde.api.Serde;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
+import com.provectus.kafka.ui.util.ResultSizeLimiter;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
-import java.time.Instant;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.admin.OffsetSpec;
@@ -43,39 +41,21 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 @Service
+@RequiredArgsConstructor
 @Slf4j
 public class MessagesService {
 
-  private static final int DEFAULT_MAX_PAGE_SIZE = 500;
-  private static final int DEFAULT_PAGE_SIZE = 100;
   // limiting UI messages rate to 20/sec in tailing mode
-  private static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
+  public static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
 
   private final AdminClientService adminClientService;
   private final DeserializationService deserializationService;
   private final ConsumerGroupService consumerGroupService;
-  private final int maxPageSize;
-  private final int defaultPageSize;
-
-  public MessagesService(AdminClientService adminClientService,
-                         DeserializationService deserializationService,
-                         ConsumerGroupService consumerGroupService,
-                         ClustersProperties properties) {
-    this.adminClientService = adminClientService;
-    this.deserializationService = deserializationService;
-    this.consumerGroupService = consumerGroupService;
-
-    var pollingProps = Optional.ofNullable(properties.getPolling())
-        .orElseGet(ClustersProperties.PollingProperties::new);
-    this.maxPageSize = Optional.ofNullable(pollingProps.getMaxPageSize())
-        .orElse(DEFAULT_MAX_PAGE_SIZE);
-    this.defaultPageSize = Optional.ofNullable(pollingProps.getDefaultPageSize())
-        .orElse(DEFAULT_PAGE_SIZE);
-  }
 
   private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) {
     return adminClientService.get(cluster)
@@ -83,40 +63,6 @@ public class MessagesService {
         .switchIfEmpty(Mono.error(new TopicNotFoundException()));
   }
 
-  public static SmartFilterTestExecutionResultDTO execSmartFilterTest(SmartFilterTestExecutionDTO execData) {
-    Predicate<TopicMessageDTO> predicate;
-    try {
-      predicate = MessageFilters.createMsgFilter(
-          execData.getFilterCode(),
-          MessageFilterTypeDTO.GROOVY_SCRIPT
-      );
-    } catch (Exception e) {
-      log.info("Smart filter '{}' compilation error", execData.getFilterCode(), e);
-      return new SmartFilterTestExecutionResultDTO()
-          .error("Compilation error : " + e.getMessage());
-    }
-    try {
-      var result = predicate.test(
-          new TopicMessageDTO()
-              .key(execData.getKey())
-              .content(execData.getValue())
-              .headers(execData.getHeaders())
-              .offset(execData.getOffset())
-              .partition(execData.getPartition())
-              .timestamp(
-                  Optional.ofNullable(execData.getTimestampMs())
-                      .map(ts -> OffsetDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC))
-                      .orElse(null))
-      );
-      return new SmartFilterTestExecutionResultDTO()
-          .result(result);
-    } catch (Exception e) {
-      log.info("Smart filter {} execution error", execData, e);
-      return new SmartFilterTestExecutionResultDTO()
-          .error("Execution error : " + e.getMessage());
-    }
-  }
-
   public Mono<Void> deleteTopicMessages(KafkaCluster cluster, String topicName,
                                         List<Integer> partitionsToInclude) {
     return withExistingTopic(cluster, topicName)
@@ -163,7 +109,13 @@ public class MessagesService {
             msg.getValueSerde().get()
         );
 
-    try (KafkaProducer<byte[], byte[]> producer = createProducer(cluster, Map.of())) {
+    Properties properties = new Properties();
+    SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
+    properties.putAll(cluster.getProperties());
+    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
+    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
       ProducerRecord<byte[], byte[]> producerRecord = producerRecordCreator.create(
           topicDescription.name(),
           msg.getPartition(),
@@ -185,23 +137,11 @@ public class MessagesService {
     }
   }
 
-  public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
-                                                             Map<String, Object> additionalProps) {
-    Properties properties = new Properties();
-    SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
-    properties.putAll(cluster.getProperties());
-    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
-    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-    properties.putAll(additionalProps);
-    return new KafkaProducer<>(properties);
-  }
-
   public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic,
                                                  ConsumerPosition consumerPosition,
                                                  @Nullable String query,
                                                  MessageFilterTypeDTO filterQueryType,
-                                                 @Nullable Integer pageSize,
+                                                 int limit,
                                                  SeekDirectionDTO seekDirection,
                                                  @Nullable String keySerde,
                                                  @Nullable String valueSerde) {
@@ -209,13 +149,7 @@ public class MessagesService {
         .flux()
         .publishOn(Schedulers.boundedElastic())
         .flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
-            filterQueryType, fixPageSize(pageSize), seekDirection, keySerde, valueSerde));
-  }
-
-  private int fixPageSize(@Nullable Integer pageSize) {
-    return Optional.ofNullable(pageSize)
-        .filter(ps -> ps > 0 && ps <= maxPageSize)
-        .orElse(defaultPageSize);
+            filterQueryType, limit, seekDirection, keySerde, valueSerde));
   }
 
   private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
@@ -228,32 +162,142 @@ public class MessagesService {
                                                       @Nullable String keySerde,
                                                       @Nullable String valueSerde) {
 
-    var deserializer = deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
-    var filter = getMsgFilter(query, filterQueryType);
-    var emitter = switch (seekDirection) {
-      case FORWARD -> new ForwardEmitter(
+    java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
+    ConsumerRecordDeserializer recordDeserializer =
+        deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
+    if (seekDirection.equals(SeekDirectionDTO.FORWARD)) {
+      emitter = new ForwardRecordEmitter(
           () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()
+          consumerPosition,
+          recordDeserializer,
+          cluster.getThrottler().get()
       );
-      case BACKWARD -> new BackwardEmitter(
+    } else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
+      emitter = new BackwardRecordEmitter(
           () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()
+          consumerPosition,
+          limit,
+          recordDeserializer,
+          cluster.getThrottler().get()
       );
-      case TAILING -> new TailingEmitter(
+    } else {
+      emitter = new TailingEmitter(
           () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition, deserializer, filter, cluster.getPollingSettings()
+          consumerPosition,
+          recordDeserializer,
+          cluster.getThrottler().get()
       );
-    };
+    }
+    MessageFilterStats filterStats = new MessageFilterStats();
     return Flux.create(emitter)
+        .contextWrite(ctx -> ctx.put(MessageFilterStats.class, filterStats))
+        .filter(getMsgFilter(query, filterQueryType, filterStats))
+        .map(getDataMasker(cluster, topic))
+        .takeWhile(createTakeWhilePredicate(seekDirection, limit))
         .map(throttleUiPublish(seekDirection));
   }
 
-  private Predicate<TopicMessageDTO> getMsgFilter(String query,
-                                                  MessageFilterTypeDTO filterQueryType) {
+  public Flux<TopicMessageEventDTO> loadMessagesV2(KafkaCluster cluster,
+                                                   String topic,
+                                                   PollingModeDTO pollingMode,
+                                                   @Nullable String query,
+                                                   @Nullable String filterId,
+                                                   int limit,
+                                                   @Nullable String keySerde,
+                                                   @Nullable String valueSerde) {
+    return withExistingTopic(cluster, topic)
+        .flux()
+        .publishOn(Schedulers.boundedElastic())
+        .flatMap(td -> loadMessagesImplV2(cluster, topic, consumerPosition, query,
+            filterQueryType, limit, seekDirection, keySerde, valueSerde));
+  }
+
+  private Flux<TopicMessageEventDTO> loadMessagesImplV2(KafkaCluster cluster,
+                                                        String topic,
+                                                        ConsumerPosition consumerPosition,
+                                                        @Nullable String query,
+                                                        MessageFilterTypeDTO filterQueryType,
+                                                        int limit,
+                                                        SeekDirectionDTO seekDirection,
+                                                        @Nullable String keySerde,
+                                                        @Nullable String valueSerde) {
+
+    java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
+    ConsumerRecordDeserializer recordDeserializer =
+        deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
+    if (seekDirection.equals(SeekDirectionDTO.FORWARD)) {
+      emitter = new ForwardRecordEmitter(
+          () -> consumerGroupService.createConsumer(cluster),
+          consumerPosition,
+          recordDeserializer,
+          cluster.getThrottler().get()
+      );
+    } else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
+      emitter = new BackwardRecordEmitter(
+          () -> consumerGroupService.createConsumer(cluster),
+          consumerPosition,
+          limit,
+          recordDeserializer,
+          cluster.getThrottler().get()
+      );
+    } else {
+      emitter = new TailingEmitter(
+          () -> consumerGroupService.createConsumer(cluster),
+          consumerPosition,
+          recordDeserializer,
+          cluster.getThrottler().get()
+      );
+    }
+    MessageFilterStats filterStats = new MessageFilterStats();
+    return Flux.create(emitter)
+        .contextWrite(ctx -> ctx.put(MessageFilterStats.class, filterStats))
+        .filter(getMsgFilter(query, filterQueryType, filterStats))
+        .map(getDataMasker(cluster, topic))
+        .takeWhile(createTakeWhilePredicate(seekDirection, limit))
+        .map(throttleUiPublish(seekDirection));
+  }
+
+  private Predicate<TopicMessageEventDTO> createTakeWhilePredicate(
+      SeekDirectionDTO seekDirection, int limit) {
+    return seekDirection == SeekDirectionDTO.TAILING
+        ? evt -> true // no limit for tailing
+        : new ResultSizeLimiter(limit);
+  }
+
+  private UnaryOperator<TopicMessageEventDTO> getDataMasker(KafkaCluster cluster, String topicName) {
+    var keyMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.KEY);
+    var valMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.VALUE);
+    return evt -> {
+      if (evt.getType() != TopicMessageEventDTO.TypeEnum.MESSAGE) {
+        return evt;
+      }
+      return evt.message(
+        evt.getMessage()
+            .key(keyMasker.apply(evt.getMessage().getKey()))
+            .content(valMasker.apply(evt.getMessage().getContent())));
+    };
+  }
+
+  private Predicate<TopicMessageEventDTO> getMsgFilter(String query,
+                                                       MessageFilterTypeDTO filterQueryType,
+                                                       MessageFilterStats filterStats) {
     if (StringUtils.isEmpty(query)) {
       return evt -> true;
     }
-    return MessageFilters.createMsgFilter(query, filterQueryType);
+    var messageFilter = MessageFilters.createMsgFilter(query, filterQueryType);
+    return evt -> {
+      // we only apply filter for message events
+      if (evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) {
+        try {
+          return messageFilter.test(evt.getMessage());
+        } catch (Exception e) {
+          filterStats.incrementApplyErrors();
+          log.trace("Error applying filter '{}' for message {}", query, evt.getMessage());
+          return false;
+        }
+      }
+      return true;
+    };
   }
 
   private <T> UnaryOperator<T> throttleUiPublish(SeekDirectionDTO seekDirection) {

+ 12 - 8
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -797,6 +797,7 @@ paths:
         - name: m
           in: query
           description: Messages polling mode
+          required: true
           schema:
             $ref: "#/components/schemas/PollingMode"
         - name: p
@@ -821,18 +822,11 @@ paths:
           description: filter id, that was registered beforehand
           schema:
             type: string
-        - name: off
-          in: query
-          description: offset to read from / to
-          schema:
-            type: integer
-            format: int64
         - name: offs
           in: query
           description: partition offsets to read from / to. Format is "p1:off1,p2:off2,..."
           schema:
-            type: integer
-            format: int64
+            type: string
         - name: ts
           in: query
           description: timestamp (in ms) to read from / to
@@ -2576,6 +2570,7 @@ components:
             - MESSAGE
             - CONSUMING
             - DONE
+            - CURSOR
             - EMIT_THROTTLING
         message:
           $ref: "#/components/schemas/TopicMessage"
@@ -2583,6 +2578,8 @@ components:
           $ref: "#/components/schemas/TopicMessagePhase"
         consuming:
           $ref: "#/components/schemas/TopicMessageConsuming"
+        cursor:
+          $ref: "#/components/schemas/TopicMessageNextPageCursor"
 
     TopicMessagePhase:
       type: object
@@ -2612,6 +2609,13 @@ components:
         filterApplyErrors:
           type: integer
 
+    TopicMessageNextPageCursor:
+      type: object
+      properties:
+        offsetsString:
+          type: string
+        pollingMode:
+          $ref: "#/components/schemas/PollingMode"
 
     TopicMessage:
       type: object