diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java index d04484b381..75c0a99039 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java @@ -6,6 +6,7 @@ import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO; import com.provectus.kafka.ui.model.ConsumerGroupStateDTO; import com.provectus.kafka.ui.model.ConsumerGroupTopicPartitionDTO; import com.provectus.kafka.ui.model.InternalConsumerGroup; +import com.provectus.kafka.ui.model.InternalTopicConsumerGroup; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; @@ -24,6 +25,20 @@ public class ConsumerGroupMapper { return convertToConsumerGroup(c, new ConsumerGroupDTO()); } + public static ConsumerGroupDTO toDto(InternalTopicConsumerGroup c) { + ConsumerGroupDTO consumerGroup = new ConsumerGroupDetailsDTO(); + consumerGroup.setTopics(1); //for ui backward-compatibility, need to rm usage from ui + consumerGroup.setGroupId(c.getGroupId()); + consumerGroup.setMembers(c.getMembers()); + consumerGroup.setMessagesBehind(c.getMessagesBehind()); + consumerGroup.setSimple(c.isSimple()); + consumerGroup.setPartitionAssignor(c.getPartitionAssignor()); + consumerGroup.setState(mapConsumerGroupState(c.getState())); + Optional.ofNullable(c.getCoordinator()) + .ifPresent(cd -> consumerGroup.setCoordinator(mapCoordinator(cd))); + return consumerGroup; + } + public static ConsumerGroupDetailsDTO toDetailsDto(InternalConsumerGroup g) { ConsumerGroupDetailsDTO details = convertToConsumerGroup(g, new ConsumerGroupDetailsDTO()); Map partitionMap = new HashMap<>(); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java index ab5b6eed38..d7b3a732f1 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java @@ -4,7 +4,6 @@ import java.util.Collection; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.function.Predicate; import java.util.stream.Collectors; import lombok.Builder; import lombok.Data; @@ -62,13 +61,4 @@ public class InternalConsumerGroup { Optional.ofNullable(description.coordinator()).ifPresent(builder::coordinator); return builder.build(); } - - private InternalConsumerGroup.InternalMember filterConsumerMemberTopic( - InternalConsumerGroup.InternalMember member, Predicate partitionsFilter) { - var topicPartitions = member.getAssignment() - .stream() - .filter(partitionsFilter) - .collect(Collectors.toSet()); - return member.toBuilder().assignment(topicPartitions).build(); - } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConsumerGroup.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConsumerGroup.java new file mode 100644 index 0000000000..82e455ac58 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConsumerGroup.java @@ -0,0 +1,61 @@ +package com.provectus.kafka.ui.model; + +import java.util.Map; +import java.util.Optional; +import javax.annotation.Nullable; +import lombok.Builder; +import lombok.Value; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; + +@Value +@Builder +public class InternalTopicConsumerGroup { + + String groupId; + int members; + @Nullable + Long messagesBehind; //null means no committed offsets found for this group + boolean isSimple; + String partitionAssignor; + ConsumerGroupState state; + @Nullable + Node coordinator; + + public static InternalTopicConsumerGroup create( + String topic, + ConsumerGroupDescription g, + Map committedOffsets, + Map endOffsets) { + return InternalTopicConsumerGroup.builder() + .groupId(g.groupId()) + .members( + (int) g.members().stream() + // counting only members with target topic assignment + .filter(m -> m.assignment().topicPartitions().stream().anyMatch(p -> p.topic().equals(topic))) + .count() + ) + .messagesBehind(calculateMessagesBehind(committedOffsets, endOffsets)) + .isSimple(g.isSimpleConsumerGroup()) + .partitionAssignor(g.partitionAssignor()) + .state(g.state()) + .coordinator(g.coordinator()) + .build(); + } + + @Nullable + private static Long calculateMessagesBehind(Map committedOffsets, + Map endOffsets) { + if (committedOffsets.isEmpty()) { + return null; + } + return committedOffsets.entrySet().stream() + .mapToLong(e -> + Optional.ofNullable(endOffsets.get(e.getKey())) + .map(o -> o - e.getValue()) + .orElse(0L) + ).sum(); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java index 81e4e763e5..a82224ed27 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java @@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service; import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO; import com.provectus.kafka.ui.model.InternalConsumerGroup; +import com.provectus.kafka.ui.model.InternalTopicConsumerGroup; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.SortOrderDTO; import java.util.ArrayList; @@ -30,7 +31,6 @@ import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; - @Service @RequiredArgsConstructor public class ConsumerGroupService { @@ -71,37 +71,38 @@ public class ConsumerGroupService { .flatMap(descriptions -> getConsumerGroups(ac, descriptions))); } - public Mono> getConsumerGroupsForTopic(KafkaCluster cluster, - String topic) { + public Mono> getConsumerGroupsForTopic(KafkaCluster cluster, + String topic) { return adminClientService.get(cluster) // 1. getting topic's end offsets .flatMap(ac -> ac.listOffsets(topic, OffsetSpec.latest()) .flatMap(endOffsets -> { var tps = new ArrayList<>(endOffsets.keySet()); // 2. getting all consumer groups - return ac.listConsumerGroups() - .flatMap((List groups) -> + return describeConsumerGroups(ac, null) + .flatMap((List groups) -> Flux.fromIterable(groups) // 3. for each group trying to find committed offsets for topic .flatMap(g -> - ac.listConsumerGroupOffsets(g, tps) - .map(offsets -> Tuples.of(g, offsets))) - .filter(t -> !t.getT2().isEmpty()) - .collectMap(Tuple2::getT1, Tuple2::getT2) - ) - .flatMap((Map> groupOffsets) -> - // 4. getting description for groups with non-emtpy offsets - ac.describeConsumerGroups(groupOffsets.keySet()) - .map((Map descriptions) -> - descriptions.values().stream().map(desc -> - // 5. gathering into InternalConsumerGroup - InternalConsumerGroup.create( - desc, groupOffsets.get(desc.groupId()), endOffsets) - ) - .collect(Collectors.toList()))); + ac.listConsumerGroupOffsets(g.groupId(), tps) + // 4. keeping only groups that relates to topic + .filter(offsets -> isConsumerGroupRelatesToTopic(topic, g, offsets)) + // 5. constructing results + .map(offsets -> InternalTopicConsumerGroup.create(topic, g, offsets, endOffsets)) + ).collectList()); })); } + private boolean isConsumerGroupRelatesToTopic(String topic, + ConsumerGroupDescription description, + Map committedGroupOffsetsForTopic) { + boolean hasActiveMembersForTopic = description.members() + .stream() + .anyMatch(m -> m.assignment().topicPartitions().stream().anyMatch(tp -> tp.topic().equals(topic))); + boolean hasCommittedOffsets = !committedGroupOffsetsForTopic.isEmpty(); + return hasActiveMembersForTopic || hasCommittedOffsets; + } + @Value public static class ConsumerGroupsPage { List consumerGroups; diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 0ac9e6c46e..e06c453781 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -2329,6 +2329,7 @@ components: messagesBehind: type: integer format: int64 + description: null if consumer group has no offsets committed required: - groupId @@ -2542,6 +2543,7 @@ components: messagesBehind: type: integer format: int64 + description: null if consumer group has no offsets committed consumerId: type: string host: