iliax, 2 years ago
Parent
Current commit
01f5648ab2

+ 21 - 75
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -5,10 +5,8 @@ import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_
 import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
 import static com.provectus.kafka.ui.serde.api.Serde.Target.KEY;
 import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE;
-import static java.util.stream.Collectors.toMap;
 
 import com.provectus.kafka.ui.api.MessagesApi;
-import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.MessageFilterIdDTO;
@@ -26,14 +24,11 @@ import com.provectus.kafka.ui.service.DeserializationService;
 import com.provectus.kafka.ui.service.MessagesService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import javax.annotation.Nullable;
 import javax.validation.Valid;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.kafka.common.TopicPartition;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
@@ -86,32 +81,7 @@ public class MessagesController extends AbstractController implements MessagesAp
                                                                            String keySerde,
                                                                            String valueSerde,
                                                                            ServerWebExchange exchange) {
-    final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
-        .cluster(clusterName)
-        .topic(topicName)
-        .topicActions(MESSAGES_READ)
-        .build());
-
-    seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
-    seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
-    filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
-    int recordsLimit =
-        Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
-
-    var positions = new ConsumerPosition(
-        seekType,
-        topicName,
-        parseSeekTo(topicName, seekType, seekTo)
-    );
-    Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> job = Mono.just(
-        ResponseEntity.ok(
-            messagesService.loadMessages(
-                getCluster(clusterName), topicName, positions, q, filterQueryType,
-                recordsLimit, seekDirection, keySerde, valueSerde)
-        )
-    );
-
-    return validateAccess.then(job);
+    throw new IllegalStateException();
   }
 
   @Override
@@ -132,34 +102,6 @@ public class MessagesController extends AbstractController implements MessagesAp
     );
   }
 
-  /**
-   * The format is [partition]::[offset] for specifying offsets
-   * or [partition]::[timestamp in millis] for specifying timestamps.
-   */
-  @Nullable
-  private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
-    if (seekTo == null || seekTo.isEmpty()) {
-      if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
-        return null;
-      }
-      throw new ValidationException("seekTo should be set if seekType is " + seekType);
-    }
-    return seekTo.stream()
-        .map(p -> {
-          String[] split = p.split("::");
-          if (split.length != 2) {
-            throw new IllegalArgumentException(
-                "Wrong seekTo argument format. See API docs for details");
-          }
-
-          return Pair.of(
-              new TopicPartition(topic, Integer.parseInt(split[0])),
-              Long.parseLong(split[1])
-          );
-        })
-        .collect(toMap(Pair::getKey, Pair::getValue));
-  }
-
   @Override
   public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
                                                                  String topicName,
@@ -197,8 +139,8 @@ public class MessagesController extends AbstractController implements MessagesAp
                                                                              @Nullable String filterId,
                                                                              @Nullable String offsetString,
                                                                              @Nullable Long ts,
-                                                                             @Nullable String ks,
-                                                                             @Nullable String vs,
+                                                                             @Nullable String keySerde,
+                                                                             @Nullable String valueSerde,
                                                                              ServerWebExchange exchange) {
     final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
         .cluster(clusterName)
@@ -206,6 +148,8 @@ public class MessagesController extends AbstractController implements MessagesAp
         .topicActions(MESSAGES_READ)
         .build());
 
+    ConsumerPosition consumerPosition = ConsumerPosition.create(mode, topicName, partitions, ts, offsetString);
+
     int recordsLimit =
         Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
 
@@ -213,23 +157,25 @@ public class MessagesController extends AbstractController implements MessagesAp
         Mono.just(
             ResponseEntity.ok(
                 messagesService.loadMessagesV2(
-                    getCluster(clusterName), topicName, positions, q, filterQueryType,
-                    recordsLimit, seekDirection, keySerde, valueSerde)
-            )
-        )
-    );
+                    getCluster(clusterName), topicName, consumerPosition,
+                    query, filterId, recordsLimit, keySerde, valueSerde))));
   }
 
-   interface PollingMode {
-    static PollingMode create(PollingModeDTO mode, @Nullable String offsetString, @Nullable Long timestamp) {
-      return null;
-    }
-  }
 
   @Override
-  public Mono<ResponseEntity<Flux<MessageFilterIdDTO>>> registerFilter(String clusterName, String topicName,
-                                                                       Mono<MessageFilterRegistrationDTO> messageFilterRegistrationDTO,
-                                                                       ServerWebExchange exchange) {
-    return null;
+  public Mono<ResponseEntity<MessageFilterIdDTO>> registerFilter(String clusterName,
+                                                                 String topicName,
+                                                                 Mono<MessageFilterRegistrationDTO> registration,
+                                                                 ServerWebExchange exchange) {
+
+    final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+        .cluster(clusterName)
+        .topic(topicName)
+        .topicActions(MESSAGES_READ)
+        .build());
+
+    return validateAccess.then(registration)
+        .map(reg -> messagesService.registerMessageFilter(reg.getFilterCode()))
+        .map(id -> ResponseEntity.ok(new MessageFilterIdDTO().id(id)));
   }
 }
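
Note: the v1 GET messages handler above now just throws, and its logic moves to the v2 endpoint. Below is a minimal sketch (not part of this commit) of exercising the new endpoint from a test, mirroring the KafkaConsumerTests change further down; the "m" query parameter carries the PollingModeDTO value, while the "limit" parameter name is an assumption not confirmed by this diff.

long count = webTestClient.get()
    // "m" is the polling-mode parameter used by the updated KafkaConsumerTests below;
    // "limit" is assumed here for illustration only
    .uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?m=EARLIEST&limit=100", LOCAL, topicName)
    .accept(TEXT_EVENT_STREAM)
    .exchange()
    .expectStatus().isOk()
    .returnResult(TopicMessageEventDTO.class)
    .getResponseBody()
    .filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
    .count()
    .block();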

+ 51 - 17
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -1,44 +1,78 @@
 package com.provectus.kafka.ui.emitter;
 
+import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import java.time.Duration;
+import java.time.Instant;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
-abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
+public abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
-  private final MessagesProcessing messagesProcessing;
-  private final PollingSettings pollingSettings;
+  private final ConsumerRecordDeserializer recordDeserializer;
+  private final ConsumingStats consumingStats = new ConsumingStats();
+  private final PollingThrottler throttler;
+  protected final PollingSettings pollingSettings;
 
-  protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
-    this.messagesProcessing = messagesProcessing;
+  protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer, PollingSettings pollingSettings) {
+    this.recordDeserializer = recordDeserializer;
     this.pollingSettings = pollingSettings;
+    this.throttler = pollingSettings.getPollingThrottler();
   }
 
-  protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
-    var records = consumer.pollEnhanced(pollingSettings.getPollTimeout());
-    sendConsuming(sink, records);
-    return records;
+  protected ConsumerRecords<Bytes, Bytes> poll(
+      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
+    return poll(sink, consumer, pollingSettings.getPollTimeout());
   }
 
-  protected boolean sendLimitReached() {
-    return messagesProcessing.limitReached();
+  protected ConsumerRecords<Bytes, Bytes> poll(
+      FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer, Duration timeout) {
+    Instant start = Instant.now();
+    ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
+    Instant finish = Instant.now();
+    int polledBytes = sendConsuming(sink, records, Duration.between(start, finish).toMillis());
+    throttler.throttleAfterPoll(polledBytes);
+    return records;
   }
 
-  protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
-    messagesProcessing.send(sink, records);
+  protected void sendMessage(FluxSink<TopicMessageEventDTO> sink,
+                             ConsumerRecord<Bytes, Bytes> msg) {
+    final TopicMessageDTO topicMessage = recordDeserializer.deserialize(msg);
+    sink.next(
+        new TopicMessageEventDTO()
+            .type(TopicMessageEventDTO.TypeEnum.MESSAGE)
+            .message(topicMessage)
+    );
   }
 
   protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
-    messagesProcessing.sendPhase(sink, name);
+    sink.next(
+        new TopicMessageEventDTO()
+            .type(TopicMessageEventDTO.TypeEnum.PHASE)
+            .phase(new TopicMessagePhaseDTO().name(name))
+    );
   }
 
-  protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords records) {
-    messagesProcessing.sentConsumingInfo(sink, records);
+  protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
+                               ConsumerRecords<Bytes, Bytes> records,
+                               long elapsed) {
+    return consumingStats.sendConsumingEvt(sink, records, elapsed, getFilterApplyErrors(sink));
   }
 
   protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
-    messagesProcessing.sendFinishEvent(sink);
+    consumingStats.sendFinishEvent(sink, getFilterApplyErrors(sink));
     sink.complete();
   }
+
+  protected Number getFilterApplyErrors(FluxSink<?> sink) {
+    return sink.contextView()
+        .<MessageFilterStats>getOrEmpty(MessageFilterStats.class)
+        .<Number>map(MessageFilterStats::getFilterApplyErrors)
+        .orElse(0);
+  }
 }
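
For context, getFilterApplyErrors() reads a MessageFilterStats holder from the subscriber's Reactor context. A minimal sketch of how that value is expected to get there, mirroring the contextWrite call in MessagesService further down:

MessageFilterStats filterStats = new MessageFilterStats();
Flux<TopicMessageEventDTO> flux = Flux.create(emitter)   // emitter: any AbstractEmitter subclass
    .contextWrite(ctx -> ctx.put(MessageFilterStats.class, filterStats)); // same key read by getFilterApplyErrors(sink)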

+ 128 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

@@ -0,0 +1,128 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.utils.Bytes;
+import reactor.core.publisher.FluxSink;
+
+@Slf4j
+public class BackwardRecordEmitter extends AbstractEmitter {
+
+  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final ConsumerPosition consumerPosition;
+  private final int messagesPerPage;
+
+  public BackwardRecordEmitter(
+      Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
+      ConsumerPosition consumerPosition,
+      int messagesPerPage,
+      ConsumerRecordDeserializer recordDeserializer,
+      PollingSettings pollingSettings) {
+    super(recordDeserializer, pollingSettings);
+    this.consumerPosition = consumerPosition;
+    this.messagesPerPage = messagesPerPage;
+    this.consumerSupplier = consumerSupplier;
+  }
+
+  @Override
+  public void accept(FluxSink<TopicMessageEventDTO> sink) {
+    log.debug("Starting backward polling for {}", consumerPosition);
+    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+      sendPhase(sink, "Created consumer");
+
+      var seekOperations = SeekOperations.create(consumer, consumerPosition);
+      var readUntilOffsets = new TreeMap<TopicPartition, Long>(Comparator.comparingInt(TopicPartition::partition));
+      readUntilOffsets.putAll(seekOperations.getOffsetsForSeek());
+
+      int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readUntilOffsets.size());
+      log.debug("'Until' offsets for polling: {}", readUntilOffsets);
+
+      while (!sink.isCancelled() && !readUntilOffsets.isEmpty()) {
+        new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> {
+          if (sink.isCancelled()) {
+            return; //fast return in case of sink cancellation
+          }
+          long beginOffset = seekOperations.getBeginOffsets().get(tp);
+          long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition);
+
+          partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink)
+              .stream()
+              .filter(r -> !sink.isCancelled())
+              .forEach(r -> sendMessage(sink, r));
+
+          if (beginOffset == readFromOffset) {
+            // we fully read this partition -> removing it from polling iterations
+            readUntilOffsets.remove(tp);
+          } else {
+            // updating 'to' offset for next polling iteration
+            readUntilOffsets.put(tp, readFromOffset);
+          }
+        });
+        if (readUntilOffsets.isEmpty()) {
+          log.debug("begin reached after partitions poll iteration");
+        } else if (sink.isCancelled()) {
+          log.debug("sink is cancelled after partitions poll iteration");
+        }
+      }
+      sendFinishStatsAndCompleteSink(sink);
+      log.debug("Polling finished");
+    } catch (InterruptException kafkaInterruptException) {
+      log.debug("Polling finished due to thread interruption");
+      sink.complete();
+    } catch (Exception e) {
+      log.error("Error occurred while consuming records", e);
+      sink.error(e);
+    }
+  }
+
+  private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(
+      TopicPartition tp,
+      long fromOffset,
+      long toOffset,
+      Consumer<Bytes, Bytes> consumer,
+      FluxSink<TopicMessageEventDTO> sink
+  ) {
+    consumer.assign(Collections.singleton(tp));
+    consumer.seek(tp, fromOffset);
+    sendPhase(sink, String.format("Polling partition: %s from offset %s", tp, fromOffset));
+    int desiredMsgsToPoll = (int) (toOffset - fromOffset);
+
+    var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
+
+    EmptyPollsCounter emptyPolls  = pollingSettings.createEmptyPollsCounter();
+    while (!sink.isCancelled()
+        && recordsToSend.size() < desiredMsgsToPoll
+        && !emptyPolls.noDataEmptyPollsReached()) {
+      var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
+      emptyPolls.count(polledRecords);
+
+      log.debug("{} records polled from {}", polledRecords.count(), tp);
+
+      var filteredRecords = polledRecords.records(tp).stream()
+          .filter(r -> r.offset() < toOffset)
+          .toList();
+
+      if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) {
+        // we already read all messages in target offsets interval
+        break;
+      }
+      recordsToSend.addAll(filteredRecords);
+    }
+    log.debug("{} records to send", recordsToSend.size());
+    Collections.reverse(recordsToSend);
+    return recordsToSend;
+  }
+}
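
A small worked example of the backward paging arithmetic above, using hypothetical offsets (not taken from this commit):

// messagesPerPage = 100, two non-empty partitions:
// readUntilOffsets = {tp0=500, tp1=30}, beginOffsets = {tp0=0, tp1=0}
// msgsToPollPerPartition = ceil(100 / 2) = 50
// iteration 1: tp0 is polled for offsets [450, 500), tp1 for [0, 30) - tp1 hits its begin offset and is removed
// iteration 2: tp0 is polled for offsets [400, 450), and so on until offset 0 is reached or the sink is cancelled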

+ 69 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -0,0 +1,69 @@
+package com.provectus.kafka.ui.emitter;
+
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.utils.Bytes;
+import reactor.core.publisher.FluxSink;
+
+@Slf4j
+public class ForwardRecordEmitter
+    extends AbstractEmitter {
+
+  private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
+  private final ConsumerPosition position;
+
+  public ForwardRecordEmitter(
+      Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
+      ConsumerPosition position,
+      ConsumerRecordDeserializer recordDeserializer,
+      PollingSettings pollingSettings) {
+    super(recordDeserializer, pollingSettings);
+    this.position = position;
+    this.consumerSupplier = consumerSupplier;
+  }
+
+  @Override
+  public void accept(FluxSink<TopicMessageEventDTO> sink) {
+    log.debug("Starting forward polling for {}", position);
+    try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
+      sendPhase(sink, "Assigning partitions");
+      var seekOperations = SeekOperations.create(consumer, position);
+      seekOperations.assignAndSeekNonEmptyPartitions();
+
+      EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
+      while (!sink.isCancelled()
+          && !seekOperations.assignedPartitionsFullyPolled()
+          && !emptyPolls.noDataEmptyPollsReached()) {
+
+        sendPhase(sink, "Polling");
+        ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
+        emptyPolls.count(records);
+
+        log.debug("{} records polled", records.count());
+
+        for (ConsumerRecord<Bytes, Bytes> msg : records) {
+          if (!sink.isCancelled()) {
+            sendMessage(sink, msg);
+          } else {
+            break;
+          }
+        }
+      }
+      sendFinishStatsAndCompleteSink(sink);
+      log.debug("Polling finished");
+    } catch (InterruptException kafkaInterruptException) {
+      log.debug("Polling finished due to thread interruption");
+      sink.complete();
+    } catch (Exception e) {
+      log.error("Error occurred while consuming records", e);
+      sink.error(e);
+    }
+  }
+}

+ 11 - 24
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java

@@ -1,7 +1,6 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import groovy.json.JsonSlurper;
 import java.util.function.Predicate;
@@ -22,59 +21,47 @@ public class MessageFilters {
   private MessageFilters() {
   }
 
-  public static Predicate<TopicMessageDTO> createMsgFilter(String query, MessageFilterTypeDTO type) {
-    switch (type) {
-      case STRING_CONTAINS:
-        return containsStringFilter(query);
-      case GROOVY_SCRIPT:
-        return groovyScriptFilter(query);
-      default:
-        throw new IllegalStateException("Unknown query type: " + type);
-    }
-  }
-
-  static Predicate<TopicMessageDTO> containsStringFilter(String string) {
+  public static Predicate<TopicMessageDTO> containsStringFilter(String string) {
     return msg -> StringUtils.contains(msg.getKey(), string)
         || StringUtils.contains(msg.getContent(), string);
   }
 
-  static Predicate<TopicMessageDTO> groovyScriptFilter(String script) {
-    var engine = getGroovyEngine();
-    var compiledScript = compileScript(engine, script);
+  public static Predicate<TopicMessageDTO> groovyScriptFilter(String script) {
+    var compiledScript = compileScript(script);
     var jsonSlurper = new JsonSlurper();
     return new Predicate<TopicMessageDTO>() {
       @SneakyThrows
       @Override
       public boolean test(TopicMessageDTO msg) {
-        var bindings = engine.createBindings();
+        var bindings = getGroovyEngine().createBindings();
         bindings.put("partition", msg.getPartition());
         bindings.put("offset", msg.getOffset());
         bindings.put("timestampMs", msg.getTimestamp().toInstant().toEpochMilli());
         bindings.put("keyAsText", msg.getKey());
         bindings.put("valueAsText", msg.getContent());
         bindings.put("headers", msg.getHeaders());
-        bindings.put("key", parseToJsonOrReturnAsIs(jsonSlurper, msg.getKey()));
-        bindings.put("value", parseToJsonOrReturnAsIs(jsonSlurper, msg.getContent()));
+        bindings.put("key", parseToJsonOrReturnNull(jsonSlurper, msg.getKey()));
+        bindings.put("value", parseToJsonOrReturnNull(jsonSlurper, msg.getContent()));
         var result = compiledScript.eval(bindings);
         if (result instanceof Boolean) {
           return (Boolean) result;
         } else {
           throw new ValidationException(
-              "Unexpected script result: %s, Boolean should be returned instead".formatted(result));
+              String.format("Unexpected script result: %s, Boolean should be returned instead", result));
         }
       }
     };
   }
 
   @Nullable
-  private static Object parseToJsonOrReturnAsIs(JsonSlurper parser, @Nullable String str) {
+  private static Object parseToJsonOrReturnNull(JsonSlurper parser, @Nullable String str) {
     if (str == null) {
       return null;
     }
     try {
       return parser.parseText(str);
     } catch (Exception e) {
-      return str;
+      return null;
     }
   }
 
@@ -87,9 +74,9 @@ public class MessageFilters {
     return GROOVY_ENGINE;
   }
 
-  private static CompiledScript compileScript(GroovyScriptEngineImpl engine, String script) {
+  private static CompiledScript compileScript(String script) {
     try {
-      return engine.compile(script);
+      return getGroovyEngine().compile(script);
     } catch (ScriptException e) {
       throw new ValidationException("Script syntax error: " + e.getMessage());
     }
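
A minimal usage sketch for the now-public Groovy filter; the TopicMessageDTO values are hypothetical sample data, and the fluent setters follow the generated-DTO style seen elsewhere in this diff:

Predicate<TopicMessageDTO> filter =
    MessageFilters.groovyScriptFilter("offset > 100 && keyAsText != null && value.type == 'order'");

TopicMessageDTO msg = new TopicMessageDTO()
    .partition(0)
    .offset(101L)
    .timestamp(OffsetDateTime.now())   // required: the 'timestampMs' binding is derived from it
    .key("k1")
    .content("{\"type\":\"order\"}");  // parsed by JsonSlurper into the 'value' binding

boolean matches = filter.test(msg);    // true: offset > 100, key present, value.type == 'order'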

+ 53 - 48
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java

@@ -1,71 +1,58 @@
 package com.provectus.kafka.ui.emitter;
 
+import static com.provectus.kafka.ui.model.PollingModeDTO.TO_TIMESTAMP;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.provectus.kafka.ui.model.ConsumerPosition;
-import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 import lombok.AccessLevel;
 import lombok.RequiredArgsConstructor;
-import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 
 @RequiredArgsConstructor(access = AccessLevel.PACKAGE)
-public class SeekOperations {
+class SeekOperations {
 
   private final Consumer<?, ?> consumer;
   private final OffsetsInfo offsetsInfo;
   private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
 
-  public static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
+  static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
     OffsetsInfo offsetsInfo;
-    if (consumerPosition.getSeekTo() == null) {
-      offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getTopic());
+    if (consumerPosition.partitions().isEmpty()) {
+      offsetsInfo = new OffsetsInfo(consumer, consumerPosition.topic());
     } else {
-      offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getSeekTo().keySet());
+      offsetsInfo = new OffsetsInfo(consumer, consumerPosition.partitions());
     }
     return new SeekOperations(
         consumer,
         offsetsInfo,
-        getOffsetsForSeek(consumer, offsetsInfo, consumerPosition.getSeekType(), consumerPosition.getSeekTo())
+        getOffsetsForSeek(consumer, offsetsInfo, consumerPosition)
     );
   }
 
-  public void assignAndSeekNonEmptyPartitions() {
+  void assignAndSeekNonEmptyPartitions() {
     consumer.assign(offsetsForSeek.keySet());
     offsetsForSeek.forEach(consumer::seek);
   }
 
-  public Map<TopicPartition, Long> getBeginOffsets() {
+  Map<TopicPartition, Long> getBeginOffsets() {
     return offsetsInfo.getBeginOffsets();
   }
 
-  public Map<TopicPartition, Long> getEndOffsets() {
+  Map<TopicPartition, Long> getEndOffsets() {
     return offsetsInfo.getEndOffsets();
   }
 
-  public boolean assignedPartitionsFullyPolled() {
+  boolean assignedPartitionsFullyPolled() {
     return offsetsInfo.assignedPartitionsFullyPolled();
   }
 
-  // sum of (end - start) offsets for all partitions
-  public long summaryOffsetsRange() {
-    return offsetsInfo.summaryOffsetsRange();
-  }
-
-  // sum of differences between initial consumer seek and current consumer position (across all partitions)
-  public long offsetsProcessedFromSeek() {
-    MutableLong count = new MutableLong();
-    offsetsForSeek.forEach((tp, initialOffset) -> count.add(consumer.position(tp) - initialOffset));
-    return count.getValue();
-  }
-
   // Get offsets to seek to. NOTE: offsets do not contain empty partitions offsets
-  public Map<TopicPartition, Long> getOffsetsForSeek() {
+  Map<TopicPartition, Long> getOffsetsForSeek() {
     return offsetsForSeek;
   }
 
@@ -75,27 +62,33 @@ public class SeekOperations {
   @VisibleForTesting
   static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
                                                      OffsetsInfo offsetsInfo,
-                                                     SeekTypeDTO seekType,
-                                                     @Nullable Map<TopicPartition, Long> seekTo) {
-    switch (seekType) {
-      case LATEST:
+                                                     ConsumerPosition position) {
+    switch (position.pollingMode()) {
+      case LATEST, TAILING:
         return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
-      case BEGINNING:
+      case EARLIEST:
         return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
-      case OFFSET:
-        Preconditions.checkNotNull(seekTo);
-        return fixOffsets(offsetsInfo, seekTo);
-      case TIMESTAMP:
-        Preconditions.checkNotNull(seekTo);
-        return offsetsForTimestamp(consumer, offsetsInfo, seekTo);
+      case FROM_OFFSET, TO_OFFSET:
+        Preconditions.checkNotNull(position.offsets());
+        return fixOffsets(offsetsInfo, position.offsets());
+      case FROM_TIMESTAMP, TO_TIMESTAMP:
+        Preconditions.checkNotNull(position.timestamp());
+        return offsetsForTimestamp(consumer, position.pollingMode(), offsetsInfo, position.timestamp());
       default:
         throw new IllegalStateException();
     }
   }
 
-  private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo, Map<TopicPartition, Long> offsets) {
-    offsets = new HashMap<>(offsets);
-    offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
+  private static Map<TopicPartition, Long> fixOffsets(OffsetsInfo offsetsInfo,
+                                                      ConsumerPosition.Offsets positionOffset) {
+    var offsets = new HashMap<TopicPartition, Long>();
+    if (positionOffset.offset() != null) {
+      offsetsInfo.getNonEmptyPartitions().forEach(tp -> offsets.put(tp, positionOffset.offset()));
+    } else {
+      Preconditions.checkNotNull(positionOffset.tpOffsets());
+      offsets.putAll(positionOffset.tpOffsets());
+      offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
+    }
 
     Map<TopicPartition, Long> result = new HashMap<>();
     offsets.forEach((tp, targetOffset) -> {
@@ -112,13 +105,25 @@ public class SeekOperations {
     return result;
   }
 
-  private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer, OffsetsInfo offsetsInfo,
-                                                               Map<TopicPartition, Long> timestamps) {
-    timestamps = new HashMap<>(timestamps);
-    timestamps.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
+  private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer,
+                                                               PollingModeDTO pollingMode,
+                                                               OffsetsInfo offsetsInfo,
+                                                               Long timestamp) {
+    Map<TopicPartition, Long> timestamps = new HashMap<>();
+    offsetsInfo.getNonEmptyPartitions().forEach(tp -> timestamps.put(tp, timestamp));
 
-    return consumer.offsetsForTimes(timestamps).entrySet().stream()
-        .filter(e -> e.getValue() != null)
-        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+    Map<TopicPartition, Long> result = new HashMap<>();
+    consumer.offsetsForTimes(timestamps).forEach((tp, offsetAndTimestamp) -> {
+      if (offsetAndTimestamp == null) {
+        if (pollingMode == TO_TIMESTAMP && offsetsInfo.getNonEmptyPartitions().contains(tp)) {
+          // if no offset was returned this means that *all* timestamps are lower
+          // than target timestamp. In case of TO_TIMESTAMP mode we need to read from the end of the tp
+          result.put(tp, offsetsInfo.getEndOffsets().get(tp));
+        }
+      } else {
+        result.put(tp, offsetAndTimestamp.offset());
+      }
+    });
+    return result;
   }
 }
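
A worked example of the timestamp branch above, with hypothetical values: partitions whose records all predate the target timestamp get no offset back from offsetsForTimes; TO_TIMESTAMP then falls back to the partition's end offset, while FROM_TIMESTAMP simply drops the partition.

// non-empty partitions: tp0, tp1; endOffsets = {tp0=50, tp1=80}; target timestamp = T
// consumer.offsetsForTimes(...) -> {tp0 -> offset 42, tp1 -> null} (no tp1 record with timestamp >= T)
// FROM_TIMESTAMP -> {tp0=42}           (tp1 is skipped)
// TO_TIMESTAMP   -> {tp0=42, tp1=80}   (tp1 falls back to its end offset)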

+ 83 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java

@@ -1,14 +1,90 @@
 package com.provectus.kafka.ui.model;
 
+import static java.util.stream.Collectors.toMap;
+
+import com.provectus.kafka.ui.exception.ValidationException;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import javax.annotation.Nullable;
-import lombok.Value;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.common.TopicPartition;
+import org.springframework.util.StringUtils;
+
+
+public record ConsumerPosition(PollingModeDTO pollingMode,
+                               String topic,
+                               List<TopicPartition> partitions, //all partitions if list is empty
+                               @Nullable Long timestamp,
+                               @Nullable Offsets offsets) {
+
+  public record Offsets(@Nullable Long offset,
+                        @Nullable Map<TopicPartition, Long> tpOffsets) {
+  }
+
+  public static ConsumerPosition create(PollingModeDTO pollingMode,
+                                        String topic,
+                                        @Nullable List<Integer> partitions,
+                                        @Nullable Long timestamp,
+                                        @Nullable String offsetsStr) {
+    Offsets offsets = parseAndValidateOffsets(pollingMode, topic, offsetsStr);
+
+    var topicPartitions = Optional.ofNullable(partitions).orElse(List.of())
+        .stream()
+        .map(p -> new TopicPartition(topic, p))
+        .collect(Collectors.toList());
+
+    // if offsets are specified - inferring partitions list from them
+    topicPartitions = (offsets == null || offsets.tpOffsets() == null)
+        ? topicPartitions : List.copyOf(offsets.tpOffsets().keySet());
+
+    return new ConsumerPosition(
+        pollingMode,
+        topic,
+        Optional.ofNullable(topicPartitions).orElse(List.of()),
+        validateTimestamp(pollingMode, timestamp),
+        offsets
+    );
+  }
+
+  private static Long validateTimestamp(PollingModeDTO pollingMode, @Nullable Long ts) {
+    if (pollingMode == PollingModeDTO.FROM_TIMESTAMP || pollingMode == PollingModeDTO.TO_TIMESTAMP) {
+      if (ts == null) {
+        throw new ValidationException("timestamp not provided for " + pollingMode);
+      }
+    }
+    return ts;
+  }
+
+  private static Offsets parseAndValidateOffsets(PollingModeDTO pollingMode,
+                                                 String topic,
+                                                 @Nullable String offsetsStr) {
+    Offsets offsets = null;
+    if (pollingMode == PollingModeDTO.FROM_OFFSET || pollingMode == PollingModeDTO.TO_OFFSET) {
+      if (!StringUtils.hasText(offsetsStr)) {
+        throw new ValidationException("offsets not provided for " + pollingMode);
+      }
+      if (!offsetsStr.contains(":")) {
+        offsets = new Offsets(Long.parseLong(offsetsStr), null);
+      } else {
+        Map<TopicPartition, Long> tpOffsets = Stream.of(offsetsStr.split(","))
+            .map(p -> {
+              String[] split = p.split(":");
+              if (split.length != 2) {
+                throw new IllegalArgumentException(
+                    "Wrong seekTo argument format. See API docs for details");
+              }
+              return Pair.of(
+                  new TopicPartition(topic, Integer.parseInt(split[0])),
+                  Long.parseLong(split[1])
+              );
+            })
+            .collect(toMap(Pair::getKey, Pair::getValue));
+        offsets = new Offsets(null, tpOffsets);
+      }
+    }
+    return offsets;
+  }
 
-@Value
-public class ConsumerPosition {
-  SeekTypeDTO seekType;
-  String topic;
-  @Nullable
-  Map<TopicPartition, Long> seekTo; // null if positioning should apply to all tps
 }
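
A minimal sketch of the accepted offsetsStr formats, with hypothetical topic and partition values: a plain number applies one offset to every selected partition, while comma-separated partition:offset pairs also define the partition list.

// single offset, applied to all selected partitions (no ':' in the string)
var fromOffset = ConsumerPosition.create(PollingModeDTO.FROM_OFFSET, "my-topic", List.of(0, 1), null, "100");

// per-partition offsets; the partitions list is inferred from the pairs
var toOffset = ConsumerPosition.create(PollingModeDTO.TO_OFFSET, "my-topic", null, null, "0:100,1:250");

// timestamp-based position (millis since epoch); offsetsStr stays null
var fromTs = ConsumerPosition.create(PollingModeDTO.FROM_TIMESTAMP, "my-topic", null, 1688000000000L, null);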

+ 49 - 96
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -5,32 +5,33 @@ import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.MessageFilterStats;
 import com.provectus.kafka.ui.emitter.MessageFilters;
+import com.provectus.kafka.ui.emitter.ResultSizeLimiter;
 import com.provectus.kafka.ui.emitter.TailingEmitter;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
 import com.provectus.kafka.ui.model.PollingModeDTO;
-import com.provectus.kafka.ui.model.SeekDirectionDTO;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
-import com.provectus.kafka.ui.util.ResultSizeLimiter;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -41,7 +42,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -57,6 +57,8 @@ public class MessagesService {
   private final DeserializationService deserializationService;
   private final ConsumerGroupService consumerGroupService;
 
+  private final Map<String, Predicate<TopicMessageDTO>> registeredFilters = new ConcurrentHashMap<>();
+
   private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) {
     return adminClientService.get(cluster)
         .flatMap(client -> client.describeTopic(topicName))
@@ -137,69 +139,9 @@ public class MessagesService {
     }
   }
 
-  public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic,
-                                                 ConsumerPosition consumerPosition,
-                                                 @Nullable String query,
-                                                 MessageFilterTypeDTO filterQueryType,
-                                                 int limit,
-                                                 SeekDirectionDTO seekDirection,
-                                                 @Nullable String keySerde,
-                                                 @Nullable String valueSerde) {
-    return withExistingTopic(cluster, topic)
-        .flux()
-        .publishOn(Schedulers.boundedElastic())
-        .flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
-            filterQueryType, limit, seekDirection, keySerde, valueSerde));
-  }
-
-  private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
-                                                      String topic,
-                                                      ConsumerPosition consumerPosition,
-                                                      @Nullable String query,
-                                                      MessageFilterTypeDTO filterQueryType,
-                                                      int limit,
-                                                      SeekDirectionDTO seekDirection,
-                                                      @Nullable String keySerde,
-                                                      @Nullable String valueSerde) {
-
-    java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
-    ConsumerRecordDeserializer recordDeserializer =
-        deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
-    if (seekDirection.equals(SeekDirectionDTO.FORWARD)) {
-      emitter = new ForwardRecordEmitter(
-          () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition,
-          recordDeserializer,
-          cluster.getThrottler().get()
-      );
-    } else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
-      emitter = new BackwardRecordEmitter(
-          () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition,
-          limit,
-          recordDeserializer,
-          cluster.getThrottler().get()
-      );
-    } else {
-      emitter = new TailingEmitter(
-          () -> consumerGroupService.createConsumer(cluster),
-          consumerPosition,
-          recordDeserializer,
-          cluster.getThrottler().get()
-      );
-    }
-    MessageFilterStats filterStats = new MessageFilterStats();
-    return Flux.create(emitter)
-        .contextWrite(ctx -> ctx.put(MessageFilterStats.class, filterStats))
-        .filter(getMsgFilter(query, filterQueryType, filterStats))
-        .map(getDataMasker(cluster, topic))
-        .takeWhile(createTakeWhilePredicate(seekDirection, limit))
-        .map(throttleUiPublish(seekDirection));
-  }
-
   public Flux<TopicMessageEventDTO> loadMessagesV2(KafkaCluster cluster,
                                                    String topic,
-                                                   PollingModeDTO pollingMode,
+                                                   ConsumerPosition position,
                                                    @Nullable String query,
                                                    @Nullable String filterId,
                                                    int limit,
@@ -208,58 +150,55 @@ public class MessagesService {
     return withExistingTopic(cluster, topic)
         .flux()
         .publishOn(Schedulers.boundedElastic())
-        .flatMap(td -> loadMessagesImplV2(cluster, topic, consumerPosition, query,
-            filterQueryType, limit, seekDirection, keySerde, valueSerde));
+        .flatMap(td -> loadMessagesImplV2(cluster, topic, position, query, filterId, limit, keySerde, valueSerde));
   }
 
   private Flux<TopicMessageEventDTO> loadMessagesImplV2(KafkaCluster cluster,
                                                         String topic,
                                                         ConsumerPosition consumerPosition,
                                                         @Nullable String query,
-                                                        MessageFilterTypeDTO filterQueryType,
+                                                        @Nullable String filterId,
                                                         int limit,
-                                                        SeekDirectionDTO seekDirection,
                                                         @Nullable String keySerde,
                                                         @Nullable String valueSerde) {
 
-    java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;
     ConsumerRecordDeserializer recordDeserializer =
         deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
-    if (seekDirection.equals(SeekDirectionDTO.FORWARD)) {
-      emitter = new ForwardRecordEmitter(
+
+    var emitter = switch (consumerPosition.pollingMode()) {
+      case TO_OFFSET, TO_TIMESTAMP, LATEST -> new BackwardRecordEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
+          limit,
           recordDeserializer,
-          cluster.getThrottler().get()
+          cluster.getPollingSettings()
       );
-    } else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
-      emitter = new BackwardRecordEmitter(
+      case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> new ForwardRecordEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
-          limit,
           recordDeserializer,
-          cluster.getThrottler().get()
+          cluster.getPollingSettings()
       );
-    } else {
-      emitter = new TailingEmitter(
+      case TAILING -> new TailingEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
           recordDeserializer,
-          cluster.getThrottler().get()
+          cluster.getPollingSettings()
       );
-    }
+    };
+
     MessageFilterStats filterStats = new MessageFilterStats();
     return Flux.create(emitter)
         .contextWrite(ctx -> ctx.put(MessageFilterStats.class, filterStats))
-        .filter(getMsgFilter(query, filterQueryType, filterStats))
+        .filter(getMsgFilter(query, filterId, filterStats))
         .map(getDataMasker(cluster, topic))
-        .takeWhile(createTakeWhilePredicate(seekDirection, limit))
-        .map(throttleUiPublish(seekDirection));
+        .takeWhile(createTakeWhilePredicate(consumerPosition.pollingMode(), limit))
+        .map(throttleUiPublish(consumerPosition.pollingMode()));
   }
 
   private Predicate<TopicMessageEventDTO> createTakeWhilePredicate(
-      SeekDirectionDTO seekDirection, int limit) {
-    return seekDirection == SeekDirectionDTO.TAILING
+      PollingModeDTO pollingMode, int limit) {
+    return pollingMode == PollingModeDTO.TAILING
         ? evt -> true // no limit for tailing
         : new ResultSizeLimiter(limit);
   }
@@ -278,21 +217,35 @@ public class MessagesService {
     };
   }
 
-  private Predicate<TopicMessageEventDTO> getMsgFilter(String query,
-                                                       MessageFilterTypeDTO filterQueryType,
+  public String registerMessageFilter(String groovyCode) {
+    var filter = MessageFilters.groovyScriptFilter(groovyCode);
+    var id = RandomStringUtils.random(10, true, true);
+    registeredFilters.put(id, filter);
+    return id;
+  }
+
+  private Predicate<TopicMessageEventDTO> getMsgFilter(@Nullable String containsStrFilter,
+                                                       @Nullable String filterId,
                                                        MessageFilterStats filterStats) {
-    if (StringUtils.isEmpty(query)) {
-      return evt -> true;
+    Predicate<TopicMessageDTO> messageFilter = e -> true;
+    if (containsStrFilter != null) {
+      messageFilter = MessageFilters.containsStringFilter(containsStrFilter);
+    }
+    if (filterId != null) {
+      messageFilter = registeredFilters.get(filterId);
+      if (messageFilter == null) {
+        throw new ValidationException("No filter was registered with id " + filterId);
+      }
     }
-    var messageFilter = MessageFilters.createMsgFilter(query, filterQueryType);
+    Predicate<TopicMessageDTO> finalMessageFilter = messageFilter;
     return evt -> {
       // we only apply filter for message events
       if (evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) {
         try {
-          return messageFilter.test(evt.getMessage());
+          return finalMessageFilter.test(evt.getMessage());
         } catch (Exception e) {
           filterStats.incrementApplyErrors();
-          log.trace("Error applying filter '{}' for message {}", query, evt.getMessage());
+          log.trace("Error applying filter for message {}", evt.getMessage());
           return false;
         }
       }
@@ -300,8 +253,8 @@ public class MessagesService {
     };
   }
 
-  private <T> UnaryOperator<T> throttleUiPublish(SeekDirectionDTO seekDirection) {
-    if (seekDirection == SeekDirectionDTO.TAILING) {
+  private <T> UnaryOperator<T> throttleUiPublish(PollingModeDTO pollingMode) {
+    if (pollingMode == PollingModeDTO.TAILING) {
       RateLimiter rateLimiter = RateLimiter.create(TAILING_UI_MESSAGE_THROTTLE_RATE);
       return m -> {
         rateLimiter.acquire(1);

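A sketch of the intended server-side flow for the new filter registry, assuming an existing MessagesService bean and topic: register the Groovy code once, then pass the returned id as filterId into loadMessagesV2.

String filterId = messagesService.registerMessageFilter("offset % 2 == 0"); // returns a random 10-char id

Flux<TopicMessageEventDTO> events = messagesService.loadMessagesV2(
    cluster, "my-topic",
    ConsumerPosition.create(PollingModeDTO.EARLIEST, "my-topic", null, null, null),
    null,       // string-contains query is ignored when a registered filter id is passed
    filterId,   // previously registered Groovy filter
    100,
    StringSerde.name(), StringSerde.name());
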
+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java

@@ -56,7 +56,7 @@ public class KafkaConsumerTests extends AbstractIntegrationTest {
     }
 
     long count = webTestClient.get()
-        .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName)
+        .uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?m=EARLIEST", LOCAL, topicName)
         .accept(TEXT_EVENT_STREAM)
         .exchange()
         .expectStatus()

+ 22 - 14
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/SeekOperationsTest.java

@@ -2,7 +2,9 @@ package com.provectus.kafka.ui.emitter;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.PollingModeDTO;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -14,6 +16,8 @@ import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 
 class SeekOperationsTest {
 
@@ -45,8 +49,7 @@ class SeekOperationsTest {
       var offsets = SeekOperations.getOffsetsForSeek(
           consumer,
           new OffsetsInfo(consumer, topic),
-          SeekTypeDTO.LATEST,
-          null
+          new ConsumerPosition(PollingModeDTO.LATEST, topic, null, null, null)
       );
       assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 30L));
     }
@@ -56,33 +59,38 @@ class SeekOperationsTest {
       var offsets = SeekOperations.getOffsetsForSeek(
           consumer,
           new OffsetsInfo(consumer, topic),
-          SeekTypeDTO.BEGINNING,
-          null
+          new ConsumerPosition(PollingModeDTO.EARLIEST, topic, null, null, null)
       );
       assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 0L, tp3, 25L));
     }
 
-    @Test
-    void offsets() {
+    @ParameterizedTest
+    @CsvSource({"TO_OFFSET", "FROM_OFFSET"})
+    void offsets(PollingModeDTO mode) {
       var offsets = SeekOperations.getOffsetsForSeek(
           consumer,
           new OffsetsInfo(consumer, topic),
-          SeekTypeDTO.OFFSET,
-          Map.of(tp1, 10L, tp2, 10L, tp3, 26L)
+          new ConsumerPosition(
+              mode, topic, List.of(tp1, tp2, tp3), null,
+              new ConsumerPosition.Offsets(null, Map.of(tp1, 10L, tp2, 10L, tp3, 26L))
+          )
       );
       assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 10L, tp3, 26L));
     }
 
-    @Test
-    void offsetsWithBoundsFixing() {
+    @ParameterizedTest
+    @CsvSource({"TO_OFFSET", "FROM_OFFSET"})
+    void offsetsWithBoundsFixing(PollingModeDTO mode) {
       var offsets = SeekOperations.getOffsetsForSeek(
           consumer,
           new OffsetsInfo(consumer, topic),
-          SeekTypeDTO.OFFSET,
-          Map.of(tp1, 10L, tp2, 21L, tp3, 24L)
+          new ConsumerPosition(
+              mode, topic, List.of(tp1, tp2, tp3), null,
+              new ConsumerPosition.Offsets(null, Map.of(tp1, 10L, tp2, 21L, tp3, 24L))
+          )
       );
       assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 25L));
     }
   }
 
-}
+}

+ 7 - 9
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java

@@ -4,10 +4,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.model.ConsumerPosition;
-import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
-import com.provectus.kafka.ui.model.SeekDirectionDTO;
-import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.serdes.builtin.StringSerde;
 import com.provectus.kafka.ui.service.ClustersStorage;
 import com.provectus.kafka.ui.service.MessagesService;
 import java.time.Duration;
@@ -110,14 +109,13 @@ class TailingEmitterTest extends AbstractIntegrationTest {
         .get();
 
     return applicationContext.getBean(MessagesService.class)
-        .loadMessages(cluster, topicName,
-            new ConsumerPosition(SeekTypeDTO.LATEST, topic, null),
+        .loadMessagesV2(cluster, topicName,
+            new ConsumerPosition(PollingModeDTO.TAILING, topic, List.of(), null, null),
             query,
-            MessageFilterTypeDTO.STRING_CONTAINS,
+            null,
             0,
-            SeekDirectionDTO.TAILING,
-            "String",
-            "String");
+            StringSerde.name(),
+            StringSerde.name());
   }
 
   private List<TopicMessageEventDTO> startTailing(String filterQuery) {

+ 6 - 47
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java

@@ -1,22 +1,16 @@
 package com.provectus.kafka.ui.service;
 
-import static com.provectus.kafka.ui.service.MessagesService.execSmartFilterTest;
-import static org.assertj.core.api.Assertions.assertThat;
-
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.SeekDirectionDTO;
-import com.provectus.kafka.ui.model.SeekTypeDTO;
-import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.junit.jupiter.api.BeforeEach;
@@ -60,7 +54,9 @@ class MessagesServiceTest extends AbstractIntegrationTest {
   @Test
   void loadMessagesReturnsExceptionWhenTopicNotFound() {
     StepVerifier.create(messagesService
-            .loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1, null, "String", "String"))
+            .loadMessagesV2(cluster, NON_EXISTING_TOPIC,
+                new ConsumerPosition(PollingModeDTO.TAILING, NON_EXISTING_TOPIC, List.of(), null, null),
+                null, null, 1, "String", "String"))
         .expectError(TopicNotFoundException.class)
         .verify();
   }
@@ -73,14 +69,13 @@ class MessagesServiceTest extends AbstractIntegrationTest {
       producer.send(testTopic, "message1");
       producer.send(testTopic, "message2").get();
 
-      Flux<TopicMessageDTO> msgsFlux = messagesService.loadMessages(
+      Flux<TopicMessageDTO> msgsFlux = messagesService.loadMessagesV2(
           cluster,
           testTopic,
-          new ConsumerPosition(SeekTypeDTO.BEGINNING, testTopic, null),
+          new ConsumerPosition(PollingModeDTO.EARLIEST, testTopic, List.of(), null, null),
           null,
           null,
           100,
-          SeekDirectionDTO.FORWARD,
           StringSerde.name(),
           StringSerde.name()
       ).filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
@@ -96,40 +91,4 @@ class MessagesServiceTest extends AbstractIntegrationTest {
     }
   }
 
-  @Test
-  void execSmartFilterTestReturnsExecutionResult() {
-    var params = new SmartFilterTestExecutionDTO()
-        .filterCode("key != null && value != null && headers != null && timestampMs != null && offset != null")
-        .key("1234")
-        .value("{ \"some\" : \"value\" } ")
-        .headers(Map.of("h1", "hv1"))
-        .offset(12345L)
-        .timestampMs(System.currentTimeMillis())
-        .partition(1);
-    assertThat(execSmartFilterTest(params).getResult()).isTrue();
-
-    params.setFilterCode("return false");
-    assertThat(execSmartFilterTest(params).getResult()).isFalse();
-  }
-
-  @Test
-  void execSmartFilterTestReturnsErrorOnFilterApplyError() {
-    var result = execSmartFilterTest(
-        new SmartFilterTestExecutionDTO()
-            .filterCode("return 1/0")
-    );
-    assertThat(result.getResult()).isNull();
-    assertThat(result.getError()).containsIgnoringCase("execution error");
-  }
-
-  @Test
-  void execSmartFilterTestReturnsErrorOnFilterCompilationError() {
-    var result = execSmartFilterTest(
-        new SmartFilterTestExecutionDTO()
-            .filterCode("this is invalid groovy syntax = 1")
-    );
-    assertThat(result.getResult()).isNull();
-    assertThat(result.getError()).containsIgnoringCase("Compilation error");
-  }
-
 }
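
For orientation, the MessagesServiceTest changes above migrate to the following call shape. This is a minimal sketch inferred from the call sites in this diff; the parameter roles (the two nulls passed before the limit, the meaning of an empty partition list) are assumptions, not the actual loadMessagesV2 declaration.

    // Sketch only - inferred from the test call sites above, not from the new service API itself.
    var position = new ConsumerPosition(
        PollingModeDTO.EARLIEST,  // polling mode replaces SeekTypeDTO + SeekDirectionDTO
        "test-topic",             // topic name
        List.of(),                // target partitions (empty list assumed to mean "all partitions")
        null,                     // timestamp - presumably only used for FROM_/TO_TIMESTAMP modes
        null                      // offsets   - presumably only used for FROM_/TO_OFFSET modes
    );

    Flux<TopicMessageDTO> messages = messagesService
        .loadMessagesV2(cluster, "test-topic", position, null, null, 100,
            StringSerde.name(), StringSerde.name())
        .filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
        .map(TopicMessageEventDTO::getMessage);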

+ 62 - 79
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -1,26 +1,25 @@
 package com.provectus.kafka.ui.service;
 
-import static com.provectus.kafka.ui.model.SeekTypeDTO.BEGINNING;
-import static com.provectus.kafka.ui.model.SeekTypeDTO.LATEST;
-import static com.provectus.kafka.ui.model.SeekTypeDTO.OFFSET;
-import static com.provectus.kafka.ui.model.SeekTypeDTO.TIMESTAMP;
+import static com.provectus.kafka.ui.model.PollingModeDTO.EARLIEST;
+import static com.provectus.kafka.ui.model.PollingModeDTO.FROM_OFFSET;
+import static com.provectus.kafka.ui.model.PollingModeDTO.FROM_TIMESTAMP;
+import static com.provectus.kafka.ui.model.PollingModeDTO.LATEST;
+import static com.provectus.kafka.ui.model.PollingModeDTO.TO_OFFSET;
+import static com.provectus.kafka.ui.model.PollingModeDTO.TO_TIMESTAMP;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
-import com.provectus.kafka.ui.emitter.BackwardEmitter;
-import com.provectus.kafka.ui.emitter.EnhancedConsumer;
-import com.provectus.kafka.ui.emitter.ForwardEmitter;
+import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.PollingSettings;
-import com.provectus.kafka.ui.emitter.PollingThrottler;
 import com.provectus.kafka.ui.model.ConsumerPosition;
-import com.provectus.kafka.ui.model.TopicMessageDTO;
+import com.provectus.kafka.ui.model.ConsumerPosition.Offsets;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
 import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
-import com.provectus.kafka.ui.util.ApplicationMetrics;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -31,15 +30,17 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -57,16 +58,16 @@ class RecordEmitterTest extends AbstractIntegrationTest {
   static final String EMPTY_TOPIC = TOPIC + "_empty";
   static final List<Record> SENT_RECORDS = new ArrayList<>();
   static final ConsumerRecordDeserializer RECORD_DESERIALIZER = createRecordsDeserializer();
-  static final Predicate<TopicMessageDTO> NOOP_FILTER = m -> true;
 
   @BeforeAll
   static void generateMsgs() throws Exception {
     createTopic(new NewTopic(TOPIC, PARTITIONS, (short) 1));
     createTopic(new NewTopic(EMPTY_TOPIC, PARTITIONS, (short) 1));
+    long startTs = System.currentTimeMillis();
     try (var producer = KafkaTestProducer.forKafka(kafka)) {
       for (int partition = 0; partition < PARTITIONS; partition++) {
         for (int i = 0; i < MSGS_PER_PARTITION; i++) {
-          long ts = System.currentTimeMillis() + i;
+          long ts = (startTs += 100);
           var value = "msg_" + partition + "_" + i;
           var metadata = producer.send(
               new ProducerRecord<>(
@@ -93,7 +94,6 @@ class RecordEmitterTest extends AbstractIntegrationTest {
   static void cleanup() {
     deleteTopic(TOPIC);
     deleteTopic(EMPTY_TOPIC);
-    SENT_RECORDS.clear();
   }
 
   private static ConsumerRecordDeserializer createRecordsDeserializer() {
@@ -106,28 +106,24 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         s.deserializer(null, Serde.Target.VALUE),
         StringSerde.name(),
         s.deserializer(null, Serde.Target.KEY),
-        s.deserializer(null, Serde.Target.VALUE),
-        msg -> msg
+        s.deserializer(null, Serde.Target.VALUE)
     );
   }
 
   @Test
   void pollNothingOnEmptyTopic() {
-    var forwardEmitter = new ForwardEmitter(
+    var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
-        100,
+        new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null),
         RECORD_DESERIALIZER,
-        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
-    var backwardEmitter = new BackwardEmitter(
+    var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
+        new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null),
         100,
         RECORD_DESERIALIZER,
-        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
@@ -146,21 +142,18 @@ class RecordEmitterTest extends AbstractIntegrationTest {
 
   @Test
   void pollFullTopicFromBeginning() {
-    var forwardEmitter = new ForwardEmitter(
+    var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(BEGINNING, TOPIC, null),
-        PARTITIONS * MSGS_PER_PARTITION,
+        new ConsumerPosition(EARLIEST, TOPIC, List.of(), null, null),
         RECORD_DESERIALIZER,
-        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
-    var backwardEmitter = new BackwardEmitter(
+    var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(LATEST, TOPIC, null),
+        new ConsumerPosition(LATEST, TOPIC, List.of(), null, null),
         PARTITIONS * MSGS_PER_PARTITION,
         RECORD_DESERIALIZER,
-        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
@@ -178,21 +171,20 @@ class RecordEmitterTest extends AbstractIntegrationTest {
       targetOffsets.put(new TopicPartition(TOPIC, i), offset);
     }
 
-    var forwardEmitter = new ForwardEmitter(
+    var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
-        PARTITIONS * MSGS_PER_PARTITION,
+        new ConsumerPosition(FROM_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
+            new Offsets(null, targetOffsets)),
         RECORD_DESERIALIZER,
-        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
-    var backwardEmitter = new BackwardEmitter(
+    var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
+        new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
+            new Offsets(null, targetOffsets)),
         PARTITIONS * MSGS_PER_PARTITION,
         RECORD_DESERIALIZER,
-        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
@@ -213,50 +205,40 @@ class RecordEmitterTest extends AbstractIntegrationTest {
 
   @Test
   void pollWithTimestamps() {
-    Map<TopicPartition, Long> targetTimestamps = new HashMap<>();
-    final Map<TopicPartition, List<Record>> perPartition =
-        SENT_RECORDS.stream().collect(Collectors.groupingBy((r) -> r.tp));
-    for (int i = 0; i < PARTITIONS; i++) {
-      final List<Record> records = perPartition.get(new TopicPartition(TOPIC, i));
-      int randRecordIdx = ThreadLocalRandom.current().nextInt(records.size());
-      log.info("partition: {} position: {}", i, randRecordIdx);
-      targetTimestamps.put(
-          new TopicPartition(TOPIC, i),
-          records.get(randRecordIdx).getTimestamp()
-      );
-    }
+    var tsStats = SENT_RECORDS.stream().mapToLong(Record::getTimestamp).summaryStatistics();
+    // choose a timestamp in the middle of the produced range
+    long targetTimestamp = tsStats.getMin() + ((tsStats.getMax() - tsStats.getMin()) / 2);
 
-    var forwardEmitter = new ForwardEmitter(
+    var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
-        PARTITIONS * MSGS_PER_PARTITION,
+        new ConsumerPosition(FROM_TIMESTAMP, TOPIC, List.of(), targetTimestamp, null),
         RECORD_DESERIALIZER,
-        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
-    var backwardEmitter = new BackwardEmitter(
+    expectEmitter(
+        forwardEmitter,
+        SENT_RECORDS.stream()
+            .filter(r -> r.getTimestamp() >= targetTimestamp)
+            .map(Record::getValue)
+            .collect(Collectors.toList())
+    );
+
+    var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
+        new ConsumerPosition(TO_TIMESTAMP, TOPIC, List.of(), targetTimestamp, null),
         PARTITIONS * MSGS_PER_PARTITION,
         RECORD_DESERIALIZER,
-        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
-    var expectedValues = SENT_RECORDS.stream()
-        .filter(r -> r.getTimestamp() >= targetTimestamps.get(r.getTp()))
-        .map(Record::getValue)
-        .collect(Collectors.toList());
-
-    expectEmitter(forwardEmitter, expectedValues);
-
-    expectedValues = SENT_RECORDS.stream()
-        .filter(r -> r.getTimestamp() < targetTimestamps.get(r.getTp()))
-        .map(Record::getValue)
-        .collect(Collectors.toList());
-
-    expectEmitter(backwardEmitter, expectedValues);
+    expectEmitter(
+        backwardEmitter,
+        SENT_RECORDS.stream()
+            .filter(r -> r.getTimestamp() < targetTimestamp)
+            .map(Record::getValue)
+            .collect(Collectors.toList())
+    );
   }
 
   @Test
@@ -267,12 +249,12 @@ class RecordEmitterTest extends AbstractIntegrationTest {
       targetOffsets.put(new TopicPartition(TOPIC, i), (long) MSGS_PER_PARTITION);
     }
 
-    var backwardEmitter = new BackwardEmitter(
+    var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
+        new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
+            new Offsets(null, targetOffsets)),
         numMessages,
         RECORD_DESERIALIZER,
-        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
@@ -294,12 +276,11 @@ class RecordEmitterTest extends AbstractIntegrationTest {
       offsets.put(new TopicPartition(TOPIC, i), 0L);
     }
 
-    var backwardEmitter = new BackwardEmitter(
+    var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
-        new ConsumerPosition(OFFSET, TOPIC, offsets),
+        new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(offsets.keySet()), null, new Offsets(null, offsets)),
         100,
         RECORD_DESERIALIZER,
-        NOOP_FILTER,
         PollingSettings.createDefault()
     );
 
@@ -339,20 +320,22 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     assertionsConsumer.accept(step.expectComplete().verifyThenAssertThat());
   }
 
-  private EnhancedConsumer createConsumer() {
+  private KafkaConsumer<Bytes, Bytes> createConsumer() {
     return createConsumer(Map.of());
   }
 
-  private EnhancedConsumer createConsumer(Map<String, Object> properties) {
+  private KafkaConsumer<Bytes, Bytes> createConsumer(Map<String, Object> properties) {
     final Map<String, ? extends Serializable> map = Map.of(
         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
         ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
-        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19 // to check multiple polls
+        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19, // to check multiple polls
+        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class,
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class
     );
     Properties props = new Properties();
     props.putAll(map);
     props.putAll(properties);
-    return new EnhancedConsumer(props, PollingThrottler.noop(), ApplicationMetrics.noop());
+    return new KafkaConsumer<>(props);
   }
 
   @Value
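
In the RecordEmitterTest changes above, the emitter constructors drop the per-message filter predicate, and only the backward emitter keeps an explicit message limit. A rough sketch of the construction pattern, reusing the test's constants, with parameter roles inferred from the call sites rather than from the emitter classes themselves:

    // Sketch only - constructor parameter roles are assumptions based on the call sites above.
    var forward = new ForwardRecordEmitter(
        this::createConsumer,                                         // consumer supplier
        new ConsumerPosition(EARLIEST, TOPIC, List.of(), null, null), // where to start reading
        RECORD_DESERIALIZER,                                          // raw record -> TopicMessageDTO
        PollingSettings.createDefault()
    );

    var backward = new BackwardRecordEmitter(
        this::createConsumer,
        new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null,
            new Offsets(null, targetOffsets)),  // per-partition upper offsets; the first Offsets
                                                // argument is null at every call site in this diff
        PARTITIONS * MSGS_PER_PARTITION,        // max messages to emit
        RECORD_DESERIALIZER,
        PollingSettings.createDefault()
    );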

+ 4 - 9
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java

@@ -7,8 +7,7 @@ import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.SeekDirectionDTO;
-import com.provectus.kafka.ui.model.SeekTypeDTO;
+import com.provectus.kafka.ui.model.PollingModeDTO;
 import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
@@ -20,6 +19,7 @@ import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import java.time.Duration;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
@@ -497,18 +497,13 @@ public class SendAndReadTests extends AbstractIntegrationTest {
       String topic = createTopicAndCreateSchemas();
       try {
         messagesService.sendMessage(targetCluster, topic, msgToSend).block();
-        TopicMessageDTO polled = messagesService.loadMessages(
+        TopicMessageDTO polled = messagesService.loadMessagesV2(
                 targetCluster,
                 topic,
-                new ConsumerPosition(
-                    SeekTypeDTO.BEGINNING,
-                    topic,
-                    Map.of(new TopicPartition(topic, 0), 0L)
-                ),
+                new ConsumerPosition(PollingModeDTO.EARLIEST, topic, List.of(), null, null),
                 null,
                 null,
                 1,
-                SeekDirectionDTO.FORWARD,
                 msgToSend.getKeySerde().get(),
                 msgToSend.getValueSerde().get()
             ).filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))

+ 4 - 7
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -772,9 +772,7 @@ paths:
           content:
             application/json:
               schema:
-                type: array
-                items:
-                  $ref: '#/components/schemas/MessageFilterId'
+                $ref: '#/components/schemas/MessageFilterId'
 
 
   /api/clusters/{clusterName}/topics/{topicName}/messages/v2:
@@ -804,7 +802,7 @@ paths:
           in: query
           schema:
             type: array
-            description: List of target partitions( all partitions if not provided)
+            description: List of target partitions (all partitions if not provided)
             items:
               type: integer
         - name: lim
@@ -824,7 +822,7 @@ paths:
             type: string
         - name: offs
           in: query
-          description: partition offsets to read from / to. Format is "p1:off1,p2:off2,..."
+          description: partition offsets to read from / to. Format is "p1:offset1,p2:offset2,...".
           schema:
             type: string
         - name: ts
@@ -2571,7 +2569,6 @@ components:
             - CONSUMING
             - DONE
             - CURSOR
-            - EMIT_THROTTLING
         message:
           $ref: "#/components/schemas/TopicMessage"
         phase:
@@ -2708,7 +2705,7 @@ components:
         - FROM_TIMESTAMP
         - TO_TIMESTAMP
         - LATEST
-        - FIRST
+        - EARLIEST
         - TAILING
 
     MessageFilterType:
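
Taken together, the /messages/v2 parameters documented above would produce requests along these lines. The cluster and topic names are placeholders, and only the lim, offs and ts parameter names appear in this fragment; the name of the polling-mode parameter is not shown here.

    GET /api/clusters/local/topics/my-topic/messages/v2?offs=0:100,1:250&lim=500
    GET /api/clusters/local/topics/my-topic/messages/v2?ts=1693000000000&lim=100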