
Merge branch 'master' into vlad/develop

VladSenyuta 2 years ago
parent commit
bfb80fdf5c
18 changed files with 544 additions and 229 deletions
  1. +2 -2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java
  2. +11 -7
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java
  3. +102 -89
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java
  4. +1 -1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java
  5. +83 -36
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
  6. +0 -21
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/MapUtil.java
  7. +91 -0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ReactiveAdminClientTest.java
  8. +5 -2
      kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/pages/BasePage.java
  9. +80 -6
      kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/pages/topic/TopicDetails.java
  10. +65 -18
      kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/services/ApiService.java
  11. +16 -0
      kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/utilities/TimeUtils.java
  12. +1 -0
      kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/base/BaseTest.java
  13. +10 -11
      kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/suite/connectors/ConnectorsTests.java
  14. +2 -3
      kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/suite/schemas/SchemasTests.java
  15. +70 -24
      kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/suite/topics/TopicMessagesTests.java
  16. +5 -7
      kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/suite/topics/TopicsTests.java
  17. +0 -1
      kafka-ui-e2e-checks/src/test/resources/producedkey.txt
  18. +0 -1
      kafka-ui-e2e-checks/src/test/resources/testData.txt

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java

@@ -189,8 +189,8 @@ public class ConsumerGroupsController extends AbstractController implements Cons
   private ConsumerGroupsPageResponseDTO convertPage(ConsumerGroupService.ConsumerGroupsPage
                                                         consumerGroupConsumerGroupsPage) {
     return new ConsumerGroupsPageResponseDTO()
-        .pageCount(consumerGroupConsumerGroupsPage.getTotalPages())
-        .consumerGroups(consumerGroupConsumerGroupsPage.getConsumerGroups()
+        .pageCount(consumerGroupConsumerGroupsPage.totalPages())
+        .consumerGroups(consumerGroupConsumerGroupsPage.consumerGroups()
             .stream()
             .map(ConsumerGroupMapper::toDto)
             .collect(Collectors.toList()));
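
Note on the accessor change: ConsumerGroupsPage, defined further down in ConsumerGroupService, is now a Java record, so call sites move from Lombok-generated getters (getTotalPages()) to record accessors (totalPages()). A minimal sketch of the difference, using a plain String list in place of InternalConsumerGroup:

import java.util.List;

// Simplified stand-in for the real record, which holds InternalConsumerGroup values.
record ConsumerGroupsPage(List<String> consumerGroups, int totalPages) {
}

class RecordAccessorDemo {
  public static void main(String[] args) {
    var page = new ConsumerGroupsPage(List.of("group-1", "group-2"), 3);
    System.out.println(page.totalPages());     // record accessor: no "get" prefix
    System.out.println(page.consumerGroups()); // [group-1, group-2]
  }
}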

+ 11 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java

@@ -89,13 +89,17 @@ public class ConsumerGroupMapper {
             .flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic))
     ).collect(Collectors.toSet()).size();
 
-    long messagesBehind = c.getOffsets().entrySet().stream()
-        .mapToLong(e ->
-            Optional.ofNullable(c.getEndOffsets())
-                .map(o -> o.get(e.getKey()))
-                .map(o -> o - e.getValue())
-                .orElse(0L)
-        ).sum();
+    Long messagesBehind = null;
+    // messagesBehind should be undefined if no committed offsets found for topic
+    if (!c.getOffsets().isEmpty()) {
+      messagesBehind = c.getOffsets().entrySet().stream()
+          .mapToLong(e ->
+              Optional.ofNullable(c.getEndOffsets())
+                  .map(o -> o.get(e.getKey()))
+                  .map(o -> o - e.getValue())
+                  .orElse(0L)
+          ).sum();
+    }
 
     consumerGroup.setMessagesBehind(messagesBehind);
     consumerGroup.setTopics(numTopics);
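
Note on the nullable lag: wrapping the sum in an emptiness check lets the mapper distinguish "no committed offsets" (messagesBehind stays null, i.e. undefined) from "fully caught up" (messagesBehind is 0). A minimal sketch of the same rule, with String keys standing in for TopicPartition:

import java.util.Map;

class LagDemo {
  // Returns null when nothing was committed; the real mapper sets this value on the DTO.
  static Long messagesBehind(Map<String, Long> committed, Map<String, Long> endOffsets) {
    if (committed.isEmpty()) {
      return null; // no committed offsets -> lag is undefined, not zero
    }
    return committed.entrySet().stream()
        .mapToLong(e -> endOffsets.getOrDefault(e.getKey(), e.getValue()) - e.getValue())
        .sum(); // a missing end offset contributes 0, matching the orElse(0L) above
  }

  public static void main(String[] args) {
    System.out.println(messagesBehind(Map.of(), Map.of("p0", 10L)));          // null
    System.out.println(messagesBehind(Map.of("p0", 10L), Map.of("p0", 10L))); // 0
  }
}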

+ 102 - 89
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.service;
 
+import com.google.common.collect.Table;
 import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO;
 import com.provectus.kafka.ui.model.InternalConsumerGroup;
 import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
@@ -7,6 +8,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.SortOrderDTO;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -14,22 +16,21 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.function.ToIntFunction;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
-import lombok.Value;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
 
 @Service
 @RequiredArgsConstructor
@@ -41,21 +42,16 @@ public class ConsumerGroupService {
   private Mono<List<InternalConsumerGroup>> getConsumerGroups(
       ReactiveAdminClient ac,
       List<ConsumerGroupDescription> descriptions) {
-    return Flux.fromIterable(descriptions)
-        // 1. getting committed offsets for all groups
-        .flatMap(desc -> ac.listConsumerGroupOffsets(desc.groupId())
-            .map(offsets -> Tuples.of(desc, offsets)))
-        .collectMap(Tuple2::getT1, Tuple2::getT2)
-        .flatMap((Map<ConsumerGroupDescription, Map<TopicPartition, Long>> groupOffsetsMap) -> {
-          var tpsFromGroupOffsets = groupOffsetsMap.values().stream()
-              .flatMap(v -> v.keySet().stream())
-              .collect(Collectors.toSet());
+    var groupNames = descriptions.stream().map(ConsumerGroupDescription::groupId).toList();
+    // 1. getting committed offsets for all groups
+    return ac.listConsumerGroupOffsets(groupNames, null)
+        .flatMap((Table<String, TopicPartition, Long> committedOffsets) -> {
           // 2. getting end offsets for partitions with committed offsets
-          return ac.listOffsets(tpsFromGroupOffsets, OffsetSpec.latest(), false)
+          return ac.listOffsets(committedOffsets.columnKeySet(), OffsetSpec.latest(), false)
               .map(endOffsets ->
                   descriptions.stream()
                       .map(desc -> {
-                        var groupOffsets = groupOffsetsMap.get(desc);
+                        var groupOffsets = committedOffsets.row(desc.groupId());
                         var endOffsetsForGroup = new HashMap<>(endOffsets);
                         endOffsetsForGroup.keySet().retainAll(groupOffsets.keySet());
                         // 3. gathering description & offsets
@@ -73,105 +69,122 @@ public class ConsumerGroupService {
             .flatMap(endOffsets -> {
               var tps = new ArrayList<>(endOffsets.keySet());
               // 2. getting all consumer groups
-              return describeConsumerGroups(ac, null)
-                  .flatMap((List<ConsumerGroupDescription> groups) ->
-                      Flux.fromIterable(groups)
-                          // 3. for each group trying to find committed offsets for topic
-                          .flatMap(g ->
-                              ac.listConsumerGroupOffsets(g.groupId(), tps)
-                                  // 4. keeping only groups that relates to topic
-                                  .filter(offsets -> isConsumerGroupRelatesToTopic(topic, g, offsets))
-                                  // 5. constructing results
-                                  .map(offsets -> InternalTopicConsumerGroup.create(topic, g, offsets, endOffsets))
-                          ).collectList());
+              return describeConsumerGroups(ac)
+                  .flatMap((List<ConsumerGroupDescription> groups) -> {
+                        // 3. trying to find committed offsets for topic
+                        var groupNames = groups.stream().map(ConsumerGroupDescription::groupId).toList();
+                        return ac.listConsumerGroupOffsets(groupNames, tps).map(offsets ->
+                            groups.stream()
+                                // 4. keeping only groups that relate to topic
+                                .filter(g -> isConsumerGroupRelatesToTopic(topic, g, offsets.containsRow(g.groupId())))
+                                .map(g ->
+                                    // 5. constructing results
+                                    InternalTopicConsumerGroup.create(topic, g, offsets.row(g.groupId()), endOffsets))
+                                .toList()
+                        );
+                      }
+                  );
             }));
   }
 
   private boolean isConsumerGroupRelatesToTopic(String topic,
                                                 ConsumerGroupDescription description,
-                                                Map<TopicPartition, Long> committedGroupOffsetsForTopic) {
+                                                boolean hasCommittedOffsets) {
     boolean hasActiveMembersForTopic = description.members()
         .stream()
         .anyMatch(m -> m.assignment().topicPartitions().stream().anyMatch(tp -> tp.topic().equals(topic)));
-    boolean hasCommittedOffsets = !committedGroupOffsetsForTopic.isEmpty();
     return hasActiveMembersForTopic || hasCommittedOffsets;
   }
 
-  @Value
-  public static class ConsumerGroupsPage {
-    List<InternalConsumerGroup> consumerGroups;
-    int totalPages;
+  public record ConsumerGroupsPage(List<InternalConsumerGroup> consumerGroups, int totalPages) {
   }
 
   public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
       KafkaCluster cluster,
-      int page,
+      int pageNum,
       int perPage,
       @Nullable String search,
       ConsumerGroupOrderingDTO orderBy,
       SortOrderDTO sortOrderDto) {
-    var comparator = sortOrderDto.equals(SortOrderDTO.ASC)
-        ? getPaginationComparator(orderBy)
-        : getPaginationComparator(orderBy).reversed();
     return adminClientService.get(cluster).flatMap(ac ->
-        describeConsumerGroups(ac, search).flatMap(descriptions ->
-            getConsumerGroups(
-                ac,
-                descriptions.stream()
-                    .sorted(comparator)
-                    .skip((long) (page - 1) * perPage)
-                    .limit(perPage)
-                    .collect(Collectors.toList())
+        ac.listConsumerGroups()
+            .map(listing -> search == null
+                ? listing
+                : listing.stream()
+                .filter(g -> StringUtils.containsIgnoreCase(g.groupId(), search))
+                .toList()
             )
-                .flatMapMany(Flux::fromIterable)
-                .filterWhen(
-                    cg -> accessControlService.isConsumerGroupAccessible(cg.getGroupId(), cluster.getName()))
-                .collect(Collectors.toList())
-                .map(cgs -> new ConsumerGroupsPage(
-                    cgs,
-                    (descriptions.size() / perPage) + (descriptions.size() % perPage == 0 ? 0 : 1))))
-    );
+            .flatMapIterable(lst -> lst)
+            .filterWhen(cg -> accessControlService.isConsumerGroupAccessible(cg.groupId(), cluster.getName()))
+            .collectList()
+            .flatMap(allGroups ->
+                loadSortedDescriptions(ac, allGroups, pageNum, perPage, orderBy, sortOrderDto)
+                    .flatMap(descriptions -> getConsumerGroups(ac, descriptions)
+                        .map(page -> new ConsumerGroupsPage(
+                            page,
+                            (allGroups.size() / perPage) + (allGroups.size() % perPage == 0 ? 0 : 1))))));
   }
 
-  private Comparator<ConsumerGroupDescription> getPaginationComparator(ConsumerGroupOrderingDTO
-                                                                           orderBy) {
-    switch (orderBy) {
-      case NAME:
-        return Comparator.comparing(ConsumerGroupDescription::groupId);
-      case STATE:
-        ToIntFunction<ConsumerGroupDescription> statesPriorities = cg -> {
-          switch (cg.state()) {
-            case STABLE:
-              return 0;
-            case COMPLETING_REBALANCE:
-              return 1;
-            case PREPARING_REBALANCE:
-              return 2;
-            case EMPTY:
-              return 3;
-            case DEAD:
-              return 4;
-            case UNKNOWN:
-              return 5;
-            default:
-              return 100;
-          }
-        };
-        return Comparator.comparingInt(statesPriorities);
-      case MEMBERS:
-        return Comparator.comparingInt(cg -> cg.members().size());
-      default:
-        throw new IllegalStateException("Unsupported order by: " + orderBy);
-    }
+  private Mono<List<ConsumerGroupDescription>> loadSortedDescriptions(ReactiveAdminClient ac,
+                                                                      List<ConsumerGroupListing> groups,
+                                                                      int pageNum,
+                                                                      int perPage,
+                                                                      ConsumerGroupOrderingDTO orderBy,
+                                                                      SortOrderDTO sortOrderDto) {
+    return switch (orderBy) {
+      case NAME -> {
+        Comparator<ConsumerGroupListing> comparator = Comparator.comparing(ConsumerGroupListing::groupId);
+        yield loadDescriptionsByListings(ac, groups, comparator, pageNum, perPage, sortOrderDto);
+      }
+      case STATE -> {
+        ToIntFunction<ConsumerGroupListing> statesPriorities =
+            cg -> switch (cg.state().orElse(ConsumerGroupState.UNKNOWN)) {
+              case STABLE -> 0;
+              case COMPLETING_REBALANCE -> 1;
+              case PREPARING_REBALANCE -> 2;
+              case EMPTY -> 3;
+              case DEAD -> 4;
+              case UNKNOWN -> 5;
+            };
+        var comparator = Comparator.comparingInt(statesPriorities);
+        yield loadDescriptionsByListings(ac, groups, comparator, pageNum, perPage, sortOrderDto);
+      }
+      case MEMBERS -> {
+        var comparator = Comparator.<ConsumerGroupDescription>comparingInt(cg -> cg.members().size());
+        var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList();
+        yield ac.describeConsumerGroups(groupNames)
+            .map(descriptions ->
+                sortAndPaginate(descriptions.values(), comparator, pageNum, perPage, sortOrderDto).toList());
+      }
+    };
   }
 
-  private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(ReactiveAdminClient ac,
-                                                                      @Nullable String search) {
-    return ac.listConsumerGroups()
-        .map(groupIds -> groupIds
-            .stream()
-            .filter(groupId -> search == null || StringUtils.containsIgnoreCase(groupId, search))
-            .collect(Collectors.toList()))
+  private Mono<List<ConsumerGroupDescription>> loadDescriptionsByListings(ReactiveAdminClient ac,
+                                                                          List<ConsumerGroupListing> listings,
+                                                                          Comparator<ConsumerGroupListing> comparator,
+                                                                          int pageNum,
+                                                                          int perPage,
+                                                                          SortOrderDTO sortOrderDto) {
+    List<String> sortedGroups = sortAndPaginate(listings, comparator, pageNum, perPage, sortOrderDto)
+        .map(ConsumerGroupListing::groupId)
+        .toList();
+    return ac.describeConsumerGroups(sortedGroups)
+        .map(descrMap -> sortedGroups.stream().map(descrMap::get).toList());
+  }
+
+  private <T> Stream<T> sortAndPaginate(Collection<T> collection,
+                                        Comparator<T> comparator,
+                                        int pageNum,
+                                        int perPage,
+                                        SortOrderDTO sortOrderDto) {
+    return collection.stream()
+        .sorted(sortOrderDto == SortOrderDTO.ASC ? comparator : comparator.reversed())
+        .skip((long) (pageNum - 1) * perPage)
+        .limit(perPage);
+  }
+
+  private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(ReactiveAdminClient ac) {
+    return ac.listConsumerGroupNames()
         .flatMap(ac::describeConsumerGroups)
         .map(cgs -> new ArrayList<>(cgs.values()));
   }
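
Note on the Table-based offsets: listConsumerGroupOffsets now returns a Guava Table keyed by group (row) and partition (column), which is what makes the committedOffsets.row(desc.groupId()) and committedOffsets.columnKeySet() lookups above possible. A minimal sketch of those lookups, with String partition keys for brevity:

import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Table;

class OffsetsTableDemo {
  public static void main(String[] args) {
    // row key = consumer group, column key = partition, value = committed offset
    Table<String, String, Long> offsets = ImmutableTable.<String, String, Long>builder()
        .put("group-a", "topic-0", 5L)
        .put("group-a", "topic-1", 7L)
        .put("group-b", "topic-0", 2L)
        .build();

    System.out.println(offsets.row("group-a"));         // {topic-0=5, topic-1=7}
    System.out.println(offsets.columnKeySet());         // every partition with any committed offset
    System.out.println(offsets.containsRow("group-c")); // false: the group committed nothing
  }
}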

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java

@@ -98,7 +98,7 @@ public class OffsetsResetService {
         .flatMap(ac ->
             // we need to call listConsumerGroups() to check group existence, because
             // describeConsumerGroups() will return consumer group even if it doesn't exist
-            ac.listConsumerGroups()
+            ac.listConsumerGroupNames()
                 .filter(cgs -> cgs.stream().anyMatch(g -> g.equals(groupId)))
                 .flatMap(cgs -> ac.describeConsumerGroups(List.of(groupId)))
                 .filter(cgs -> cgs.containsKey(groupId))

+ 83 - 36
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -4,12 +4,12 @@ import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
+import com.google.common.collect.ImmutableTable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Table;
 import com.provectus.kafka.ui.exception.IllegalEntityStateException;
 import com.provectus.kafka.ui.exception.NotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.util.MapUtil;
 import com.provectus.kafka.ui.util.NumberUtil;
 import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
 import java.io.Closeable;
@@ -18,7 +18,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -45,7 +44,7 @@ import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.admin.DescribeClusterOptions;
 import org.apache.kafka.clients.admin.DescribeClusterResult;
 import org.apache.kafka.clients.admin.DescribeConfigsOptions;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
@@ -54,7 +53,6 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -69,6 +67,7 @@ import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
@@ -183,7 +182,7 @@ public class ReactiveAdminClient implements Closeable {
         topicNames,
         200,
         part -> getTopicsConfigImpl(part, includeDocFixed),
-        (m1, m2) -> ImmutableMap.<String, List<ConfigEntry>>builder().putAll(m1).putAll(m2).build()
+        mapMerger()
     );
   }
 
@@ -236,7 +235,7 @@ public class ReactiveAdminClient implements Closeable {
         topics,
         200,
         this::describeTopicsImpl,
-        (m1, m2) -> ImmutableMap.<String, TopicDescription>builder().putAll(m1).putAll(m2).build()
+        mapMerger()
     );
   }
 
@@ -383,32 +382,57 @@ public class ReactiveAdminClient implements Closeable {
     }
   }
 
-  public Mono<List<String>> listConsumerGroups() {
-    return toMono(client.listConsumerGroups().all())
-        .map(lst -> lst.stream().map(ConsumerGroupListing::groupId).collect(toList()));
+  public Mono<List<String>> listConsumerGroupNames() {
+    return listConsumerGroups().map(lst -> lst.stream().map(ConsumerGroupListing::groupId).toList());
   }
 
-  public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
-    return toMono(client.describeConsumerGroups(groupIds).all());
+  public Mono<Collection<ConsumerGroupListing>> listConsumerGroups() {
+    return toMono(client.listConsumerGroups().all());
   }
 
-  public Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(String groupId) {
-    return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
+  public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
+    return partitionCalls(
+        groupIds,
+        25,
+        4,
+        ids -> toMono(client.describeConsumerGroups(ids).all()),
+        mapMerger()
+    );
   }
 
-  public Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(
-      String groupId, List<TopicPartition> partitions) {
-    return listConsumerGroupOffsets(groupId,
-        new ListConsumerGroupOffsetsOptions().topicPartitions(partitions));
-  }
+  // group -> partition -> offset
+  // NOTE: partitions with no committed offsets will be skipped
+  public Mono<Table<String, TopicPartition, Long>> listConsumerGroupOffsets(List<String> consumerGroups,
+                                                                            // all partitions if null passed
+                                                                            @Nullable List<TopicPartition> partitions) {
+    Function<Collection<String>, Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>>> call =
+        groups -> toMono(
+            client.listConsumerGroupOffsets(
+                groups.stream()
+                    .collect(Collectors.toMap(
+                        g -> g,
+                        g -> new ListConsumerGroupOffsetsSpec().topicPartitions(partitions)
+                    ))).all()
+        );
+
+    Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>> merged = partitionCalls(
+        consumerGroups,
+        25,
+        4,
+        call,
+        mapMerger()
+    );
 
-  private Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(
-      String groupId, ListConsumerGroupOffsetsOptions options) {
-    return toMono(client.listConsumerGroupOffsets(groupId, options).partitionsToOffsetAndMetadata())
-        .map(MapUtil::removeNullValues)
-        .map(m -> m.entrySet().stream()
-            .map(e -> Tuples.of(e.getKey(), e.getValue().offset()))
-            .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)));
+    return merged.map(map -> {
+      var table = ImmutableTable.<String, TopicPartition, Long>builder();
+      map.forEach((g, tpOffsets) -> tpOffsets.forEach((tp, offset) -> {
+        if (offset != null) {
+          // offset will be null for partitions that don't have committed offset for this group
+          table.put(g, tp, offset.offset());
+        }
+      }));
+      return table.build();
+    });
   }
 
   public Mono<Void> alterConsumerGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
@@ -501,7 +525,7 @@ public class ReactiveAdminClient implements Closeable {
         partitions,
         200,
         call,
-        (m1, m2) -> ImmutableMap.<TopicPartition, Long>builder().putAll(m1).putAll(m2).build()
+        mapMerger()
     );
   }
 
@@ -551,7 +575,7 @@ public class ReactiveAdminClient implements Closeable {
   }
 
   /**
-   * Splits input collection into batches, applies each batch sequentially to function
+   * Splits input collection into batches, converts each batch into Mono, sequentially subscribes to them
    * and merges output Monos into one Mono.
    */
   private static <R, I> Mono<R> partitionCalls(Collection<I> items,
@@ -561,14 +585,37 @@ public class ReactiveAdminClient implements Closeable {
     if (items.isEmpty()) {
       return call.apply(items);
     }
-    Iterator<List<I>> parts = Iterators.partition(items.iterator(), partitionSize);
-    Mono<R> mono = call.apply(parts.next());
-    while (parts.hasNext()) {
-      var nextPart = parts.next();
-      // calls will be executed sequentially
-      mono = mono.flatMap(res1 -> call.apply(nextPart).map(res2 -> merger.apply(res1, res2)));
+    Iterable<List<I>> parts = Iterables.partition(items, partitionSize);
+    return Flux.fromIterable(parts)
+        .concatMap(call)
+        .reduce(merger);
+  }
+
+  /**
+   * Splits input collection into batches, converts each batch into Mono, subscribes to them (concurrently,
+   * with specified concurrency level) and merges output Monos into one Mono.
+   */
+  private static <R, I> Mono<R> partitionCalls(Collection<I> items,
+                                               int partitionSize,
+                                               int concurrency,
+                                               Function<Collection<I>, Mono<R>> call,
+                                               BiFunction<R, R, R> merger) {
+    if (items.isEmpty()) {
+      return call.apply(items);
     }
-    return mono;
+    Iterable<List<I>> parts = Iterables.partition(items, partitionSize);
+    return Flux.fromIterable(parts)
+        .flatMap(call, concurrency)
+        .reduce(merger);
+  }
+
+  private static <K, V> BiFunction<Map<K, V>, Map<K, V>, Map<K, V>> mapMerger() {
+    return (m1, m2) -> {
+      var merged = new HashMap<K, V>();
+      merged.putAll(m1);
+      merged.putAll(m2);
+      return merged;
+    };
   }
 
   @Override
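
Note on the batching rewrite: the hand-rolled iterator loop becomes a Flux pipeline, where concatMap keeps the old strictly-sequential behavior and the new overload uses flatMap with a concurrency argument so several batches run in flight at once. A self-contained sketch of the concurrent variant, with a stubbed call standing in for an AdminClient request:

import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class PartitionCallsDemo {
  // Same shape as the new helper: split into batches, run up to `concurrency`
  // batch calls at once, merge the partial results into a single Mono.
  static <R, I> Mono<R> partitionCalls(Collection<I> items, int partitionSize, int concurrency,
                                       Function<Collection<I>, Mono<R>> call,
                                       BiFunction<R, R, R> merger) {
    if (items.isEmpty()) {
      return call.apply(items);
    }
    return Flux.fromIterable(Iterables.partition(items, partitionSize))
        .flatMap(call::apply, concurrency)
        .reduce(merger::apply);
  }

  public static void main(String[] args) {
    List<String> groups = List.of("g1", "g2", "g3", "g4", "g5");
    // Stubbed "remote call": sums the id lengths within one batch.
    Function<Collection<String>, Mono<Integer>> call =
        batch -> Mono.fromCallable(() -> batch.stream().mapToInt(String::length).sum());
    partitionCalls(groups, 2, 4, call, Integer::sum)
        .subscribe(total -> System.out.println("total = " + total)); // total = 10
  }
}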

+ 0 - 21
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/MapUtil.java

@@ -1,21 +0,0 @@
-package com.provectus.kafka.ui.util;
-
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class MapUtil {
-
-  private MapUtil() {
-  }
-
-  public static <K, V> Map<K, V> removeNullValues(Map<K, V> map) {
-    return map.entrySet().stream()
-        .filter(e -> e.getValue() != null)
-        .collect(
-            Collectors.toMap(
-                Map.Entry::getKey,
-                Map.Entry::getValue
-            )
-        );
-  }
-}

+ 91 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ReactiveAdminClientTest.java

@@ -2,14 +2,18 @@ package com.provectus.kafka.ui.service;
 
 import static com.provectus.kafka.ui.service.ReactiveAdminClient.toMonoWithExceptionFilter;
 import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.UUID;
+import java.util.function.Function;
 import java.util.stream.Stream;
 import lombok.SneakyThrows;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -18,12 +22,16 @@ import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.junit.function.ThrowingRunnable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -96,6 +104,14 @@ class ReactiveAdminClientTest extends AbstractIntegrationTest {
     clearings.add(() -> adminClient.deleteTopics(Stream.of(topics).map(NewTopic::name).toList()).all().get());
   }
 
+  void fillTopic(String topic, int msgsCnt) {
+    try (var producer = KafkaTestProducer.forKafka(kafka)) {
+      for (int i = 0; i < msgsCnt; i++) {
+        producer.send(topic, UUID.randomUUID().toString());
+      }
+    }
+  }
+
   @Test
   void testToMonoWithExceptionFilter() {
     var failedFuture = new KafkaFutureImpl<String>();
@@ -152,4 +168,79 @@ class ReactiveAdminClientTest extends AbstractIntegrationTest {
         .verifyComplete();
   }
 
+
+  @Test
+  void testListConsumerGroupOffsets() throws Exception {
+    String topic = UUID.randomUUID().toString();
+    String anotherTopic = UUID.randomUUID().toString();
+    createTopics(new NewTopic(topic, 2, (short) 1), new NewTopic(anotherTopic, 1, (short) 1));
+    fillTopic(topic, 10);
+
+    Function<String, KafkaConsumer<String, String>> consumerSupplier = groupName -> {
+      Properties p = new Properties();
+      p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+      p.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupName);
+      p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+      p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+      p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+      p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+      return new KafkaConsumer<String, String>(p);
+    };
+
+    String fullyPolledConsumer = UUID.randomUUID().toString();
+    try (KafkaConsumer<String, String> c = consumerSupplier.apply(fullyPolledConsumer)) {
+      c.subscribe(List.of(topic));
+      int polled = 0;
+      while (polled < 10) {
+        polled += c.poll(Duration.ofMillis(50)).count();
+      }
+      c.commitSync();
+    }
+
+    String polled1MsgConsumer = UUID.randomUUID().toString();
+    try (KafkaConsumer<String, String> c = consumerSupplier.apply(polled1MsgConsumer)) {
+      c.subscribe(List.of(topic));
+      c.poll(Duration.ofMillis(100));
+      c.commitSync(Map.of(tp(topic, 0), new OffsetAndMetadata(1)));
+    }
+
+    String noCommitConsumer = UUID.randomUUID().toString();
+    try (KafkaConsumer<String, String> c = consumerSupplier.apply(noCommitConsumer)) {
+      c.subscribe(List.of(topic));
+      c.poll(Duration.ofMillis(100));
+    }
+
+    Map<TopicPartition, ListOffsetsResultInfo> endOffsets = adminClient.listOffsets(Map.of(
+        tp(topic, 0), OffsetSpec.latest(),
+        tp(topic, 1), OffsetSpec.latest())).all().get();
+
+    StepVerifier.create(
+            reactiveAdminClient.listConsumerGroupOffsets(
+                List.of(fullyPolledConsumer, polled1MsgConsumer, noCommitConsumer),
+                List.of(
+                    tp(topic, 0),
+                    tp(topic, 1),
+                    tp(anotherTopic, 0))
+            )
+        ).assertNext(table -> {
+
+          assertThat(table.row(polled1MsgConsumer))
+              .containsEntry(tp(topic, 0), 1L)
+              .hasSize(1);
+
+          assertThat(table.row(noCommitConsumer))
+              .isEmpty();
+
+          assertThat(table.row(fullyPolledConsumer))
+              .containsEntry(tp(topic, 0), endOffsets.get(tp(topic, 0)).offset())
+              .containsEntry(tp(topic, 1), endOffsets.get(tp(topic, 1)).offset())
+              .hasSize(2);
+        })
+        .verifyComplete();
+  }
+
+  private static TopicPartition tp(String topic, int partition) {
+    return new TopicPartition(topic, partition);
+  }
+
 }

+ 5 - 2
kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/pages/BasePage.java

@@ -7,12 +7,13 @@ import com.codeborne.selenide.Condition;
 import com.codeborne.selenide.ElementsCollection;
 import com.codeborne.selenide.SelenideElement;
 import com.provectus.kafka.ui.utilities.WebUtils;
+import java.time.Duration;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public abstract class BasePage extends WebUtils {
 
-  protected SelenideElement loadingSpinner = $x("//*[contains(text(),'Loading')]");
+  protected SelenideElement loadingSpinner = $x("//div[@role='progressbar']");
   protected SelenideElement submitBtn = $x("//button[@type='submit']");
   protected SelenideElement tableGrid = $x("//table");
   protected SelenideElement dotMenuBtn = $x("//button[@aria-label='Dropdown Toggle']");
@@ -26,7 +27,9 @@ public abstract class BasePage extends WebUtils {
 
   protected void waitUntilSpinnerDisappear() {
     log.debug("\nwaitUntilSpinnerDisappear");
-    loadingSpinner.shouldBe(Condition.disappear);
+    if(isVisible(loadingSpinner)){
+      loadingSpinner.shouldBe(Condition.disappear, Duration.ofSeconds(30));
+    }
   }
 
   protected void clickSubmitBtn() {

+ 80 - 6
kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/pages/topic/TopicDetails.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.pages.topic;
 import static com.codeborne.selenide.Selenide.$;
 import static com.codeborne.selenide.Selenide.$$x;
 import static com.codeborne.selenide.Selenide.$x;
+import static com.codeborne.selenide.Selenide.sleep;
 import static org.apache.commons.lang.math.RandomUtils.nextInt;
 
 import com.codeborne.selenide.CollectionCondition;
@@ -11,9 +12,17 @@ import com.codeborne.selenide.ElementsCollection;
 import com.codeborne.selenide.SelenideElement;
 import com.provectus.kafka.ui.pages.BasePage;
 import io.qameta.allure.Step;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.YearMonth;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
 import org.openqa.selenium.By;
 
 public class TopicDetails extends BasePage {
@@ -24,7 +33,7 @@ public class TopicDetails extends BasePage {
   protected SelenideElement overviewTab = $x("//a[contains(text(),'Overview')]");
   protected SelenideElement messagesTab = $x("//a[contains(text(),'Messages')]");
   protected SelenideElement seekTypeDdl = $x("//ul[@id='selectSeekType']/li");
-  protected SelenideElement seekTypeField = $x("//label[text()='Seek Type']//..//input");
+  protected SelenideElement seekTypeField = $x("//label[text()='Seek Type']//..//div/input");
   protected SelenideElement addFiltersBtn = $x("//button[text()='Add Filters']");
   protected SelenideElement savedFiltersLink = $x("//div[text()='Saved Filters']");
   protected SelenideElement addFilterCodeModalTitle = $x("//label[text()='Filter code']");
@@ -33,6 +42,7 @@ public class TopicDetails extends BasePage {
   protected SelenideElement displayNameInputAddFilterMdl = $x("//input[@placeholder='Enter Name']");
   protected SelenideElement cancelBtnAddFilterMdl = $x("//button[text()='Cancel']");
   protected SelenideElement addFilterBtnAddFilterMdl = $x("//button[text()='Add filter']");
+  protected SelenideElement addFiltersBtnMessages = $x("//button[text()='Add Filters']");
   protected SelenideElement selectFilterBtnAddFilterMdl = $x("//button[text()='Select filter']");
   protected SelenideElement editSettingsMenu = $x("//li[@role][contains(text(),'Edit settings')]");
   protected SelenideElement removeTopicBtn = $x("//ul[@role='menu']//div[contains(text(),'Remove Topic')]");
@@ -43,6 +53,11 @@ public class TopicDetails extends BasePage {
   protected SelenideElement backToCreateFiltersLink = $x("//div[text()='Back To create filters']");
   protected SelenideElement confirmationMdl = $x("//div[text()= 'Confirm the action']/..");
   protected ElementsCollection messageGridItems = $$x("//tbody//tr");
+  protected SelenideElement actualCalendarDate = $x("//div[@class='react-datepicker__current-month']");
+  protected SelenideElement previousMonthButton = $x("//button[@aria-label='Previous Month']");
+  protected SelenideElement nextMonthButton = $x("//button[@aria-label='Next Month']");
+  protected SelenideElement calendarTimeFld = $x("//input[@placeholder='Time']");
+  protected String dayCellLtr = "//div[@role='option'][contains(text(),'%d')]";
   protected String seekFilterDdlLocator = "//ul[@id='selectSeekType']/ul/li[text()='%s']";
   protected String savedFilterNameLocator = "//div[@role='savedFilter']/div[contains(text(),'%s')]";
   protected String consumerIdLocator = "//a[@title='%s']";
@@ -53,7 +68,7 @@ public class TopicDetails extends BasePage {
   @Step
   public TopicDetails waitUntilScreenReady() {
     waitUntilSpinnerDisappear();
-    dotMenuBtn.shouldBe(Condition.visible);
+    overviewTab.shouldBe(Condition.visible);
     return this;
   }
 
@@ -265,6 +280,63 @@ public class TopicDetails extends BasePage {
     return contentMessage.matches(contentMessageTab.getText().trim());
   }
 
+  private void selectYear(int expectedYear) {
+    LocalTime deadline = LocalTime.now().plusMinutes(3);
+    while (getActualCalendarDate().getYear() > expectedYear) {
+      clickByJavaScript(previousMonthButton);
+      sleep(1000);
+      // give up once the deadline captured before the loop has passed
+      if (LocalTime.now().isAfter(deadline)) {
+        throw new IllegalArgumentException("Unable to select year");
+      }
+    }
+  }
+
+  private void selectMonth(int expectedMonth) {
+    LocalTime deadline = LocalTime.now().plusMinutes(3);
+    while (getActualCalendarDate().getMonthValue() > expectedMonth) {
+      clickByJavaScript(previousMonthButton);
+      sleep(1000);
+      // give up once the deadline captured before the loop has passed
+      if (LocalTime.now().isAfter(deadline)) {
+        throw new IllegalArgumentException("Unable to select month");
+      }
+    }
+  }
+
+  private void selectDay(int expectedDay) {
+    Objects.requireNonNull($$x(String.format(dayCellLtr, expectedDay)).stream()
+        .filter(day -> !Objects.requireNonNull(day.getAttribute("class")).contains("outside-month"))
+        .findFirst().orElse(null)).shouldBe(Condition.enabled).click();
+  }
+
+  private void setTime(LocalDateTime dateTime) {
+    calendarTimeFld.shouldBe(Condition.enabled)
+        .sendKeys(String.valueOf(dateTime.getHour()), String.valueOf(dateTime.getMinute()));
+  }
+
+  @Step
+  public TopicDetails selectDateAndTimeByCalendar(LocalDateTime dateTime) {
+    setTime(dateTime);
+    selectYear(dateTime.getYear());
+    selectMonth(dateTime.getMonthValue());
+    selectDay(dateTime.getDayOfMonth());
+    return this;
+  }
+
+  private LocalDate getActualCalendarDate() {
+    String monthAndYearStr = actualCalendarDate.getText().trim();
+    DateTimeFormatter formatter = new DateTimeFormatterBuilder()
+        .parseCaseInsensitive()
+        .append(DateTimeFormatter.ofPattern("MMMM yyyy"))
+        .toFormatter(Locale.ENGLISH);
+    YearMonth yearMonth = formatter.parse(monthAndYearStr, YearMonth::from);
+    return yearMonth.atDay(1);
+  }
+
+  @Step
+  public TopicDetails openCalendarSeekType(){
+    seekTypeField.shouldBe(Condition.enabled).click();
+    actualCalendarDate.shouldBe(Condition.visible);
+    return this;
+  }
+
   @Step
   public int getMessageCountAmount() {
     return Integer.parseInt(messageAmountCell.getText().trim());
@@ -278,7 +350,7 @@ public class TopicDetails extends BasePage {
   }
 
   @Step
-  public TopicDetails.MessageGridItem getMessage(int offset) {
+  public TopicDetails.MessageGridItem getMessageByOffset(int offset) {
     return initItems().stream()
         .filter(e -> e.getOffset() == offset)
         .findFirst().orElse(null);
@@ -291,7 +363,7 @@ public class TopicDetails extends BasePage {
 
   @Step
   public TopicDetails.MessageGridItem getRandomMessage() {
-    return getMessage(nextInt(initItems().size() - 1));
+    return getMessageByOffset(nextInt(initItems().size() - 1));
   }
 
   public enum TopicMenu {
@@ -340,8 +412,10 @@ public class TopicDetails extends BasePage {
     }
 
     @Step
-    public String getTimestamp() {
-      return element.$x("./td[4]/div").getText().trim();
+    public LocalDateTime getTimestamp() {
+      String timestampValue = element.$x("./td[4]/div").getText().trim();
+      DateTimeFormatter formatter = DateTimeFormatter.ofPattern("M/d/yyyy, HH:mm:ss");
+      return LocalDateTime.parse(timestampValue, formatter);
     }
 
     @Step
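
Note on the calendar parsing: getActualCalendarDate() turns the datepicker header (e.g. "January 2023") into a YearMonth via a case-insensitive formatter pinned to Locale.ENGLISH, so the result does not depend on the JVM's default locale. A minimal sketch of that parsing step in isolation:

import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.Locale;

class CalendarHeaderDemo {
  public static void main(String[] args) {
    DateTimeFormatter formatter = new DateTimeFormatterBuilder()
        .parseCaseInsensitive()                       // accepts "JANUARY 2023", "january 2023", ...
        .append(DateTimeFormatter.ofPattern("MMMM yyyy"))
        .toFormatter(Locale.ENGLISH);                 // English month names, regardless of JVM locale
    YearMonth ym = formatter.parse("January 2023", YearMonth::from);
    System.out.println(ym.atDay(1)); // 2023-01-01: the first day of the month, as returned to callers
  }
}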

+ 65 - 18
kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/services/ApiService.java

@@ -1,7 +1,6 @@
 package com.provectus.kafka.ui.services;
 
 import static com.codeborne.selenide.Selenide.sleep;
-import static com.provectus.kafka.ui.settings.BaseSource.BASE_LOCAL_URL;
 import static com.provectus.kafka.ui.utilities.FileUtils.fileToString;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -17,6 +16,8 @@ import com.provectus.kafka.ui.api.model.TopicCreation;
 import com.provectus.kafka.ui.models.Connector;
 import com.provectus.kafka.ui.models.Schema;
 import com.provectus.kafka.ui.models.Topic;
+import com.provectus.kafka.ui.settings.BaseSource;
+import io.qameta.allure.Step;
 import java.util.HashMap;
 import java.util.Map;
 import lombok.SneakyThrows;
@@ -25,7 +26,7 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti
 
 
 @Slf4j
-public class ApiService {
+public class ApiService extends BaseSource {
 
     @SneakyThrows
     private TopicsApi topicApi() {
@@ -48,7 +49,7 @@ public class ApiService {
     }
 
     @SneakyThrows
-    public void createTopic(String clusterName, String topicName) {
+    private void createTopic(String clusterName, String topicName) {
       TopicCreation topic = new TopicCreation();
       topic.setName(topicName);
       topic.setPartitions(1);
@@ -61,15 +62,28 @@ public class ApiService {
       }
     }
 
-    public void deleteTopic(String clusterName, String topicName) {
+    @Step
+    public ApiService createTopic(String topicName) {
+      createTopic(CLUSTER_NAME, topicName);
+      return this;
+    }
+
+    @SneakyThrows
+    private void deleteTopic(String clusterName, String topicName) {
         try {
             topicApi().deleteTopic(clusterName, topicName).block();
         } catch (WebClientResponseException ignore) {
         }
     }
 
+    @Step
+    public ApiService deleteTopic(String topicName){
+      deleteTopic(CLUSTER_NAME, topicName);
+      return this;
+    }
+
     @SneakyThrows
-    public void createSchema(String clusterName, Schema schema) {
+    private void createSchema(String clusterName, Schema schema) {
         NewSchemaSubject schemaSubject = new NewSchemaSubject();
         schemaSubject.setSubject(schema.getName());
         schemaSubject.setSchema(fileToString(schema.getValuePath()));
@@ -81,24 +95,42 @@ public class ApiService {
         }
     }
 
+    @Step
+    public ApiService createSchema(Schema schema){
+      createSchema(CLUSTER_NAME, schema);
+      return this;
+    }
+
     @SneakyThrows
-    public void deleteSchema(String clusterName, String schemaName) {
+    private void deleteSchema(String clusterName, String schemaName) {
         try {
             schemaApi().deleteSchema(clusterName, schemaName).block();
         } catch (WebClientResponseException ignore) {
         }
     }
 
+    @Step
+    public ApiService deleteSchema(String schemaName){
+      deleteSchema(CLUSTER_NAME, schemaName);
+      return this;
+    }
+
     @SneakyThrows
-    public void deleteConnector(String clusterName, String connectName, String connectorName) {
+    private void deleteConnector(String clusterName, String connectName, String connectorName) {
         try {
             connectorApi().deleteConnector(clusterName, connectName, connectorName).block();
         } catch (WebClientResponseException ignore) {
         }
     }
 
+    @Step
+    public ApiService deleteConnector(String connectName, String connectorName){
+      deleteConnector(CLUSTER_NAME, connectName, connectorName);
+      return this;
+    }
+
     @SneakyThrows
-    public void createConnector(String clusterName, String connectName, Connector connector) {
+    private void createConnector(String clusterName, String connectName, Connector connector) {
         NewConnector connectorProperties = new NewConnector();
         connectorProperties.setName(connector.getName());
         Map<String, Object> configMap = new ObjectMapper().readValue(connector.getConfig(), HashMap.class);
@@ -110,20 +142,35 @@ public class ApiService {
         connectorApi().createConnector(clusterName, connectName, connectorProperties).block();
     }
 
+    @Step
+    public ApiService createConnector(String connectName, Connector connector){
+      createConnector(CLUSTER_NAME, connectName, connector);
+      return this;
+    }
+
+    @Step
     public String getFirstConnectName(String clusterName) {
         return connectorApi().getConnects(clusterName).blockFirst().getName();
     }
 
     @SneakyThrows
-    public void sendMessage(String clusterName, Topic topic) {
-        CreateTopicMessage createMessage = new CreateTopicMessage();
-        createMessage.partition(0);
-        createMessage.setContent(topic.getMessageContent());
-        createMessage.setKey(topic.getMessageKey());
-        try {
-            messageApi().sendTopicMessages(clusterName, topic.getName(), createMessage).block();
-        } catch (WebClientResponseException ex) {
-            ex.getRawStatusCode();
-        }
+    private void sendMessage(String clusterName, Topic topic) {
+      CreateTopicMessage createMessage = new CreateTopicMessage();
+      createMessage.setPartition(0);
+      createMessage.setKeySerde("String");
+      createMessage.setValueSerde("String");
+      createMessage.setKey(topic.getMessageKey());
+      createMessage.setContent(topic.getMessageContent());
+      try {
+        messageApi().sendTopicMessages(clusterName, topic.getName(), createMessage).block();
+      } catch (WebClientResponseException ex) {
+        ex.getRawStatusCode();
+      }
+    }
+
+    @Step
+    public ApiService sendMessage(Topic topic) {
+      sendMessage(CLUSTER_NAME, topic);
+      return this;
     }
 }
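
Note on the fluent API: every public ApiService method now reads the cluster from BaseSource.CLUSTER_NAME and returns this, so fixtures chain calls instead of passing the cluster name around. A short usage sketch (topic is a hypothetical Topic instance):

// Before: apiService.createTopic(CLUSTER_NAME, topic.getName());
//         apiService.sendMessage(CLUSTER_NAME, topic);
// After: one fluent chain per topic.
apiService
    .createTopic(topic.getName())
    .sendMessage(topic);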

+ 16 - 0
kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/utilities/TimeUtils.java

@@ -0,0 +1,16 @@
+package com.provectus.kafka.ui.utilities;
+
+import static com.codeborne.selenide.Selenide.sleep;
+
+import java.time.LocalTime;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class TimeUtils {
+
+  public static void waitUntilNewMinuteStarted(){
+    int secondsLeft = 60 - LocalTime.now().getSecond();
+    log.debug("\nwaitUntilNewMinuteStarted: {}s", secondsLeft);
+    sleep(secondsLeft * 1000);
+  }
+}
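
Note on the helper: sleeping 60 - LocalTime.now().getSecond() seconds parks the test until the next minute boundary, so messages produced before and after the call land in different minutes; the timestamp-filter test in TopicMessagesTests below relies on this. A standalone sketch of the same arithmetic, with Thread.sleep in place of Selenide's sleep:

import java.time.LocalTime;

class MinuteBoundaryDemo {
  public static void main(String[] args) throws InterruptedException {
    int secondsLeft = 60 - LocalTime.now().getSecond(); // 1..60 seconds to the boundary
    System.out.printf("sleeping %ds until a new minute starts%n", secondsLeft);
    Thread.sleep(secondsLeft * 1000L);
    System.out.println("minute now: " + LocalTime.now().getMinute());
  }
}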

+ 1 - 0
kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/base/BaseTest.java

@@ -62,6 +62,7 @@ public abstract class BaseTest extends Facade {
                 .addArguments("--disable-gpu")
                 .addArguments("--no-sandbox")
                 .addArguments("--verbose")
+                .addArguments("--lang=es")
             )
             .withLogConsumer(new Slf4jLogConsumer(log).withPrefix("[CHROME]: "));
         try {

+ 10 - 11
kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/suite/connectors/ConnectorsTests.java

@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.suite.connectors;
 
 import static com.provectus.kafka.ui.pages.BasePage.AlertHeader.SUCCESS;
 import static com.provectus.kafka.ui.pages.NaviSideBar.SideMenuOption.KAFKA_CONNECT;
-import static com.provectus.kafka.ui.settings.BaseSource.CLUSTER_NAME;
 import static com.provectus.kafka.ui.utilities.FileUtils.getResourceAsString;
 import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
 
@@ -50,14 +49,14 @@ public class ConnectorsTests extends BaseTest {
 
     @BeforeAll
     public void beforeAll() {
-        TOPIC_LIST.addAll(List.of(TOPIC_FOR_CREATE, TOPIC_FOR_DELETE, TOPIC_FOR_UPDATE));
-        TOPIC_LIST.forEach(topic -> {
-            apiService.createTopic(CLUSTER_NAME, topic.getName());
-            apiService.sendMessage(CLUSTER_NAME, topic);
-        });
-        CONNECTOR_LIST.addAll(List.of(CONNECTOR_FOR_DELETE, CONNECTOR_FOR_UPDATE));
-        CONNECTOR_LIST.forEach(connector -> apiService
-                .createConnector(CLUSTER_NAME, CONNECT_NAME, connector));
+      TOPIC_LIST.addAll(List.of(TOPIC_FOR_CREATE, TOPIC_FOR_DELETE, TOPIC_FOR_UPDATE));
+      TOPIC_LIST.forEach(topic -> apiService
+          .createTopic(topic.getName())
+          .sendMessage(topic)
+      );
+      CONNECTOR_LIST.addAll(List.of(CONNECTOR_FOR_DELETE, CONNECTOR_FOR_UPDATE));
+      CONNECTOR_LIST.forEach(connector -> apiService
+          .createConnector(CONNECT_NAME, connector));
     }
 
     @DisplayName("should create a connector")
@@ -120,8 +119,8 @@ public class ConnectorsTests extends BaseTest {
     @AfterAll
     public void afterAll() {
         CONNECTOR_LIST.forEach(connector ->
-                apiService.deleteConnector(CLUSTER_NAME, CONNECT_NAME, connector.getName()));
-        TOPIC_LIST.forEach(topic -> apiService.deleteTopic(CLUSTER_NAME, topic.getName()));
+                apiService.deleteConnector(CONNECT_NAME, connector.getName()));
+        TOPIC_LIST.forEach(topic -> apiService.deleteTopic(topic.getName()));
     }
 
     @Step

+ 2 - 3
kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/suite/schemas/SchemasTests.java

@@ -1,7 +1,6 @@
 package com.provectus.kafka.ui.suite.schemas;
 
 import static com.provectus.kafka.ui.pages.NaviSideBar.SideMenuOption.SCHEMA_REGISTRY;
-import static com.provectus.kafka.ui.settings.BaseSource.CLUSTER_NAME;
 import static com.provectus.kafka.ui.utilities.FileUtils.fileToString;
 
 import com.codeborne.selenide.Condition;
@@ -41,7 +40,7 @@ public class SchemasTests extends BaseTest {
     @SneakyThrows
     public void beforeAll() {
         SCHEMA_LIST.addAll(List.of(AVRO_API, JSON_API, PROTOBUF_API));
-        SCHEMA_LIST.forEach(schema -> apiService.createSchema(CLUSTER_NAME, schema));
+        SCHEMA_LIST.forEach(schema -> apiService.createSchema(schema));
     }
 
     @DisplayName("should create AVRO schema")
@@ -228,7 +227,7 @@ public class SchemasTests extends BaseTest {
 
     @AfterAll
     public void afterAll() {
-        SCHEMA_LIST.forEach(schema -> apiService.deleteSchema(CLUSTER_NAME, schema.getName()));
+        SCHEMA_LIST.forEach(schema -> apiService.deleteSchema(schema.getName()));
     }
 
     @Step

+ 70 - 24
kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/suite/topics/TopicMessagesTests.java

@@ -2,8 +2,8 @@ package com.provectus.kafka.ui.suite.topics;
 
 import static com.provectus.kafka.ui.pages.BasePage.AlertHeader.SUCCESS;
 import static com.provectus.kafka.ui.pages.topic.TopicDetails.TopicMenu.MESSAGES;
-import static com.provectus.kafka.ui.settings.BaseSource.CLUSTER_NAME;
-import static com.provectus.kafka.ui.utilities.FileUtils.fileToString;
+import static com.provectus.kafka.ui.pages.topic.TopicDetails.TopicMenu.OVERVIEW;
+import static com.provectus.kafka.ui.utilities.TimeUtils.waitUntilNewMinuteStarted;
 import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -15,8 +15,12 @@ import com.provectus.kafka.ui.utilities.qaseIoUtils.annotations.Suite;
 import com.provectus.kafka.ui.utilities.qaseIoUtils.enums.Status;
 import io.qameta.allure.Issue;
 import io.qase.api.annotation.CaseId;
+import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.assertj.core.api.SoftAssertions;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -32,22 +36,29 @@ public class TopicMessagesTests extends BaseTest {
   private static final String SUITE_TITLE = "Topics";
   private static final Topic TOPIC_FOR_MESSAGES = new Topic()
       .setName("topic-with-clean-message-attribute-" + randomAlphabetic(5))
-      .setMessageKey(fileToString(System.getProperty("user.dir") + "/src/test/resources/producedkey.txt"))
-      .setMessageContent(fileToString(System.getProperty("user.dir") + "/src/test/resources/testData.txt"));
+      .setMessageKey(randomAlphabetic(5))
+      .setMessageContent(randomAlphabetic(10));
   private static final Topic TOPIC_TO_CLEAR_MESSAGES = new Topic()
       .setName("topic-to-clear-message-attribute-" + randomAlphabetic(5))
-      .setMessageKey(fileToString(System.getProperty("user.dir") + "/src/test/resources/producedkey.txt"))
-      .setMessageContent(fileToString(System.getProperty("user.dir") + "/src/test/resources/testData.txt"));
+      .setMessageKey(randomAlphabetic(5))
+      .setMessageContent(randomAlphabetic(10));
+  private static final Topic TOPIC_FOR_CHECKING_FILTERS = new Topic()
+      .setName("topic_for_checking_filters" + randomAlphabetic(5))
+      .setMessageKey(randomAlphabetic(5))
+      .setMessageContent(randomAlphabetic(10));
   private static final Topic TOPIC_TO_RECREATE = new Topic()
       .setName("topic-to-recreate-attribute-" + randomAlphabetic(5))
-      .setMessageKey(fileToString(System.getProperty("user.dir") + "/src/test/resources/producedkey.txt"))
-      .setMessageContent(fileToString(System.getProperty("user.dir") + "/src/test/resources/testData.txt"));
+      .setMessageKey(randomAlphabetic(5))
+      .setMessageContent(randomAlphabetic(10));
   private static final List<Topic> TOPIC_LIST = new ArrayList<>();
 
   @BeforeAll
   public void beforeAll() {
-    TOPIC_LIST.addAll(List.of(TOPIC_FOR_MESSAGES, TOPIC_TO_CLEAR_MESSAGES, TOPIC_TO_RECREATE));
-    TOPIC_LIST.forEach(topic -> apiService.createTopic(CLUSTER_NAME, topic.getName()));
+    TOPIC_LIST.addAll(List.of(TOPIC_FOR_MESSAGES, TOPIC_FOR_CHECKING_FILTERS, TOPIC_TO_CLEAR_MESSAGES, TOPIC_TO_RECREATE));
+    TOPIC_LIST.forEach(topic -> apiService.createTopic(topic.getName()));
+    IntStream.range(1, 3).forEach(i -> apiService.sendMessage(TOPIC_FOR_CHECKING_FILTERS));
+    waitUntilNewMinuteStarted();
+    IntStream.range(1, 3).forEach(i -> apiService.sendMessage(TOPIC_FOR_CHECKING_FILTERS));
   }
 
   @DisplayName("produce message")
@@ -58,7 +69,7 @@ public class TopicMessagesTests extends BaseTest {
   void produceMessage() {
     navigateToTopicsAndOpenDetails(TOPIC_FOR_MESSAGES.getName());
     topicDetails
-        .openDetailsTab(TopicDetails.TopicMenu.MESSAGES)
+        .openDetailsTab(MESSAGES)
         .clickProduceMessageBtn();
     produceMessagePanel
         .waitUntilScreenReady()
@@ -85,7 +96,7 @@ public class TopicMessagesTests extends BaseTest {
   void clearMessage() {
     navigateToTopicsAndOpenDetails(TOPIC_FOR_MESSAGES.getName());
     topicDetails
-        .openDetailsTab(TopicDetails.TopicMenu.OVERVIEW)
+        .openDetailsTab(OVERVIEW)
         .clickProduceMessageBtn();
     int messageAmount = topicDetails.getMessageCountAmount();
     produceMessagePanel
@@ -111,35 +122,70 @@ public class TopicMessagesTests extends BaseTest {
   @CaseId(21)
   @Test
   void copyMessageFromTopicProfile() {
-    navigateToTopicsAndOpenDetails("_schemas");
+    navigateToTopicsAndOpenDetails(TOPIC_FOR_CHECKING_FILTERS.getName());
     topicDetails
-        .openDetailsTab(TopicDetails.TopicMenu.MESSAGES)
+        .openDetailsTab(MESSAGES)
         .getRandomMessage()
         .openDotMenu()
         .clickCopyToClipBoard();
-    Assertions.assertTrue(topicDetails.isAlertWithMessageVisible(SUCCESS,"Copied successfully!"),
+    Assertions.assertTrue(topicDetails.isAlertWithMessageVisible(SUCCESS, "Copied successfully!"),
         "isAlertWithMessageVisible()");
   }
 
   @Disabled
-  @Issue("https://github.com/provectus/kafka-ui/issues/2856")
+  @Issue("https://github.com/provectus/kafka-ui/issues/2394")
   @DisplayName("Checking messages filtering by Offset within Topic/Messages")
   @Suite(suiteId = SUITE_ID, title = SUITE_TITLE)
   @AutomationStatus(status = Status.AUTOMATED)
   @CaseId(15)
   @Test
   void checkingMessageFilteringByOffset() {
-    String offsetValue = "2";
-    navigateToTopicsAndOpenDetails("_schemas");
+    navigateToTopicsAndOpenDetails(TOPIC_FOR_CHECKING_FILTERS.getName());
+    topicDetails
+        .openDetailsTab(MESSAGES);
+    TopicDetails.MessageGridItem secondMessage = topicDetails.getMessageByOffset(1);
     topicDetails
-        .openDetailsTab(MESSAGES)
         .selectSeekTypeDdlMessagesTab("Offset")
-        .setSeekTypeValueFldMessagesTab(offsetValue)
+        .setSeekTypeValueFldMessagesTab(String.valueOf(secondMessage.getOffset()))
+        .clickSubmitFiltersBtnMessagesTab();
+    SoftAssertions softly = new SoftAssertions();
+    topicDetails.getAllMessages().forEach(message ->
+        softly.assertThat(message.getOffset() == secondMessage.getOffset()
+                || message.getOffset() > secondMessage.getOffset())
+            .as(String.format("Expected offset is: %s, but found: %s", secondMessage.getOffset(), message.getOffset()))
+            .isTrue());
+    softly.assertAll();
+  }
+
+  @Disabled
+  @Issue("https://github.com/provectus/kafka-ui/issues/3215")
+  @Issue("https://github.com/provectus/kafka-ui/issues/2345")
+  @DisplayName("Checking messages filtering by Timestamp within Messages/Topic")
+  @Suite(suiteId = SUITE_ID, title = SUITE_TITLE)
+  @AutomationStatus(status = Status.AUTOMATED)
+  @CaseId(16)
+  @Test
+  void checkingMessageFilteringByTimestamp() {
+    navigateToTopicsAndOpenDetails(TOPIC_FOR_CHECKING_FILTERS.getName());
+    topicDetails
+        .openDetailsTab(MESSAGES);
+    LocalDateTime firstTimestamp = topicDetails.getMessageByOffset(0).getTimestamp();
+    List<TopicDetails.MessageGridItem> nextMessages = topicDetails.getAllMessages().stream()
+        .filter(message -> message.getTimestamp().getMinute() != firstTimestamp.getMinute())
+        .collect(Collectors.toList());
+    LocalDateTime nextTimestamp = Objects.requireNonNull(nextMessages.stream()
+        .findFirst().orElse(null)).getTimestamp();
+    topicDetails
+        .selectSeekTypeDdlMessagesTab("Timestamp")
+        .openCalendarSeekType()
+        .selectDateAndTimeByCalendar(nextTimestamp)
         .clickSubmitFiltersBtnMessagesTab();
     SoftAssertions softly = new SoftAssertions();
-    topicDetails.getAllMessages()
-        .forEach(messages -> softly.assertThat(messages.getOffset() == Integer.parseInt(offsetValue))
-        .as("getAllMessages()").isTrue());
+    topicDetails.getAllMessages().forEach(message ->
+        softly.assertThat(message.getTimestamp().isEqual(nextTimestamp)
+                || message.getTimestamp().isAfter(nextTimestamp))
+            .as(String.format("Expected timestamp is: %s, but found: %s", nextTimestamp, message.getTimestamp()))
+            .isTrue());
     softly.assertAll();
   }
 
@@ -215,6 +261,6 @@ public class TopicMessagesTests extends BaseTest {
 
   @AfterAll
   public void afterAll() {
-    TOPIC_LIST.forEach(topic -> apiService.deleteTopic(CLUSTER_NAME, topic.getName()));
+    TOPIC_LIST.forEach(topic -> apiService.deleteTopic(topic.getName()));
   }
 }

+ 5 - 7
kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/suite/topics/TopicsTests.java

@@ -9,11 +9,9 @@ import static com.provectus.kafka.ui.pages.topic.enums.CustomParameterType.COMPR
 import static com.provectus.kafka.ui.pages.topic.enums.MaxSizeOnDisk.NOT_SET;
 import static com.provectus.kafka.ui.pages.topic.enums.MaxSizeOnDisk.SIZE_1_GB;
 import static com.provectus.kafka.ui.pages.topic.enums.MaxSizeOnDisk.SIZE_20_GB;
-import static com.provectus.kafka.ui.settings.BaseSource.CLUSTER_NAME;
-import static com.provectus.kafka.ui.utilities.FileUtils.fileToString;
 import static org.apache.commons.lang.RandomStringUtils.randomAlphabetic;
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.apache.commons.lang3.RandomUtils.nextInt;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import com.codeborne.selenide.Condition;
 import com.provectus.kafka.ui.base.BaseTest;
@@ -56,8 +54,8 @@ public class TopicsTests extends BaseTest {
       .setTimeToRetainData("604800001")
       .setMaxSizeOnDisk(SIZE_20_GB)
       .setMaxMessageBytes("1000020")
-      .setMessageKey(fileToString(System.getProperty("user.dir") + "/src/test/resources/producedkey.txt"))
-      .setMessageContent(fileToString(System.getProperty("user.dir") + "/src/test/resources/testData.txt"));
+      .setMessageKey(randomAlphabetic(5))
+      .setMessageContent(randomAlphabetic(10));
   private static final Topic TOPIC_TO_CHECK_SETTINGS = new Topic()
       .setName("new-topic-" + randomAlphabetic(5))
       .setNumberOfPartitions(1)
@@ -69,7 +67,7 @@ public class TopicsTests extends BaseTest {
   @BeforeAll
   public void beforeAll() {
     TOPIC_LIST.addAll(List.of(TOPIC_TO_UPDATE, TOPIC_FOR_DELETE));
-    TOPIC_LIST.forEach(topic -> apiService.createTopic(CLUSTER_NAME, topic.getName()));
+    TOPIC_LIST.forEach(topic -> apiService.createTopic(topic.getName()));
   }
 
   @DisplayName("should create a topic")
@@ -505,6 +503,6 @@ public class TopicsTests extends BaseTest {
 
   @AfterAll
   public void afterAll() {
-    TOPIC_LIST.forEach(topic -> apiService.deleteTopic(CLUSTER_NAME, topic.getName()));
+    TOPIC_LIST.forEach(topic -> apiService.deleteTopic(topic.getName()));
   }
 }

+ 0 - 1
kafka-ui-e2e-checks/src/test/resources/producedkey.txt

@@ -1 +0,0 @@
-"key"

+ 0 - 1
kafka-ui-e2e-checks/src/test/resources/testData.txt

@@ -1 +0,0 @@
-"print"