iliax 2 years ago
Parent
Current commit
306c1fb1b7

+ 21 - 75
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -5,10 +5,8 @@ import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_
 import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
 import static com.provectus.kafka.ui.serde.api.Serde.Target.KEY;
 import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE;
-import static java.util.stream.Collectors.toMap;
 
 import com.provectus.kafka.ui.api.MessagesApi;
-import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.MessageFilterIdDTO;
@@ -26,14 +24,11 @@ import com.provectus.kafka.ui.service.DeserializationService;
 import com.provectus.kafka.ui.service.MessagesService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import javax.annotation.Nullable;
 import javax.validation.Valid;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.kafka.common.TopicPartition;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
@@ -86,32 +81,7 @@ public class MessagesController extends AbstractController implements MessagesAp
                                                                            String keySerde,
                                                                            String valueSerde,
                                                                            ServerWebExchange exchange) {
-    final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
-        .cluster(clusterName)
-        .topic(topicName)
-        .topicActions(MESSAGES_READ)
-        .build());
-
-    seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
-    seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
-    filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
-    int recordsLimit =
-        Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
-
-    var positions = new ConsumerPosition(
-        seekType,
-        topicName,
-        parseSeekTo(topicName, seekType, seekTo)
-    );
-    Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> job = Mono.just(
-        ResponseEntity.ok(
-            messagesService.loadMessages(
-                getCluster(clusterName), topicName, positions, q, filterQueryType,
-                recordsLimit, seekDirection, keySerde, valueSerde)
-        )
-    );
-
-    return validateAccess.then(job);
+    throw new IllegalStateException();
   }
 
   @Override
@@ -132,34 +102,6 @@ public class MessagesController extends AbstractController implements MessagesAp
     );
   }
 
-  /**
-   * The format is [partition]::[offset] for specifying offsets
-   * or [partition]::[timestamp in millis] for specifying timestamps.
-   */
-  @Nullable
-  private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
-    if (seekTo == null || seekTo.isEmpty()) {
-      if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
-        return null;
-      }
-      throw new ValidationException("seekTo should be set if seekType is " + seekType);
-    }
-    return seekTo.stream()
-        .map(p -> {
-          String[] split = p.split("::");
-          if (split.length != 2) {
-            throw new IllegalArgumentException(
-                "Wrong seekTo argument format. See API docs for details");
-          }
-
-          return Pair.of(
-              new TopicPartition(topic, Integer.parseInt(split[0])),
-              Long.parseLong(split[1])
-          );
-        })
-        .collect(toMap(Pair::getKey, Pair::getValue));
-  }
-
   @Override
   public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
                                                                  String topicName,
@@ -197,8 +139,8 @@ public class MessagesController extends AbstractController implements MessagesAp
                                                                              @Nullable String filterId,
                                                                              @Nullable String offsetString,
                                                                              @Nullable Long ts,
-                                                                             @Nullable String ks,
-                                                                             @Nullable String vs,
+                                                                             @Nullable String keySerde,
+                                                                             @Nullable String valueSerde,
                                                                              ServerWebExchange exchange) {
     final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
         .cluster(clusterName)
@@ -206,6 +148,8 @@ public class MessagesController extends AbstractController implements MessagesAp
         .topicActions(MESSAGES_READ)
         .build());
 
+    ConsumerPosition consumerPosition = ConsumerPosition.create(mode, topicName, partitions, ts, offsetString);
+
     int recordsLimit =
         Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
 
@@ -213,23 +157,25 @@ public class MessagesController extends AbstractController implements MessagesAp
         Mono.just(
             ResponseEntity.ok(
                 messagesService.loadMessagesV2(
-                    getCluster(clusterName), topicName, positions, q, filterQueryType,
-                    recordsLimit, seekDirection, keySerde, valueSerde)
-            )
-        )
-    );
+                    getCluster(clusterName), topicName, consumerPosition,
+                    query, filterId, recordsLimit, keySerde, valueSerde))));
   }
 
-   interface PollingMode {
-    static PollingMode create(PollingModeDTO mode, @Nullable String offsetString, @Nullable Long timestamp) {
-      return null;
-    }
-  }
 
   @Override
-  public Mono<ResponseEntity<Flux<MessageFilterIdDTO>>> registerFilter(String clusterName, String topicName,
-                                                                       Mono<MessageFilterRegistrationDTO> messageFilterRegistrationDTO,
-                                                                       ServerWebExchange exchange) {
-    return null;
+  public Mono<ResponseEntity<MessageFilterIdDTO>> registerFilter(String clusterName,
+                                                                 String topicName,
+                                                                 Mono<MessageFilterRegistrationDTO> registration,
+                                                                 ServerWebExchange exchange) {
+
+    final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+        .cluster(clusterName)
+        .topic(topicName)
+        .topicActions(MESSAGES_READ)
+        .build());
+
+    return validateAccess.then(registration)
+        .map(reg -> messagesService.registerMessageFilter(reg.getFilterCode()))
+        .map(id -> ResponseEntity.ok(new MessageFilterIdDTO().id(id)));
   }
 }
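
A minimal sketch of the round trip the new handler enables, going through the service bean directly (the messagesService/cluster wiring and an existing topic are assumed):

// register a trivially always-true groovy filter, then reference it by id when polling
String filterId = messagesService.registerMessageFilter("true");

Flux<TopicMessageEventDTO> events = messagesService.loadMessagesV2(
    cluster,
    "my-topic",
    ConsumerPosition.create(PollingModeDTO.EARLIEST, "my-topic", null, null, null),
    null,       // q: no contains-string filter
    filterId,   // the registered filter is applied instead
    100,        // record limit
    "String",   // keySerde
    "String");  // valueSerde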

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -12,7 +12,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
-public abstract class AbstractEmitter {
+public abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
   private final ConsumerRecordDeserializer recordDeserializer;
   private final ConsumingStats consumingStats = new ConsumingStats();
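
Moving the Consumer<FluxSink<...>> declaration up to the base class (and off each subclass, below) works because every emitter ends up in Flux.create, whose parameter type is exactly Consumer<? super FluxSink<T>>; a sketch, with the constructor arguments assumed:

// any AbstractEmitter subclass is now directly usable with Flux.create
AbstractEmitter emitter = new TailingEmitter(
    consumerSupplier, consumerPosition, recordDeserializer, pollingSettings);
Flux<TopicMessageEventDTO> flux = Flux.create(emitter);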

+ 1 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

@@ -19,9 +19,7 @@ import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
-public class BackwardRecordEmitter
-    extends AbstractEmitter
-    implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
+public class BackwardRecordEmitter extends AbstractEmitter {
 
   private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
   private final ConsumerPosition consumerPosition;

+ 1 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -14,8 +14,7 @@ import reactor.core.publisher.FluxSink;
 
 @Slf4j
 public class ForwardRecordEmitter
-    extends AbstractEmitter
-    implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
+    extends AbstractEmitter {
 
   private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
   private final ConsumerPosition position;

+ 2 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java

@@ -1,7 +1,6 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import groovy.json.JsonSlurper;
 import java.util.function.Predicate;
@@ -22,23 +21,12 @@ public class MessageFilters {
   private MessageFilters() {
   }
 
-  public static Predicate<TopicMessageDTO> createMsgFilter(String query, MessageFilterTypeDTO type) {
-    switch (type) {
-      case STRING_CONTAINS:
-        return containsStringFilter(query);
-      case GROOVY_SCRIPT:
-        return groovyScriptFilter(query);
-      default:
-        throw new IllegalStateException("Unknown query type: " + type);
-    }
-  }
-
-  static Predicate<TopicMessageDTO> containsStringFilter(String string) {
+  public static Predicate<TopicMessageDTO> containsStringFilter(String string) {
     return msg -> StringUtils.contains(msg.getKey(), string)
         || StringUtils.contains(msg.getContent(), string);
   }
 
-  static Predicate<TopicMessageDTO> groovyScriptFilter(String script) {
+  public static Predicate<TopicMessageDTO> groovyScriptFilter(String script) {
     var compiledScript = compileScript(script);
     var jsonSlurper = new JsonSlurper();
     return new Predicate<TopicMessageDTO>() {
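
With both factory methods now public, callers can compose filters directly; a small usage sketch (the fluent setters come from the generated DTO model, as with MessageFilterIdDTO above):

Predicate<TopicMessageDTO> contains = MessageFilters.containsStringFilter("error");
Predicate<TopicMessageDTO> alwaysTrue = MessageFilters.groovyScriptFilter("true");

TopicMessageDTO msg = new TopicMessageDTO().key("k1").content("error: boom");
boolean matched = contains.and(alwaysTrue).test(msg); // true: content contains "error"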

+ 47 - 29
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java

@@ -1,13 +1,13 @@
 package com.provectus.kafka.ui.emitter;
 
+import static com.provectus.kafka.ui.model.PollingModeDTO.TO_TIMESTAMP;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.provectus.kafka.ui.model.ConsumerPosition;
-import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 import lombok.AccessLevel;
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -22,15 +22,15 @@ class SeekOperations {
 
   static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
     OffsetsInfo offsetsInfo;
-    if (consumerPosition.getSeekTo() == null) {
-      offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getTopic());
+    if (consumerPosition.partitions().isEmpty()) {
+      offsetsInfo = new OffsetsInfo(consumer, consumerPosition.topic());
     } else {
-      offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getSeekTo().keySet());
+      offsetsInfo = new OffsetsInfo(consumer, consumerPosition.partitions());
     }
     return new SeekOperations(
         consumer,
         offsetsInfo,
-        getOffsetsForSeek(consumer, offsetsInfo, consumerPosition.getSeekType(), consumerPosition.getSeekTo())
+        getOffsetsForSeek(consumer, offsetsInfo, consumerPosition)
     );
   }
 
@@ -61,28 +61,34 @@ class SeekOperations {
    */
   @VisibleForTesting
   static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
-                                                             OffsetsInfo offsetsInfo,
-                                                             SeekTypeDTO seekType,
-                                                             @Nullable Map<TopicPartition, Long> seekTo) {
-    switch (seekType) {
-      case LATEST:
+                                                     OffsetsInfo offsetsInfo,
+                                                     ConsumerPosition position) {
+    switch (position.pollingMode()) {
+      case LATEST, TAILING:
         return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
-      case BEGINNING:
+      case EARLIEST:
         return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
-      case OFFSET:
-        Preconditions.checkNotNull(offsetsInfo);
-        return fixOffsets(offsetsInfo, seekTo);
-      case TIMESTAMP:
-        Preconditions.checkNotNull(offsetsInfo);
-        return offsetsForTimestamp(consumer, offsetsInfo, seekTo);
+      case FROM_OFFSET, TO_OFFSET:
+        Preconditions.checkNotNull(position.offsets());
+        return fixOffsets(offsetsInfo, position.offsets());
+      case FROM_TIMESTAMP, TO_TIMESTAMP:
+        Preconditions.checkNotNull(position.timestamp());
+        return offsetsForTimestamp(consumer, position.pollingMode(), offsetsInfo, position.timestamp());
       default:
         throw new IllegalStateException();
     }
   }
 
-  private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo, Map<TopicPartition, Long> offsets) {
-    offsets = new HashMap<>(offsets);
-    offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
+  private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo,
+                                                      ConsumerPosition.Offsets positionOffset) {
+    var offsets = new HashMap<TopicPartition, Long>();
+    if (positionOffset.offset() != null) {
+      offsetsInfo.getNonEmptyPartitions().forEach(tp -> offsets.put(tp, positionOffset.offset()));
+    } else {
+      Preconditions.checkNotNull(positionOffset.tpOffsets());
+      offsets.putAll(positionOffset.tpOffsets());
+      offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
+    }
 
     Map<TopicPartition, Long> result = new HashMap<>();
     offsets.forEach((tp, targetOffset) -> {
@@ -99,13 +105,25 @@ class SeekOperations {
     return result;
   }
 
-  private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer, OffsetsInfo offsetsInfo,
-                                                        Map<TopicPartition, Long> timestamps) {
-    timestamps = new HashMap<>(timestamps);
-    timestamps.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
+  private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer,
+                                                               PollingModeDTO pollingMode,
+                                                               OffsetsInfo offsetsInfo,
+                                                               Long timestamp) {
+    Map<TopicPartition, Long> timestamps = new HashMap<>();
+    offsetsInfo.getNonEmptyPartitions().forEach(tp -> timestamps.put(tp, timestamp));
 
-    return consumer.offsetsForTimes(timestamps).entrySet().stream()
-        .filter(e -> e.getValue() != null)
-        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+    Map<TopicPartition, Long> result = new HashMap<>();
+    consumer.offsetsForTimes(timestamps).forEach((tp, offsetAndTimestamp) -> {
+      if (offsetAndTimestamp == null) {
+        if (pollingMode == TO_TIMESTAMP && offsetsInfo.getNonEmptyPartitions().contains(tp)) {
+          // if no offset was returned this means that *all* record timestamps are lower
+          // than the target timestamp. In case of TO_TIMESTAMP mode we need to read from the end of the tp
+          result.put(tp, offsetsInfo.getEndOffsets().get(tp));
+        }
+      } else {
+        result.put(tp, offsetAndTimestamp.offset());
+      }
+    });
+    return result;
   }
 }
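
The reworked fixOffsets accepts two shapes of ConsumerPosition.Offsets: one shared offset applied to every non-empty partition, or an explicit per-partition map retained against the non-empty set. A standalone sketch of the expansion step, with the partition set assumed:

var nonEmpty = List.of(new TopicPartition("t", 0), new TopicPartition("t", 1));

// shape 1: a single shared offset, expanded to every non-empty partition
var shared = new ConsumerPosition.Offsets(42L, null);
Map<TopicPartition, Long> seek = new HashMap<>();
nonEmpty.forEach(tp -> seek.put(tp, shared.offset())); // {t-0=42, t-1=42}

// shape 2 (new Offsets(null, tpOffsets)) instead copies the given map and
// retains only the keys that are non-empty partitions, as in fixOffsets above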

+ 1 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -12,8 +12,7 @@ import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
-public class TailingEmitter extends AbstractEmitter
-    implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
+public class TailingEmitter extends AbstractEmitter {
 
   private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
   private final ConsumerPosition consumerPosition;

+ 83 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java

@@ -1,14 +1,90 @@
 package com.provectus.kafka.ui.model;
 
+import static java.util.stream.Collectors.toMap;
+
+import com.provectus.kafka.ui.exception.ValidationException;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import javax.annotation.Nullable;
-import lombok.Value;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.common.TopicPartition;
+import org.springframework.util.StringUtils;
+
+
+public record ConsumerPosition(PollingModeDTO pollingMode,
+                               String topic,
+                               List<TopicPartition> partitions, //all partitions if list is empty
+                               @Nullable Long timestamp,
+                               @Nullable Offsets offsets) {
+
+  public record Offsets(@Nullable Long offset,
+                        @Nullable Map<TopicPartition, Long> tpOffsets) {
+  }
+
+  public static ConsumerPosition create(PollingModeDTO pollingMode,
+                                        String topic,
+                                        @Nullable List<Integer> partitions,
+                                        @Nullable Long timestamp,
+                                        @Nullable String offsetsStr) {
+    Offsets offsets = parseAndValidateOffsets(pollingMode, topic, offsetsStr);
+
+    var topicPartitions = Optional.ofNullable(partitions).orElse(List.of())
+        .stream()
+        .map(p -> new TopicPartition(topic, p))
+        .collect(Collectors.toList());
+
+    // if offsets are specified - inferring the partitions list from them
+    topicPartitions = (offsets == null || offsets.tpOffsets == null)
+        ? topicPartitions : List.copyOf(offsets.tpOffsets.keySet());
+
+    return new ConsumerPosition(
+        pollingMode,
+        topic,
+        Optional.ofNullable(topicPartitions).orElse(List.of()),
+        validateTimestamp(pollingMode, timestamp),
+        offsets
+    );
+  }
+
+  private static Long validateTimestamp(PollingModeDTO pollingMode, @Nullable Long ts) {
+    if (pollingMode == PollingModeDTO.FROM_TIMESTAMP || pollingMode == PollingModeDTO.TO_TIMESTAMP) {
+      if (ts == null) {
+        throw new ValidationException("timestamp not provided for " + pollingMode);
+      }
+    }
+    return ts;
+  }
+
+  private static Offsets parseAndValidateOffsets(PollingModeDTO pollingMode,
+                                                 String topic,
+                                                 @Nullable String offsetsStr) {
+    Offsets offsets = null;
+    if (pollingMode == PollingModeDTO.FROM_OFFSET || pollingMode == PollingModeDTO.TO_OFFSET) {
+      if (!StringUtils.hasText(offsetsStr)) {
+        throw new ValidationException("offsets not provided for " + pollingMode);
+      }
+      if (!offsetsStr.contains(":")) {
+        offsets = new Offsets(Long.parseLong(offsetsStr), null);
+      } else {
+        Map<TopicPartition, Long> tpOffsets = Stream.of(offsetsStr.split(","))
+            .map(p -> {
+              String[] split = p.split(":");
+              if (split.length != 2) {
+                throw new IllegalArgumentException(
+                    "Wrong seekTo argument format. See API docs for details");
+              }
+              return Pair.of(
+                  new TopicPartition(topic, Integer.parseInt(split[0])),
+                  Long.parseLong(split[1])
+              );
+            })
+            .collect(toMap(Pair::getKey, Pair::getValue));
+        offsets = new Offsets(null, tpOffsets);
+      }
+    }
+    return offsets;
+  }
 
-@Value
-public class ConsumerPosition {
-  SeekTypeDTO seekType;
-  String topic;
-  @Nullable
-  Map<TopicPartition, Long> seekTo; // null if positioning should apply to all tps
 }
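
With the inverted contains(":") check fixed above, the two accepted offs formats behave as follows:

// a single offset, later applied to all polled partitions
ConsumerPosition p1 = ConsumerPosition.create(
    PollingModeDTO.FROM_OFFSET, "t", null, null, "42");
// p1.offsets() -> Offsets[offset=42, tpOffsets=null]; p1.partitions() -> []

// per-partition offsets; the partition list is inferred from the keys
ConsumerPosition p2 = ConsumerPosition.create(
    PollingModeDTO.TO_OFFSET, "t", null, null, "0:10,1:20");
// p2.offsets().tpOffsets() -> {t-0=10, t-1=20}; p2.partitions() -> [t-0, t-1]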

+ 48 - 95
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -12,9 +12,8 @@ import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
 import com.provectus.kafka.ui.model.PollingModeDTO;
-import com.provectus.kafka.ui.model.SeekDirectionDTO;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
@@ -23,14 +22,16 @@ import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -41,7 +42,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -57,6 +57,8 @@ public class MessagesService {
   private final DeserializationService deserializationService;
   private final ConsumerGroupService consumerGroupService;
 
+  private final Map<String, Predicate<TopicMessageDTO>> registeredFilters = new ConcurrentHashMap<>();
+
   private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) {
     return adminClientService.get(cluster)
         .flatMap(client -> client.describeTopic(topicName))
@@ -137,69 +139,9 @@ public class MessagesService {
     }
   }
 
-  public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic,
-                                                 ConsumerPosition consumerPosition,
-                                                 @Nullable String query,
-                                                 MessageFilterTypeDTO filterQueryType,
-                                                 int limit,
-                                                 SeekDirectionDTO seekDirection,
-                                                 @Nullable String keySerde,
-                                                 @Nullable String valueSerde) {
-    return withExistingTopic(cluster, topic)
-        .flux()
-        .publishOn(Schedulers.boundedElastic())
-        .flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
-            filterQueryType, limit, seekDirection, keySerde, valueSerde));
-  }
-
-  private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
-                                                      String topic,
-                                                      ConsumerPosition consumerPosition,
-                                                      @Nullable String query,
-                                                      MessageFilterTypeDTO filterQueryType,
-                                                      int limit,
-                                                      SeekDirectionDTO seekDirection,
-                                                      @Nullable String keySerde,
-                                                      @Nullable String valueSerde) {
-
-    java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
-    ConsumerRecordDeserializer recordDeserializer =
-        deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
-    if (seekDirection.equals(SeekDirectionDTO.FORWARD)) {
-      emitter = new ForwardRecordEmitter(
-          () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition,
-          recordDeserializer,
-          cluster.getPollingSettings()
-      );
-    } else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
-      emitter = new BackwardRecordEmitter(
-          () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition,
-          limit,
-          recordDeserializer,
-          cluster.getPollingSettings()
-      );
-    } else {
-      emitter = new TailingEmitter(
-          () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition,
-          recordDeserializer,
-          cluster.getPollingSettings()
-      );
-    }
-    MessageFilterStats filterStats = new MessageFilterStats();
-    return Flux.create(emitter)
-        .contextWrite(ctx -> ctx.put(MessageFilterStats.class, filterStats))
-        .filter(getMsgFilter(query, filterQueryType, filterStats))
-        .map(getDataMasker(cluster, topic))
-        .takeWhile(createTakeWhilePredicate(seekDirection, limit))
-        .map(throttleUiPublish(seekDirection));
-  }
-
   public Flux<TopicMessageEventDTO> loadMessagesV2(KafkaCluster cluster,
                                                    String topic,
-                                                   PollingModeDTO pollingMode,
+                                                   ConsumerPosition position,
                                                    @Nullable String query,
                                                    @Nullable String filterId,
                                                    int limit,
@@ -208,58 +150,55 @@ public class MessagesService {
     return withExistingTopic(cluster, topic)
         .flux()
         .publishOn(Schedulers.boundedElastic())
-        .flatMap(td -> loadMessagesImplV2(cluster, topic, consumerPosition, query,
-            filterQueryType, limit, seekDirection, keySerde, valueSerde));
+        .flatMap(td -> loadMessagesImplV2(cluster, topic, position, query, filterId, limit, keySerde, valueSerde));
   }
 
   private Flux<TopicMessageEventDTO> loadMessagesImplV2(KafkaCluster cluster,
                                                         String topic,
                                                         ConsumerPosition consumerPosition,
                                                         @Nullable String query,
-                                                        MessageFilterTypeDTO filterQueryType,
+                                                        @Nullable String filterId,
                                                         int limit,
-                                                        SeekDirectionDTO seekDirection,
                                                         @Nullable String keySerde,
                                                         @Nullable String valueSerde) {
 
-    java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
     ConsumerRecordDeserializer recordDeserializer =
         deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
-    if (seekDirection.equals(SeekDirectionDTO.FORWARD)) {
-      emitter = new ForwardRecordEmitter(
+
+    var emitter = switch (consumerPosition.pollingMode()) {
+      case TO_OFFSET, TO_TIMESTAMP, LATEST -> new BackwardRecordEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
+          limit,
           recordDeserializer,
-          cluster.getThrottler().get()
+          cluster.getPollingSettings()
       );
-    } else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
-      emitter = new BackwardRecordEmitter(
+      case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> new ForwardRecordEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
-          limit,
           recordDeserializer,
-          cluster.getThrottler().get()
+          cluster.getPollingSettings()
       );
-    } else {
-      emitter = new TailingEmitter(
+      case TAILING -> new TailingEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
           recordDeserializer,
-          cluster.getThrottler().get()
+          cluster.getPollingSettings()
       );
-    }
+    };
+
     MessageFilterStats filterStats = new MessageFilterStats();
     return Flux.create(emitter)
         .contextWrite(ctx -> ctx.put(MessageFilterStats.class, filterStats))
-        .filter(getMsgFilter(query, filterQueryType, filterStats))
+        .filter(getMsgFilter(query, filterId, filterStats))
         .map(getDataMasker(cluster, topic))
-        .takeWhile(createTakeWhilePredicate(seekDirection, limit))
-        .map(throttleUiPublish(seekDirection));
+        .takeWhile(createTakeWhilePredicate(consumerPosition.pollingMode(), limit))
+        .map(throttleUiPublish(consumerPosition.pollingMode()));
   }
 
   private Predicate<TopicMessageEventDTO> createTakeWhilePredicate(
-      SeekDirectionDTO seekDirection, int limit) {
-    return seekDirection == SeekDirectionDTO.TAILING
+      PollingModeDTO pollingMode, int limit) {
+    return pollingMode == PollingModeDTO.TAILING
         ? evt -> true // no limit for tailing
         : new ResultSizeLimiter(limit);
   }
@@ -278,21 +217,35 @@ public class MessagesService {
     };
   }
 
-  private Predicate<TopicMessageEventDTO> getMsgFilter(String query,
-                                                       MessageFilterTypeDTO filterQueryType,
+  public String registerMessageFilter(String groovyCode) {
+    var filter = MessageFilters.groovyScriptFilter(groovyCode);
+    var id = RandomStringUtils.random(10, true, true);
+    registeredFilters.put(id, filter);
+    return id;
+  }
+
+  private Predicate<TopicMessageEventDTO> getMsgFilter(@Nullable String containsStrFilter,
+                                                       @Nullable String filterId,
                                                        MessageFilterStats filterStats) {
-    if (StringUtils.isEmpty(query)) {
-      return evt -> true;
+    Predicate<TopicMessageDTO> messageFilter = e -> true;
+    if (containsStrFilter != null) {
+      messageFilter = MessageFilters.containsStringFilter(containsStrFilter);
+    }
+    if (filterId != null) {
+      messageFilter = registeredFilters.get(filterId);
+      if (messageFilter == null) {
+        throw new ValidationException("No filter was registered with id " + filterId);
+      }
     }
-    var messageFilter = MessageFilters.createMsgFilter(query, filterQueryType);
+    Predicate<TopicMessageDTO> finalMessageFilter = messageFilter;
     return evt -> {
       // we only apply filter for message events
       if (evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) {
         try {
-          return messageFilter.test(evt.getMessage());
+          return finalMessageFilter.test(evt.getMessage());
         } catch (Exception e) {
           filterStats.incrementApplyErrors();
-          log.trace("Error applying filter '{}' for message {}", query, evt.getMessage());
+          log.trace("Error applying filter for message {}", evt.getMessage());
           return false;
         }
       }
@@ -300,8 +253,8 @@ public class MessagesService {
     };
   }
 
-  private <T> UnaryOperator<T> throttleUiPublish(SeekDirectionDTO seekDirection) {
-    if (seekDirection == SeekDirectionDTO.TAILING) {
+  private <T> UnaryOperator<T> throttleUiPublish(PollingModeDTO pollingMode) {
+    if (pollingMode == PollingModeDTO.TAILING) {
       RateLimiter rateLimiter = RateLimiter.create(TAILING_UI_MESSAGE_THROTTLE_RATE);
       return m -> {
         rateLimiter.acquire(1);
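
An unknown filter id is rejected inside getMsgFilter; since the check runs while the Flux is assembled in flatMap, the ValidationException surfaces as an error signal on the returned Flux, e.g. in a StepVerifier-style check (mirroring MessagesServiceTest below, topic existence assumed):

StepVerifier.create(
        messagesService.loadMessagesV2(cluster, "my-topic",
            ConsumerPosition.create(PollingModeDTO.LATEST, "my-topic", null, null, null),
            null, "no-such-id", 10, "String", "String"))
    .expectError(ValidationException.class)
    .verify();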

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java

@@ -56,7 +56,7 @@ public class KafkaConsumerTests extends AbstractIntegrationTest {
     }
 
     long count = webTestClient.get()
-        .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
+        .uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?m=EARLIEST", LOCAL, topicName)
         .accept(TEXT_EVENT_STREAM)
         .exchange()
         .expectStatus()
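
The same v2 endpoint drives offset-bounded reads; combining the m mode with the offs and lim query parameters from the swagger spec below, a hypothetical variant of this call:

webTestClient.get()
    .uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2"
        + "?m=TO_OFFSET&offs=0:42,1:42&lim=50", LOCAL, topicName)
    .accept(TEXT_EVENT_STREAM)
    .exchange()
    .expectStatus()
    .isOk();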

+ 22 - 14
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/SeekOperationsTest.java

@@ -2,7 +2,9 @@ package com.provectus.kafka.ui.emitter;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.PollingModeDTO;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -14,6 +16,8 @@ import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 
 class SeekOperationsTest {
 
@@ -45,8 +49,7 @@ class SeekOperationsTest {
       var offsets = SeekOperations.getOffsetsForSeek(
           consumer,
           new OffsetsInfo(consumer, topic),
-          SeekTypeDTO.LATEST,
-          null
+          new ConsumerPosition(PollingModeDTO.LATEST, topic, null, null, null)
       );
       assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 30L));
     }
@@ -56,33 +59,38 @@ class SeekOperationsTest {
       var offsets = SeekOperations.getOffsetsForSeek(
           consumer,
           new OffsetsInfo(consumer, topic),
-          SeekTypeDTO.BEGINNING,
-          null
+          new ConsumerPosition(PollingModeDTO.EARLIEST, topic, null, null, null)
       );
       assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 0L, tp3, 25L));
     }
 
-    @Test
-    void offsets() {
+    @ParameterizedTest
+    @CsvSource({"TO_OFFSET", "FROM_OFFSET"})
+    void offsets(PollingModeDTO mode) {
       var offsets = SeekOperations.getOffsetsForSeek(
           consumer,
           new OffsetsInfo(consumer, topic),
-          SeekTypeDTO.OFFSET,
-          Map.of(tp1, 10L, tp2, 10L, tp3, 26L)
+          new ConsumerPosition(
+              mode, topic, List.of(tp1, tp2, tp3), null,
+              new ConsumerPosition.Offsets(null, Map.of(tp1, 10L, tp2, 10L, tp3, 26L))
+          )
       );
       assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 10L, tp3, 26L));
     }
 
-    @Test
-    void offsetsWithBoundsFixing() {
+    @ParameterizedTest
+    @CsvSource({"TO_OFFSET", "FROM_OFFSET"})
+    void offsetsWithBoundsFixing(PollingModeDTO mode) {
       var offsets = SeekOperations.getOffsetsForSeek(
           consumer,
           new OffsetsInfo(consumer, topic),
-          SeekTypeDTO.OFFSET,
-          Map.of(tp1, 10L, tp2, 21L, tp3, 24L)
+          new ConsumerPosition(
+              mode, topic, List.of(tp1, tp2, tp3), null,
+              new ConsumerPosition.Offsets(null, Map.of(tp1, 10L, tp2, 21L, tp3, 24L))
+          )
       );
       assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 25L));
     }
   }
 
-}
+}

+ 7 - 9
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java

@@ -4,10 +4,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.model.ConsumerPosition;
-import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
-import com.provectus.kafka.ui.model.SeekDirectionDTO;
-import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.serdes.builtin.StringSerde;
 import com.provectus.kafka.ui.service.ClustersStorage;
 import com.provectus.kafka.ui.service.MessagesService;
 import java.time.Duration;
@@ -110,14 +109,13 @@ class TailingEmitterTest extends AbstractIntegrationTest {
         .get();
 
     return applicationContext.getBean(MessagesService.class)
-        .loadMessages(cluster, topicName,
-            new ConsumerPosition(SeekTypeDTO.LATEST, topic, null),
+        .loadMessagesV2(cluster, topicName,
+            new ConsumerPosition(PollingModeDTO.TAILING, topic, List.of(), null, null),
             query,
-            MessageFilterTypeDTO.STRING_CONTAINS,
+            null,
             0,
-            SeekDirectionDTO.TAILING,
-            "String",
-            "String");
+            StringSerde.name(),
+            StringSerde.name());
   }
 
   private List<TopicMessageEventDTO> startTailing(String filterQuery) {

+ 6 - 6
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java

@@ -5,8 +5,7 @@ import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.SeekDirectionDTO;
-import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
@@ -55,7 +54,9 @@ class MessagesServiceTest extends AbstractIntegrationTest {
   @Test
   void loadMessagesReturnsExceptionWhenTopicNotFound() {
     StepVerifier.create(messagesService
-            .loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1, null, "String", "String"))
+            .loadMessagesV2(cluster, NON_EXISTING_TOPIC,
+                new ConsumerPosition(PollingModeDTO.TAILING, NON_EXISTING_TOPIC, List.of(), null, null),
+                null, null, 1, "String", "String"))
         .expectError(TopicNotFoundException.class)
         .verify();
   }
@@ -68,14 +69,13 @@ class MessagesServiceTest extends AbstractIntegrationTest {
       producer.send(testTopic, "message1");
       producer.send(testTopic, "message2").get();
 
-      Flux<TopicMessageDTO> msgsFlux = messagesService.loadMessages(
+      Flux<TopicMessageDTO> msgsFlux = messagesService.loadMessagesV2(
           cluster,
           testTopic,
-          new ConsumerPosition(SeekTypeDTO.BEGINNING, testTopic, null),
+          new ConsumerPosition(PollingModeDTO.EARLIEST, testTopic, List.of(), null, null),
           null,
           null,
           100,
-          SeekDirectionDTO.FORWARD,
           StringSerde.name(),
           StringSerde.name()
       ).filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)

+ 40 - 40
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -1,9 +1,11 @@
 package com.provectus.kafka.ui.service;
 
-import static com.provectus.kafka.ui.model.SeekTypeDTO.BEGINNING;
-import static com.provectus.kafka.ui.model.SeekTypeDTO.LATEST;
-import static com.provectus.kafka.ui.model.SeekTypeDTO.OFFSET;
-import static com.provectus.kafka.ui.model.SeekTypeDTO.TIMESTAMP;
+import static com.provectus.kafka.ui.model.PollingModeDTO.EARLIEST;
+import static com.provectus.kafka.ui.model.PollingModeDTO.FROM_OFFSET;
+import static com.provectus.kafka.ui.model.PollingModeDTO.FROM_TIMESTAMP;
+import static com.provectus.kafka.ui.model.PollingModeDTO.LATEST;
+import static com.provectus.kafka.ui.model.PollingModeDTO.TO_OFFSET;
+import static com.provectus.kafka.ui.model.PollingModeDTO.TO_TIMESTAMP;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
@@ -11,6 +13,7 @@ import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.PollingSettings;
 import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.ConsumerPosition.Offsets;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
 import com.provectus.kafka.ui.serde.api.Serde;
@@ -60,10 +63,11 @@ class RecordEmitterTest extends AbstractIntegrationTest {
   static void generateMsgs() throws Exception {
     createTopic(new NewTopic(TOPIC, PARTITIONS, (short) 1));
     createTopic(new NewTopic(EMPTY_TOPIC, PARTITIONS, (short) 1));
+    long startTs = System.currentTimeMillis();
     try (var producer = KafkaTestProducer.forKafka(kafka)) {
       for (int partition = 0; partition < PARTITIONS; partition++) {
         for (int i = 0; i < MSGS_PER_PARTITION; i++) {
-          long ts = System.currentTimeMillis() + i;
+          long ts = (startTs += 100);
           var value = "msg_" + partition + "_" + i;
           var metadata = producer.send(
               new ProducerRecord<>(
@@ -110,14 +114,14 @@ class RecordEmitterTest extends AbstractIntegrationTest {
   void pollNothingOnEmptyTopic() {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
+        new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null),
         RECORD_DESERIALIZER,
         PollingSettings.createDefault()
     );
 
     var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
+        new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null),
         100,
         RECORD_DESERIALIZER,
         PollingSettings.createDefault()
@@ -140,14 +144,14 @@ class RecordEmitterTest extends AbstractIntegrationTest {
   void pollFullTopicFromBeginning() {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(BEGINNING, TOPIC, null),
+        new ConsumerPosition(EARLIEST, TOPIC, List.of(), null, null),
         RECORD_DESERIALIZER,
         PollingSettings.createDefault()
     );
 
     var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(LATEST, TOPIC, null),
+        new ConsumerPosition(LATEST, TOPIC, List.of(), null, null),
         PARTITIONS * MSGS_PER_PARTITION,
         RECORD_DESERIALIZER,
         PollingSettings.createDefault()
@@ -169,14 +173,16 @@ class RecordEmitterTest extends AbstractIntegrationTest {
 
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
+        new ConsumerPosition(FROM_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
+            new Offsets(null, targetOffsets)),
         RECORD_DESERIALIZER,
         PollingSettings.createDefault()
     );
 
     var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
+        new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
+            new Offsets(null, targetOffsets)),
         PARTITIONS * MSGS_PER_PARTITION,
         RECORD_DESERIALIZER,
         PollingSettings.createDefault()
@@ -199,47 +205,40 @@ class RecordEmitterTest extends AbstractIntegrationTest {
 
   @Test
   void pollWithTimestamps() {
-    Map<TopicPartition, Long> targetTimestamps = new HashMap<>();
-    final Map<TopicPartition, List<Record>> perPartition =
-        SENT_RECORDS.stream().collect(Collectors.groupingBy((r) -> r.tp));
-    for (int i = 0; i < PARTITIONS; i++) {
-      final List<Record> records = perPartition.get(new TopicPartition(TOPIC, i));
-      int randRecordIdx = ThreadLocalRandom.current().nextInt(records.size());
-      log.info("partition: {} position: {}", i, randRecordIdx);
-      targetTimestamps.put(
-          new TopicPartition(TOPIC, i),
-          records.get(randRecordIdx).getTimestamp()
-      );
-    }
+    var tsStats = SENT_RECORDS.stream().mapToLong(Record::getTimestamp).summaryStatistics();
+    //choosing ts in the middle
+    long targetTimestamp = tsStats.getMin() + ((tsStats.getMax() - tsStats.getMin()) / 2);
 
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
+        new ConsumerPosition(FROM_TIMESTAMP, TOPIC, List.of(), targetTimestamp, null),
         RECORD_DESERIALIZER,
         PollingSettings.createDefault()
     );
 
+    expectEmitter(
+        forwardEmitter,
+        SENT_RECORDS.stream()
+            .filter(r -> r.getTimestamp() >= targetTimestamp)
+            .map(Record::getValue)
+            .collect(Collectors.toList())
+    );
+
     var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
+        new ConsumerPosition(TO_TIMESTAMP, TOPIC, List.of(), targetTimestamp, null),
         PARTITIONS * MSGS_PER_PARTITION,
         RECORD_DESERIALIZER,
         PollingSettings.createDefault()
     );
 
-    var expectedValues = SENT_RECORDS.stream()
-        .filter(r -> r.getTimestamp() >= targetTimestamps.get(r.getTp()))
-        .map(Record::getValue)
-        .collect(Collectors.toList());
-
-    expectEmitter(forwardEmitter, expectedValues);
-
-    expectedValues = SENT_RECORDS.stream()
-        .filter(r -> r.getTimestamp() < targetTimestamps.get(r.getTp()))
-        .map(Record::getValue)
-        .collect(Collectors.toList());
-
-    expectEmitter(backwardEmitter, expectedValues);
+    expectEmitter(
+        backwardEmitter,
+        SENT_RECORDS.stream()
+            .filter(r -> r.getTimestamp() < targetTimestamp)
+            .map(Record::getValue)
+            .collect(Collectors.toList())
+    );
   }
 
   @Test
@@ -252,7 +251,8 @@ class RecordEmitterTest extends AbstractIntegrationTest {
 
     var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
+        new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
+            new Offsets(null, targetOffsets)),
         numMessages,
         RECORD_DESERIALIZER,
         PollingSettings.createDefault()
@@ -278,7 +278,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
 
     var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(OFFSET, TOPIC, offsets),
+        new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(offsets.keySet()), null, new Offsets(null, offsets)),
         100,
         RECORD_DESERIALIZER,
         PollingSettings.createDefault()

+ 4 - 9
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java

@@ -7,8 +7,7 @@ import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.SeekDirectionDTO;
-import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
@@ -20,6 +19,7 @@ import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.time.Duration;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
@@ -497,18 +497,13 @@ public class SendAndReadTests extends AbstractIntegrationTest {
       String topic = createTopicAndCreateSchemas();
       try {
         messagesService.sendMessage(targetCluster, topic, msgToSend).block();
-        TopicMessageDTO polled = messagesService.loadMessages(
+        TopicMessageDTO polled = messagesService.loadMessagesV2(
                 targetCluster,
                 topic,
-                new ConsumerPosition(
-                    SeekTypeDTO.BEGINNING,
-                    topic,
-                    Map.of(new TopicPartition(topic, 0), 0L)
-                ),
+                new ConsumerPosition(PollingModeDTO.EARLIEST, topic, List.of(), null, null),
                 null,
                 null,
                 1,
-                SeekDirectionDTO.FORWARD,
                 msgToSend.getKeySerde().get(),
                 msgToSend.getValueSerde().get()
             ).filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))

+ 4 - 7
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -772,9 +772,7 @@ paths:
           content:
             application/json:
               schema:
-                type: array
-                items:
-                  $ref: '#/components/schemas/MessageFilterId'
+                $ref: '#/components/schemas/MessageFilterId'
 
 
   /api/clusters/{clusterName}/topics/{topicName}/messages/v2:
@@ -804,7 +802,7 @@ paths:
           in: query
           schema:
             type: array
-            description: List of target partitions( all partitions if not provided)
+            description: List of target partitions (all partitions if not provided)
             items:
               type: integer
         - name: lim
@@ -824,7 +822,7 @@ paths:
             type: string
         - name: offs
           in: query
-          description: partition offsets to read from / to. Format is "p1:off1,p2:off2,..."
+          description: partition offsets to read from / to. Format is "p1:offset1,p2:offset2,...".
           schema:
             type: string
         - name: ts
@@ -2571,7 +2569,6 @@ components:
             - CONSUMING
             - DONE
             - CURSOR
-            - EMIT_THROTTLING
         message:
           $ref: "#/components/schemas/TopicMessage"
         phase:
@@ -2708,7 +2705,7 @@ components:
         - FROM_TIMESTAMP
         - TO_TIMESTAMP
         - LATEST
-        - FIRST
+        - EARLIEST
         - TAILING
 
     MessageFilterType:
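
The renamed PollingMode values consolidate the old SeekType + SeekDirection pair; per the MessagesService switch above they map onto emitters as follows:

FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> ForwardRecordEmitter (reads forward from the position)
TO_OFFSET, TO_TIMESTAMP, LATEST -> BackwardRecordEmitter (reads backward up to the position)
TAILING -> TailingEmitter (follows new records, no record limit)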