浏览代码

ISSUE-2080: Consumer groups not found for topic (#2628)

* Fixing consumers visibility when offsets not committed
Co-authored-by: iliax <ikuramshin@provectus.com>
Ilya Kuramshin 2 年之前
父节点
当前提交
2f786c080b

+ 15 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java

@@ -6,6 +6,7 @@ import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO;
 import com.provectus.kafka.ui.model.ConsumerGroupStateDTO;
 import com.provectus.kafka.ui.model.ConsumerGroupTopicPartitionDTO;
 import com.provectus.kafka.ui.model.InternalConsumerGroup;
+import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
@@ -24,6 +25,20 @@ public class ConsumerGroupMapper {
     return convertToConsumerGroup(c, new ConsumerGroupDTO());
   }
 
+  public static ConsumerGroupDTO toDto(InternalTopicConsumerGroup c) {
+    ConsumerGroupDTO consumerGroup = new ConsumerGroupDetailsDTO();
+    consumerGroup.setTopics(1); //for ui backward-compatibility, need to rm usage from ui
+    consumerGroup.setGroupId(c.getGroupId());
+    consumerGroup.setMembers(c.getMembers());
+    consumerGroup.setMessagesBehind(c.getMessagesBehind());
+    consumerGroup.setSimple(c.isSimple());
+    consumerGroup.setPartitionAssignor(c.getPartitionAssignor());
+    consumerGroup.setState(mapConsumerGroupState(c.getState()));
+    Optional.ofNullable(c.getCoordinator())
+        .ifPresent(cd -> consumerGroup.setCoordinator(mapCoordinator(cd)));
+    return consumerGroup;
+  }
+
   public static ConsumerGroupDetailsDTO toDetailsDto(InternalConsumerGroup g) {
     ConsumerGroupDetailsDTO details = convertToConsumerGroup(g, new ConsumerGroupDetailsDTO());
     Map<TopicPartition, ConsumerGroupTopicPartitionDTO> partitionMap = new HashMap<>();

+ 0 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java

@@ -4,7 +4,6 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import lombok.Builder;
 import lombok.Data;
@@ -62,13 +61,4 @@ public class InternalConsumerGroup {
     Optional.ofNullable(description.coordinator()).ifPresent(builder::coordinator);
     return builder.build();
   }
-
-  private InternalConsumerGroup.InternalMember filterConsumerMemberTopic(
-      InternalConsumerGroup.InternalMember member, Predicate<TopicPartition> partitionsFilter) {
-    var topicPartitions = member.getAssignment()
-        .stream()
-        .filter(partitionsFilter)
-        .collect(Collectors.toSet());
-    return member.toBuilder().assignment(topicPartitions).build();
-  }
 }

+ 61 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConsumerGroup.java

@@ -0,0 +1,61 @@
+package com.provectus.kafka.ui.model;
+
+import java.util.Map;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import lombok.Builder;
+import lombok.Value;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+
+@Value
+@Builder
+public class InternalTopicConsumerGroup {
+
+  String groupId;
+  int members;
+  @Nullable
+  Long messagesBehind; //null means no committed offsets found for this group
+  boolean isSimple;
+  String partitionAssignor;
+  ConsumerGroupState state;
+  @Nullable
+  Node coordinator;
+
+  public static InternalTopicConsumerGroup create(
+      String topic,
+      ConsumerGroupDescription g,
+      Map<TopicPartition, Long> committedOffsets,
+      Map<TopicPartition, Long> endOffsets) {
+    return InternalTopicConsumerGroup.builder()
+        .groupId(g.groupId())
+        .members(
+            (int) g.members().stream()
+                // counting only members with target topic assignment
+                .filter(m -> m.assignment().topicPartitions().stream().anyMatch(p -> p.topic().equals(topic)))
+                .count()
+        )
+        .messagesBehind(calculateMessagesBehind(committedOffsets, endOffsets))
+        .isSimple(g.isSimpleConsumerGroup())
+        .partitionAssignor(g.partitionAssignor())
+        .state(g.state())
+        .coordinator(g.coordinator())
+        .build();
+  }
+
+  @Nullable
+  private static Long calculateMessagesBehind(Map<TopicPartition, Long> committedOffsets,
+                                              Map<TopicPartition, Long> endOffsets) {
+    if (committedOffsets.isEmpty()) {
+      return null;
+    }
+    return committedOffsets.entrySet().stream()
+        .mapToLong(e ->
+            Optional.ofNullable(endOffsets.get(e.getKey()))
+                .map(o -> o - e.getValue())
+                .orElse(0L)
+        ).sum();
+  }
+}

+ 21 - 20
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service;
 
 import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO;
 import com.provectus.kafka.ui.model.InternalConsumerGroup;
+import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.SortOrderDTO;
 import java.util.ArrayList;
@@ -30,7 +31,6 @@ import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
 
-
 @Service
 @RequiredArgsConstructor
 public class ConsumerGroupService {
@@ -71,37 +71,38 @@ public class ConsumerGroupService {
             .flatMap(descriptions -> getConsumerGroups(ac, descriptions)));
   }
 
-  public Mono<List<InternalConsumerGroup>> getConsumerGroupsForTopic(KafkaCluster cluster,
-                                                                     String topic) {
+  public Mono<List<InternalTopicConsumerGroup>> getConsumerGroupsForTopic(KafkaCluster cluster,
+                                                                          String topic) {
     return adminClientService.get(cluster)
         // 1. getting topic's end offsets
         .flatMap(ac -> ac.listOffsets(topic, OffsetSpec.latest())
             .flatMap(endOffsets -> {
               var tps = new ArrayList<>(endOffsets.keySet());
               // 2. getting all consumer groups
-              return ac.listConsumerGroups()
-                  .flatMap((List<String> groups) ->
+              return describeConsumerGroups(ac, null)
+                  .flatMap((List<ConsumerGroupDescription> groups) ->
                       Flux.fromIterable(groups)
                           // 3. for each group trying to find committed offsets for topic
                           .flatMap(g ->
-                              ac.listConsumerGroupOffsets(g, tps)
-                                  .map(offsets -> Tuples.of(g, offsets)))
-                          .filter(t -> !t.getT2().isEmpty())
-                          .collectMap(Tuple2::getT1, Tuple2::getT2)
-                  )
-                  .flatMap((Map<String, Map<TopicPartition, Long>> groupOffsets) ->
-                      // 4. getting description for groups with non-emtpy offsets
-                      ac.describeConsumerGroups(groupOffsets.keySet())
-                          .map((Map<String, ConsumerGroupDescription> descriptions) ->
-                              descriptions.values().stream().map(desc ->
-                                      // 5. gathering into InternalConsumerGroup
-                                      InternalConsumerGroup.create(
-                                              desc, groupOffsets.get(desc.groupId()), endOffsets)
-                                  )
-                                  .collect(Collectors.toList())));
+                              ac.listConsumerGroupOffsets(g.groupId(), tps)
+                                  // 4. keeping only groups that relates to topic
+                                  .filter(offsets -> isConsumerGroupRelatesToTopic(topic, g, offsets))
+                                  // 5. constructing results
+                                  .map(offsets -> InternalTopicConsumerGroup.create(topic, g, offsets, endOffsets))
+                          ).collectList());
             }));
   }
 
+  private boolean isConsumerGroupRelatesToTopic(String topic,
+                                                ConsumerGroupDescription description,
+                                                Map<TopicPartition, Long> committedGroupOffsetsForTopic) {
+    boolean hasActiveMembersForTopic = description.members()
+        .stream()
+        .anyMatch(m -> m.assignment().topicPartitions().stream().anyMatch(tp -> tp.topic().equals(topic)));
+    boolean hasCommittedOffsets = !committedGroupOffsetsForTopic.isEmpty();
+    return hasActiveMembersForTopic || hasCommittedOffsets;
+  }
+
   @Value
   public static class ConsumerGroupsPage {
     List<InternalConsumerGroup> consumerGroups;

+ 2 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -2329,6 +2329,7 @@ components:
         messagesBehind:
           type: integer
           format: int64
+          description: null if consumer group has no offsets committed
       required:
         - groupId
 
@@ -2542,6 +2543,7 @@ components:
         messagesBehind:
           type: integer
           format: int64
+          description: null if consumer group has no offsets committed
         consumerId:
           type: string
         host: