
Consumer groups loading performance improvements (#3188)

ISSUE-3148, ISSUE-3188: Topic consumers retrieval performance improvements
1. batch listConsumerGroupOffsets method implemented
2. describeConsumerGroups parallelized
3. minor improvements in ReactiveAdminClient
4. (not related to perf) ConsumerGroupMapper messagesBehind setting fixed
Ilya Kuramshin 2 years ago
parent
commit
578468d090
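
A minimal sketch of the batching idea behind items 1 and 2 of the message above, assuming Kafka clients 3.3+, where AdminClient.listConsumerGroupOffsets accepts a map of per-group specs and fetches committed offsets for many groups in a single request (the helper method name is illustrative):

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // One admin round-trip covering all groups, instead of one listConsumerGroupOffsets call per group.
    static Map<String, Map<TopicPartition, OffsetAndMetadata>> committedOffsets(
        AdminClient client, List<String> groupIds) throws Exception {
      Map<String, ListConsumerGroupOffsetsSpec> specs = groupIds.stream()
          .collect(Collectors.toMap(g -> g, g -> new ListConsumerGroupOffsetsSpec())); // no partitions set = all
      return client.listConsumerGroupOffsets(specs).all().get();
    }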

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java

@@ -189,8 +189,8 @@ public class ConsumerGroupsController extends AbstractController implements Cons
   private ConsumerGroupsPageResponseDTO convertPage(ConsumerGroupService.ConsumerGroupsPage
                                                         consumerGroupConsumerGroupsPage) {
     return new ConsumerGroupsPageResponseDTO()
-        .pageCount(consumerGroupConsumerGroupsPage.getTotalPages())
-        .consumerGroups(consumerGroupConsumerGroupsPage.getConsumerGroups()
+        .pageCount(consumerGroupConsumerGroupsPage.totalPages())
+        .consumerGroups(consumerGroupConsumerGroupsPage.consumerGroups()
             .stream()
             .map(ConsumerGroupMapper::toDto)
             .collect(Collectors.toList()));
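
For context on the accessor rename above: later in this commit ConsumerGroupsPage changes from a Lombok @Value class to a Java record, and record accessors use the bare component name instead of a get prefix. A minimal illustration:

    // Lombok @Value generated getTotalPages()/getConsumerGroups();
    // the record generates totalPages()/consumerGroups() instead.
    public record ConsumerGroupsPage(List<InternalConsumerGroup> consumerGroups, int totalPages) {
    }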

+ 11 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java

@@ -89,13 +89,17 @@ public class ConsumerGroupMapper {
             .flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic))
     ).collect(Collectors.toSet()).size();
 
-    long messagesBehind = c.getOffsets().entrySet().stream()
-        .mapToLong(e ->
-            Optional.ofNullable(c.getEndOffsets())
-                .map(o -> o.get(e.getKey()))
-                .map(o -> o - e.getValue())
-                .orElse(0L)
-        ).sum();
+    Long messagesBehind = null;
+    // messagesBehind should be undefined if no committed offsets found for topic
+    if (!c.getOffsets().isEmpty()) {
+      messagesBehind = c.getOffsets().entrySet().stream()
+          .mapToLong(e ->
+              Optional.ofNullable(c.getEndOffsets())
+                  .map(o -> o.get(e.getKey()))
+                  .map(o -> o - e.getValue())
+                  .orElse(0L)
+          ).sum();
+    }
 
     consumerGroup.setMessagesBehind(messagesBehind);
     consumerGroup.setTopics(numTopics);
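
The fix above changes what an absent lag looks like: with no committed offsets the lag is unknown and should surface as null, not as a misleading 0. A condensed sketch of that logic, with hypothetical parameter names:

    // null = lag unknown (group never committed); 0 = fully caught up.
    @Nullable
    static Long messagesBehind(Map<TopicPartition, Long> committed, Map<TopicPartition, Long> endOffsets) {
      if (committed.isEmpty()) {
        return null; // undefined, not zero
      }
      return committed.entrySet().stream()
          .mapToLong(e -> endOffsets.getOrDefault(e.getKey(), e.getValue()) - e.getValue()) // missing end offset counts as 0 lag
          .sum();
    }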

+ 102 - 89
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.service;
 
+import com.google.common.collect.Table;
 import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO;
 import com.provectus.kafka.ui.model.InternalConsumerGroup;
 import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
@@ -7,6 +8,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.SortOrderDTO;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -14,22 +16,21 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.function.ToIntFunction;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
-import lombok.Value;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
 
 @Service
 @RequiredArgsConstructor
@@ -41,21 +42,16 @@ public class ConsumerGroupService {
   private Mono<List<InternalConsumerGroup>> getConsumerGroups(
       ReactiveAdminClient ac,
       List<ConsumerGroupDescription> descriptions) {
-    return Flux.fromIterable(descriptions)
-        // 1. getting committed offsets for all groups
-        .flatMap(desc -> ac.listConsumerGroupOffsets(desc.groupId())
-            .map(offsets -> Tuples.of(desc, offsets)))
-        .collectMap(Tuple2::getT1, Tuple2::getT2)
-        .flatMap((Map<ConsumerGroupDescription, Map<TopicPartition, Long>> groupOffsetsMap) -> {
-          var tpsFromGroupOffsets = groupOffsetsMap.values().stream()
-              .flatMap(v -> v.keySet().stream())
-              .collect(Collectors.toSet());
+    var groupNames = descriptions.stream().map(ConsumerGroupDescription::groupId).toList();
+    // 1. getting committed offsets for all groups
+    return ac.listConsumerGroupOffsets(groupNames, null)
+        .flatMap((Table<String, TopicPartition, Long> committedOffsets) -> {
           // 2. getting end offsets for partitions with committed offsets
-          return ac.listOffsets(tpsFromGroupOffsets, OffsetSpec.latest(), false)
+          return ac.listOffsets(committedOffsets.columnKeySet(), OffsetSpec.latest(), false)
               .map(endOffsets ->
                   descriptions.stream()
                       .map(desc -> {
-                        var groupOffsets = groupOffsetsMap.get(desc);
+                        var groupOffsets = committedOffsets.row(desc.groupId());
                         var endOffsetsForGroup = new HashMap<>(endOffsets);
                         endOffsetsForGroup.keySet().retainAll(groupOffsets.keySet());
                         // 3. gathering description & offsets
@@ -73,105 +69,122 @@ public class ConsumerGroupService {
             .flatMap(endOffsets -> {
               var tps = new ArrayList<>(endOffsets.keySet());
               // 2. getting all consumer groups
-              return describeConsumerGroups(ac, null)
-                  .flatMap((List<ConsumerGroupDescription> groups) ->
-                      Flux.fromIterable(groups)
-                          // 3. for each group trying to find committed offsets for topic
-                          .flatMap(g ->
-                              ac.listConsumerGroupOffsets(g.groupId(), tps)
-                                  // 4. keeping only groups that relates to topic
-                                  .filter(offsets -> isConsumerGroupRelatesToTopic(topic, g, offsets))
-                                  // 5. constructing results
-                                  .map(offsets -> InternalTopicConsumerGroup.create(topic, g, offsets, endOffsets))
-                          ).collectList());
+              return describeConsumerGroups(ac)
+                  .flatMap((List<ConsumerGroupDescription> groups) -> {
+                        // 3. trying to find committed offsets for topic
+                        var groupNames = groups.stream().map(ConsumerGroupDescription::groupId).toList();
+                        return ac.listConsumerGroupOffsets(groupNames, tps).map(offsets ->
+                            groups.stream()
+                                // 4. keeping only groups that relates to topic
+                                .filter(g -> isConsumerGroupRelatesToTopic(topic, g, offsets.containsRow(g.groupId())))
+                                .map(g ->
+                                    // 5. constructing results
+                                    InternalTopicConsumerGroup.create(topic, g, offsets.row(g.groupId()), endOffsets))
+                                .toList()
+                        );
+                      }
+                  );
             }));
   }
 
 
   private boolean isConsumerGroupRelatesToTopic(String topic,
                                                 ConsumerGroupDescription description,
-                                                Map<TopicPartition, Long> committedGroupOffsetsForTopic) {
+                                                boolean hasCommittedOffsets) {
     boolean hasActiveMembersForTopic = description.members()
         .stream()
         .anyMatch(m -> m.assignment().topicPartitions().stream().anyMatch(tp -> tp.topic().equals(topic)));
-    boolean hasCommittedOffsets = !committedGroupOffsetsForTopic.isEmpty();
     return hasActiveMembersForTopic || hasCommittedOffsets;
   }
 
-  @Value
-  public static class ConsumerGroupsPage {
-    List<InternalConsumerGroup> consumerGroups;
-    int totalPages;
+  public record ConsumerGroupsPage(List<InternalConsumerGroup> consumerGroups, int totalPages) {
   }
 
 
   public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
       KafkaCluster cluster,
-      int page,
+      int pageNum,
       int perPage,
       @Nullable String search,
       ConsumerGroupOrderingDTO orderBy,
       SortOrderDTO sortOrderDto) {
-    var comparator = sortOrderDto.equals(SortOrderDTO.ASC)
-        ? getPaginationComparator(orderBy)
-        : getPaginationComparator(orderBy).reversed();
     return adminClientService.get(cluster).flatMap(ac ->
-        describeConsumerGroups(ac, search).flatMap(descriptions ->
-            getConsumerGroups(
-                ac,
-                descriptions.stream()
-                    .sorted(comparator)
-                    .skip((long) (page - 1) * perPage)
-                    .limit(perPage)
-                    .collect(Collectors.toList())
+        ac.listConsumerGroups()
+            .map(listing -> search == null
+                ? listing
+                : listing.stream()
+                .filter(g -> StringUtils.containsIgnoreCase(g.groupId(), search))
+                .toList()
             )
-                .flatMapMany(Flux::fromIterable)
-                .filterWhen(
-                    cg -> accessControlService.isConsumerGroupAccessible(cg.getGroupId(), cluster.getName()))
-                .collect(Collectors.toList())
-                .map(cgs -> new ConsumerGroupsPage(
-                    cgs,
-                    (descriptions.size() / perPage) + (descriptions.size() % perPage == 0 ? 0 : 1))))
-    );
+            .flatMapIterable(lst -> lst)
+            .filterWhen(cg -> accessControlService.isConsumerGroupAccessible(cg.groupId(), cluster.getName()))
+            .collectList()
+            .flatMap(allGroups ->
+                loadSortedDescriptions(ac, allGroups, pageNum, perPage, orderBy, sortOrderDto)
+                    .flatMap(descriptions -> getConsumerGroups(ac, descriptions)
+                        .map(page -> new ConsumerGroupsPage(
+                            page,
+                            (allGroups.size() / perPage) + (allGroups.size() % perPage == 0 ? 0 : 1))))));
   }
 
 
-  private Comparator<ConsumerGroupDescription> getPaginationComparator(ConsumerGroupOrderingDTO
-                                                                           orderBy) {
-    switch (orderBy) {
-      case NAME:
-        return Comparator.comparing(ConsumerGroupDescription::groupId);
-      case STATE:
-        ToIntFunction<ConsumerGroupDescription> statesPriorities = cg -> {
-          switch (cg.state()) {
-            case STABLE:
-              return 0;
-            case COMPLETING_REBALANCE:
-              return 1;
-            case PREPARING_REBALANCE:
-              return 2;
-            case EMPTY:
-              return 3;
-            case DEAD:
-              return 4;
-            case UNKNOWN:
-              return 5;
-            default:
-              return 100;
-          }
-        };
-        return Comparator.comparingInt(statesPriorities);
-      case MEMBERS:
-        return Comparator.comparingInt(cg -> cg.members().size());
-      default:
-        throw new IllegalStateException("Unsupported order by: " + orderBy);
-    }
+  private Mono<List<ConsumerGroupDescription>> loadSortedDescriptions(ReactiveAdminClient ac,
+                                                                      List<ConsumerGroupListing> groups,
+                                                                      int pageNum,
+                                                                      int perPage,
+                                                                      ConsumerGroupOrderingDTO orderBy,
+                                                                      SortOrderDTO sortOrderDto) {
+    return switch (orderBy) {
+      case NAME -> {
+        Comparator<ConsumerGroupListing> comparator = Comparator.comparing(ConsumerGroupListing::groupId);
+        yield loadDescriptionsByListings(ac, groups, comparator, pageNum, perPage, sortOrderDto);
+      }
+      case STATE -> {
+        ToIntFunction<ConsumerGroupListing> statesPriorities =
+            cg -> switch (cg.state().orElse(ConsumerGroupState.UNKNOWN)) {
+              case STABLE -> 0;
+              case COMPLETING_REBALANCE -> 1;
+              case PREPARING_REBALANCE -> 2;
+              case EMPTY -> 3;
+              case DEAD -> 4;
+              case UNKNOWN -> 5;
+            };
+        var comparator = Comparator.comparingInt(statesPriorities);
+        yield loadDescriptionsByListings(ac, groups, comparator, pageNum, perPage, sortOrderDto);
+      }
+      case MEMBERS -> {
+        var comparator = Comparator.<ConsumerGroupDescription>comparingInt(cg -> cg.members().size());
+        var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList();
+        yield ac.describeConsumerGroups(groupNames)
+            .map(descriptions ->
+                sortAndPaginate(descriptions.values(), comparator, pageNum, perPage, sortOrderDto).toList());
+      }
+    };
   }
 
 
-  private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(ReactiveAdminClient ac,
-                                                                      @Nullable String search) {
-    return ac.listConsumerGroups()
-        .map(groupIds -> groupIds
-            .stream()
-            .filter(groupId -> search == null || StringUtils.containsIgnoreCase(groupId, search))
-            .collect(Collectors.toList()))
+  private Mono<List<ConsumerGroupDescription>> loadDescriptionsByListings(ReactiveAdminClient ac,
+                                                                          List<ConsumerGroupListing> listings,
+                                                                          Comparator<ConsumerGroupListing> comparator,
+                                                                          int pageNum,
+                                                                          int perPage,
+                                                                          SortOrderDTO sortOrderDto) {
+    List<String> sortedGroups = sortAndPaginate(listings, comparator, pageNum, perPage, sortOrderDto)
+        .map(ConsumerGroupListing::groupId)
+        .toList();
+    return ac.describeConsumerGroups(sortedGroups)
+        .map(descrMap -> sortedGroups.stream().map(descrMap::get).toList());
+  }
+
+  private <T> Stream<T> sortAndPaginate(Collection<T> collection,
+                                        Comparator<T> comparator,
+                                        int pageNum,
+                                        int perPage,
+                                        SortOrderDTO sortOrderDto) {
+    return collection.stream()
+        .sorted(sortOrderDto == SortOrderDTO.ASC ? comparator : comparator.reversed())
+        .skip((long) (pageNum - 1) * perPage)
+        .limit(perPage);
+  }
+
+  private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(ReactiveAdminClient ac) {
+    return ac.listConsumerGroupNames()
         .flatMap(ac::describeConsumerGroups)
         .map(cgs -> new ArrayList<>(cgs.values()));
   }
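
Committed offsets now travel through the service as a Guava Table keyed by group and partition. The row(), columnKeySet() and containsRow() calls used above behave as in this toy sketch (data hypothetical):

    Table<String, TopicPartition, Long> offsets = ImmutableTable.<String, TopicPartition, Long>builder()
        .put("group-a", new TopicPartition("orders", 0), 42L)
        .put("group-a", new TopicPartition("orders", 1), 7L)
        .put("group-b", new TopicPartition("payments", 0), 3L)
        .build();

    offsets.row("group-a");         // {orders-0=42, orders-1=7}: one group's committed offsets
    offsets.columnKeySet();         // every partition that has a committed offset in any group
    offsets.containsRow("group-b"); // true iff the group committed anything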

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java

@@ -98,7 +98,7 @@ public class OffsetsResetService {
         .flatMap(ac ->
             // we need to call listConsumerGroups() to check group existence, because
             // describeConsumerGroups() will return consumer group even if it doesn't exist
-            ac.listConsumerGroups()
+            ac.listConsumerGroupNames()
                 .filter(cgs -> cgs.stream().anyMatch(g -> g.equals(groupId)))
                 .flatMap(cgs -> ac.describeConsumerGroups(List.of(groupId)))
                 .filter(cgs -> cgs.containsKey(groupId))

+ 83 - 36
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -4,12 +4,12 @@ import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
+import com.google.common.collect.ImmutableTable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Table;
 import com.provectus.kafka.ui.exception.IllegalEntityStateException;
 import com.provectus.kafka.ui.exception.NotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.util.MapUtil;
 import com.provectus.kafka.ui.util.NumberUtil;
 import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
 import java.io.Closeable;
@@ -18,7 +18,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -45,7 +44,7 @@ import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.admin.DescribeClusterOptions;
 import org.apache.kafka.clients.admin.DescribeClusterResult;
 import org.apache.kafka.clients.admin.DescribeConfigsOptions;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
@@ -54,7 +53,6 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -69,6 +67,7 @@ import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
@@ -183,7 +182,7 @@ public class ReactiveAdminClient implements Closeable {
         topicNames,
         200,
         part -> getTopicsConfigImpl(part, includeDocFixed),
-        (m1, m2) -> ImmutableMap.<String, List<ConfigEntry>>builder().putAll(m1).putAll(m2).build()
+        mapMerger()
     );
   }
 
 
@@ -236,7 +235,7 @@ public class ReactiveAdminClient implements Closeable {
         topics,
         200,
         this::describeTopicsImpl,
-        (m1, m2) -> ImmutableMap.<String, TopicDescription>builder().putAll(m1).putAll(m2).build()
+        mapMerger()
     );
   }
 
 
@@ -383,32 +382,57 @@ public class ReactiveAdminClient implements Closeable {
     }
   }
 
-  public Mono<List<String>> listConsumerGroups() {
-    return toMono(client.listConsumerGroups().all())
-        .map(lst -> lst.stream().map(ConsumerGroupListing::groupId).collect(toList()));
+  public Mono<List<String>> listConsumerGroupNames() {
+    return listConsumerGroups().map(lst -> lst.stream().map(ConsumerGroupListing::groupId).toList());
   }
 
-  public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
-    return toMono(client.describeConsumerGroups(groupIds).all());
+  public Mono<Collection<ConsumerGroupListing>> listConsumerGroups() {
+    return toMono(client.listConsumerGroups().all());
   }
 
-  public Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(String groupId) {
-    return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
+  public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
+    return partitionCalls(
+        groupIds,
+        25,
+        4,
+        ids -> toMono(client.describeConsumerGroups(ids).all()),
+        mapMerger()
+    );
   }
 
-  public Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(
-      String groupId, List<TopicPartition> partitions) {
-    return listConsumerGroupOffsets(groupId,
-        new ListConsumerGroupOffsetsOptions().topicPartitions(partitions));
-  }
+  // group -> partition -> offset
+  // NOTE: partitions with no committed offsets will be skipped
+  public Mono<Table<String, TopicPartition, Long>> listConsumerGroupOffsets(List<String> consumerGroups,
+                                                                            // all partitions if null passed
+                                                                            @Nullable List<TopicPartition> partitions) {
+    Function<Collection<String>, Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>>> call =
+        groups -> toMono(
+            client.listConsumerGroupOffsets(
+                groups.stream()
+                    .collect(Collectors.toMap(
+                        g -> g,
+                        g -> new ListConsumerGroupOffsetsSpec().topicPartitions(partitions)
+                    ))).all()
+        );
+
+    Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>> merged = partitionCalls(
+        consumerGroups,
+        25,
+        4,
+        call,
+        mapMerger()
+    );
 
-  private Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(
-      String groupId, ListConsumerGroupOffsetsOptions options) {
-    return toMono(client.listConsumerGroupOffsets(groupId, options).partitionsToOffsetAndMetadata())
-        .map(MapUtil::removeNullValues)
-        .map(m -> m.entrySet().stream()
-            .map(e -> Tuples.of(e.getKey(), e.getValue().offset()))
-            .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)));
+    return merged.map(map -> {
+      var table = ImmutableTable.<String, TopicPartition, Long>builder();
+      map.forEach((g, tpOffsets) -> tpOffsets.forEach((tp, offset) -> {
+        if (offset != null) {
+          // offset will be null for partitions that don't have committed offset for this group
+          table.put(g, tp, offset.offset());
+        }
+      }));
+      return table.build();
+    });
   }
 
   public Mono<Void> alterConsumerGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
@@ -501,7 +525,7 @@ public class ReactiveAdminClient implements Closeable {
         partitions,
         200,
         call,
-        (m1, m2) -> ImmutableMap.<TopicPartition, Long>builder().putAll(m1).putAll(m2).build()
+        mapMerger()
     );
   }
 
 
@@ -551,7 +575,7 @@ public class ReactiveAdminClient implements Closeable {
   }
 
   /**
-   * Splits input collection into batches, applies each batch sequentially to function
+   * Splits input collection into batches, converts each batch into Mono, sequentially subscribes to them
    * and merges output Monos into one Mono.
    */
   private static <R, I> Mono<R> partitionCalls(Collection<I> items,
@@ -561,14 +585,37 @@ public class ReactiveAdminClient implements Closeable {
     if (items.isEmpty()) {
       return call.apply(items);
     }
-    Iterator<List<I>> parts = Iterators.partition(items.iterator(), partitionSize);
-    Mono<R> mono = call.apply(parts.next());
-    while (parts.hasNext()) {
-      var nextPart = parts.next();
-      // calls will be executed sequentially
-      mono = mono.flatMap(res1 -> call.apply(nextPart).map(res2 -> merger.apply(res1, res2)));
+    Iterable<List<I>> parts = Iterables.partition(items, partitionSize);
+    return Flux.fromIterable(parts)
+        .concatMap(call)
+        .reduce(merger);
+  }
+
+  /**
+   * Splits input collection into batches, converts each batch into Mono, subscribes to them (concurrently,
+   * with specified concurrency level) and merges output Monos into one Mono.
+   */
+  private static <R, I> Mono<R> partitionCalls(Collection<I> items,
+                                               int partitionSize,
+                                               int concurrency,
+                                               Function<Collection<I>, Mono<R>> call,
+                                               BiFunction<R, R, R> merger) {
+    if (items.isEmpty()) {
+      return call.apply(items);
     }
-    return mono;
+    Iterable<List<I>> parts = Iterables.partition(items, partitionSize);
+    return Flux.fromIterable(parts)
+        .flatMap(call, concurrency)
+        .reduce(merger);
+  }
+
+  private static <K, V> BiFunction<Map<K, V>, Map<K, V>, Map<K, V>> mapMerger() {
+    return (m1, m2) -> {
+      var merged = new HashMap<K, V>();
+      merged.putAll(m1);
+      merged.putAll(m2);
+      return merged;
+    };
+  }
 
   @Override
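
The two partitionCalls overloads above differ only in the Reactor operator: concatMap subscribes to one batch Mono at a time, while flatMap(call, concurrency) keeps up to that many batch requests in flight, which is what parallelizes describeConsumerGroups (batches of 25 groups, 4 requests at a time). A minimal self-contained sketch of the difference:

    import java.util.List;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    List<List<Integer>> batches = List.of(List.of(1, 2), List.of(3, 4), List.of(5));

    // Sequential: batch N+1 is requested only after batch N completes.
    Mono<Integer> sequential = Flux.fromIterable(batches)
        .concatMap(b -> Mono.fromCallable(b::size)) // stand-in for one admin call per batch
        .reduce(Integer::sum);

    // Concurrent: up to 4 batch requests in flight at once.
    Mono<Integer> concurrent = Flux.fromIterable(batches)
        .flatMap(b -> Mono.fromCallable(b::size), 4)
        .reduce(Integer::sum);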

+ 0 - 21
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/MapUtil.java

@@ -1,21 +0,0 @@
-package com.provectus.kafka.ui.util;
-
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class MapUtil {
-
-  private MapUtil() {
-  }
-
-  public static <K, V> Map<K, V> removeNullValues(Map<K, V> map) {
-    return map.entrySet().stream()
-        .filter(e -> e.getValue() != null)
-        .collect(
-            Collectors.toMap(
-                Map.Entry::getKey,
-                Map.Entry::getValue
-            )
-        );
-  }
-}

+ 91 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ReactiveAdminClientTest.java

@@ -2,14 +2,18 @@ package com.provectus.kafka.ui.service;
 
 
 import static com.provectus.kafka.ui.service.ReactiveAdminClient.toMonoWithExceptionFilter;
 import static java.util.Objects.requireNonNull;
+import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.UUID;
+import java.util.function.Function;
 import java.util.stream.Stream;
 import lombok.SneakyThrows;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -18,12 +22,16 @@ import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.junit.function.ThrowingRunnable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -96,6 +104,14 @@ class ReactiveAdminClientTest extends AbstractIntegrationTest {
     clearings.add(() -> adminClient.deleteTopics(Stream.of(topics).map(NewTopic::name).toList()).all().get());
   }
 
+  void fillTopic(String topic, int msgsCnt) {
+    try (var producer = KafkaTestProducer.forKafka(kafka)) {
+      for (int i = 0; i < msgsCnt; i++) {
+        producer.send(topic, UUID.randomUUID().toString());
+      }
+    }
+  }
+
   @Test
   void testToMonoWithExceptionFilter() {
     var failedFuture = new KafkaFutureImpl<String>();
@@ -152,4 +168,79 @@ class ReactiveAdminClientTest extends AbstractIntegrationTest {
         .verifyComplete();
   }
 
 
+
+  @Test
+  void testListConsumerGroupOffsets() throws Exception {
+    String topic = UUID.randomUUID().toString();
+    String anotherTopic = UUID.randomUUID().toString();
+    createTopics(new NewTopic(topic, 2, (short) 1), new NewTopic(anotherTopic, 1, (short) 1));
+    fillTopic(topic, 10);
+
+    Function<String, KafkaConsumer<String, String>> consumerSupplier = groupName -> {
+      Properties p = new Properties();
+      p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+      p.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupName);
+      p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+      p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+      p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+      p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+      return new KafkaConsumer<String, String>(p);
+    };
+
+    String fullyPolledConsumer = UUID.randomUUID().toString();
+    try (KafkaConsumer<String, String> c = consumerSupplier.apply(fullyPolledConsumer)) {
+      c.subscribe(List.of(topic));
+      int polled = 0;
+      while (polled < 10) {
+        polled += c.poll(Duration.ofMillis(50)).count();
+      }
+      c.commitSync();
+    }
+
+    String polled1MsgConsumer = UUID.randomUUID().toString();
+    try (KafkaConsumer<String, String> c = consumerSupplier.apply(polled1MsgConsumer)) {
+      c.subscribe(List.of(topic));
+      c.poll(Duration.ofMillis(100));
+      c.commitSync(Map.of(tp(topic, 0), new OffsetAndMetadata(1)));
+    }
+
+    String noCommitConsumer = UUID.randomUUID().toString();
+    try (KafkaConsumer<String, String> c = consumerSupplier.apply(noCommitConsumer)) {
+      c.subscribe(List.of(topic));
+      c.poll(Duration.ofMillis(100));
+    }
+
+    Map<TopicPartition, ListOffsetsResultInfo> endOffsets = adminClient.listOffsets(Map.of(
+        tp(topic, 0), OffsetSpec.latest(),
+        tp(topic, 1), OffsetSpec.latest())).all().get();
+
+    StepVerifier.create(
+            reactiveAdminClient.listConsumerGroupOffsets(
+                List.of(fullyPolledConsumer, polled1MsgConsumer, noCommitConsumer),
+                List.of(
+                    tp(topic, 0),
+                    tp(topic, 1),
+                    tp(anotherTopic, 0))
+            )
+        ).assertNext(table -> {
+
+          assertThat(table.row(polled1MsgConsumer))
+              .containsEntry(tp(topic, 0), 1L)
+              .hasSize(1);
+
+          assertThat(table.row(noCommitConsumer))
+              .isEmpty();
+
+          assertThat(table.row(fullyPolledConsumer))
+              .containsEntry(tp(topic, 0), endOffsets.get(tp(topic, 0)).offset())
+              .containsEntry(tp(topic, 1), endOffsets.get(tp(topic, 1)).offset())
+              .hasSize(2);
+        })
+        .verifyComplete();
+  }
+
+  private static TopicPartition tp(String topic, int partition) {
+    return new TopicPartition(topic, partition);
+  }
+
 }