Pārlūkot izejas kodu

FE: Allow sorting consumer groups by topic num (#3633)

Signed-off-by: nisanohana3 <nisana230@gmail.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Nisan Ohana 2 gadi atpakaļ
vecāks
revīzija
734d4ccdf7

+ 1 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java

@@ -11,8 +11,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 
@@ -82,15 +80,8 @@ public class ConsumerGroupMapper {
       InternalConsumerGroup c, T consumerGroup) {
     consumerGroup.setGroupId(c.getGroupId());
     consumerGroup.setMembers(c.getMembers().size());
-
-    int numTopics = Stream.concat(
-        c.getOffsets().keySet().stream().map(TopicPartition::topic),
-        c.getMembers().stream()
-            .flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic))
-    ).collect(Collectors.toSet()).size();
-
     consumerGroup.setMessagesBehind(c.getMessagesBehind());
-    consumerGroup.setTopics(numTopics);
+    consumerGroup.setTopics(c.getTopicNum());
     consumerGroup.setSimple(c.isSimple());
 
     Optional.ofNullable(c.getState())

+ 32 - 13
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java

@@ -5,6 +5,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import lombok.Builder;
 import lombok.Data;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
@@ -21,6 +22,7 @@ public class InternalConsumerGroup {
   private final Map<TopicPartition, Long> offsets;
   private final Map<TopicPartition, Long> endOffsets;
   private final Long messagesBehind;
+  private final Integer topicNum;
   private final String partitionAssignor;
   private final ConsumerGroupState state;
   private final Node coordinator;
@@ -44,22 +46,12 @@ public class InternalConsumerGroup {
     builder.simple(description.isSimpleConsumerGroup());
     builder.state(description.state());
     builder.partitionAssignor(description.partitionAssignor());
-    builder.members(
-        description.members().stream()
-            .map(m ->
-                InternalConsumerGroup.InternalMember.builder()
-                    .assignment(m.assignment().topicPartitions())
-                    .clientId(m.clientId())
-                    .groupInstanceId(m.groupInstanceId().orElse(""))
-                    .consumerId(m.consumerId())
-                    .clientId(m.clientId())
-                    .host(m.host())
-                    .build()
-            ).collect(Collectors.toList())
-    );
+    Collection<InternalMember> internalMembers = initInternalMembers(description);
+    builder.members(internalMembers);
     builder.offsets(groupOffsets);
     builder.endOffsets(topicEndOffsets);
     builder.messagesBehind(calculateMessagesBehind(groupOffsets, topicEndOffsets));
+    builder.topicNum(calculateTopicNum(groupOffsets, internalMembers));
     Optional.ofNullable(description.coordinator()).ifPresent(builder::coordinator);
     return builder.build();
   }
@@ -80,4 +72,31 @@ public class InternalConsumerGroup {
     return messagesBehind;
   }
 
+  private static Integer calculateTopicNum(Map<TopicPartition, Long> offsets, Collection<InternalMember> members) {
+
+    long topicNum = Stream.concat(
+        offsets.keySet().stream().map(TopicPartition::topic),
+        members.stream()
+            .flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic))
+    ).distinct().count();
+
+    return Integer.valueOf((int) topicNum);
+
+  }
+
+  private static Collection<InternalMember> initInternalMembers(ConsumerGroupDescription description) {
+    return description.members().stream()
+        .map(m ->
+            InternalConsumerGroup.InternalMember.builder()
+                .assignment(m.assignment().topicPartitions())
+                .clientId(m.clientId())
+                .groupInstanceId(m.groupInstanceId().orElse(""))
+                .consumerId(m.consumerId())
+                .clientId(m.clientId())
+                .host(m.host())
+                .build()
+        ).collect(Collectors.toList());
+  }
+
+
 }

+ 32 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java

@@ -101,6 +101,9 @@ public class ConsumerGroupService {
   public record ConsumerGroupsPage(List<InternalConsumerGroup> consumerGroups, int totalPages) {
   }
 
+  private record GroupWithDescr(InternalConsumerGroup icg, ConsumerGroupDescription cgd) {
+  }
+
   public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
       KafkaCluster cluster,
       int pageNum,
@@ -159,22 +162,19 @@ public class ConsumerGroupService {
                 sortAndPaginate(descriptions.values(), comparator, pageNum, perPage, sortOrderDto).toList());
       }
       case MESSAGES_BEHIND -> {
-        record GroupWithDescr(InternalConsumerGroup icg, ConsumerGroupDescription cgd) { }
 
         Comparator<GroupWithDescr> comparator = Comparator.comparingLong(gwd ->
             gwd.icg.getMessagesBehind() == null ? 0L : gwd.icg.getMessagesBehind());
 
-        var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList();
+        yield loadDescriptionsByInternalConsumerGroups(ac, groups, comparator, pageNum, perPage, sortOrderDto);
+      }
+
+      case TOPIC_NUM -> {
+
+        Comparator<GroupWithDescr> comparator = Comparator.comparingInt(gwd -> gwd.icg.getTopicNum());
+
+        yield loadDescriptionsByInternalConsumerGroups(ac, groups, comparator, pageNum, perPage, sortOrderDto);
 
-        yield ac.describeConsumerGroups(groupNames)
-            .flatMap(descriptionsMap -> {
-                  List<ConsumerGroupDescription> descriptions = descriptionsMap.values().stream().toList();
-                  return getConsumerGroups(ac, descriptions)
-                      .map(icg -> Streams.zip(icg.stream(), descriptions.stream(), GroupWithDescr::new).toList())
-                      .map(gwd -> sortAndPaginate(gwd, comparator, pageNum, perPage, sortOrderDto)
-                            .map(GroupWithDescr::cgd).toList());
-                }
-            );
       }
     };
   }
@@ -209,6 +209,27 @@ public class ConsumerGroupService {
         .map(cgs -> new ArrayList<>(cgs.values()));
   }
 
+
+  private Mono<List<ConsumerGroupDescription>> loadDescriptionsByInternalConsumerGroups(ReactiveAdminClient ac,
+                                                                                  List<ConsumerGroupListing> groups,
+                                                                                  Comparator<GroupWithDescr> comparator,
+                                                                                  int pageNum,
+                                                                                  int perPage,
+                                                                                  SortOrderDTO sortOrderDto) {
+    var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList();
+
+    return ac.describeConsumerGroups(groupNames)
+        .flatMap(descriptionsMap -> {
+              List<ConsumerGroupDescription> descriptions = descriptionsMap.values().stream().toList();
+              return getConsumerGroups(ac, descriptions)
+                  .map(icg -> Streams.zip(icg.stream(), descriptions.stream(), GroupWithDescr::new).toList())
+                  .map(gwd -> sortAndPaginate(gwd, comparator, pageNum, perPage, sortOrderDto)
+                      .map(GroupWithDescr::cgd).toList());
+            }
+        );
+
+  }
+
   public Mono<InternalConsumerGroup> getConsumerGroupDetail(KafkaCluster cluster,
                                                             String consumerGroupId) {
     return adminClientService.get(cluster)

+ 1 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -2441,6 +2441,7 @@ components:
         - MEMBERS
         - STATE
         - MESSAGES_BEHIND
+        - TOPIC_NUM
 
     ConsumerGroupsPageResponse:
       type: object

+ 1 - 1
kafka-ui-react-app/src/components/ConsumerGroups/List.tsx

@@ -51,9 +51,9 @@ const List = () => {
         accessorKey: 'members',
       },
       {
+        id: ConsumerGroupOrdering.TOPIC_NUM,
         header: 'Num Of Topics',
         accessorKey: 'topics',
-        enableSorting: false,
       },
       {
         id: ConsumerGroupOrdering.MESSAGES_BEHIND,