From 6828a412427534305fdd1b0d501821e2cf8b1c5e Mon Sep 17 00:00:00 2001 From: Ilya Kuramshin Date: Mon, 17 Jan 2022 13:58:33 +0300 Subject: [PATCH] Consumer groups pagination (Backend) (#1318) * consumer-groups endpoint with pagination added * consumer group dto mappings moved to controller * consumer groups loading logic refactored --- .../controller/ConsumerGroupsController.java | 46 +++- .../kafka/ui/mapper/ConsumerGroupMapper.java | 112 +++++++++ .../kafka/ui/model/InternalConsumerGroup.java | 64 ++++- .../ui/service/ConsumerGroupService.java | 218 +++++++++++------- .../kafka/ui/service/ReactiveAdminClient.java | 21 +- .../provectus/kafka/ui/util/ClusterUtil.java | 170 -------------- .../kafka/ui/KafkaConsumerGroupTests.java | 61 ++++- .../main/resources/swagger/kafka-ui-api.yaml | 57 +++++ 8 files changed, 490 insertions(+), 259 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java index 2fd2321655..bca15d2203 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java @@ -4,16 +4,21 @@ import static java.util.stream.Collectors.toMap; import com.provectus.kafka.ui.api.ConsumerGroupsApi; import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.mapper.ConsumerGroupMapper; import com.provectus.kafka.ui.model.ConsumerGroupDTO; import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO; import com.provectus.kafka.ui.model.ConsumerGroupOffsetsResetDTO; +import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO; +import com.provectus.kafka.ui.model.ConsumerGroupsPageResponseDTO; import com.provectus.kafka.ui.model.PartitionOffsetDTO; import com.provectus.kafka.ui.service.ConsumerGroupService; import com.provectus.kafka.ui.service.OffsetsResetService; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.http.ResponseEntity; import org.springframework.util.CollectionUtils; import org.springframework.web.bind.annotation.RestController; @@ -29,17 +34,21 @@ public class ConsumerGroupsController extends AbstractController implements Cons private final ConsumerGroupService consumerGroupService; private final OffsetsResetService offsetsResetService; + @Value("${consumer.groups.page.size:25}") + private int defaultConsumerGroupsPageSize; + @Override public Mono> deleteConsumerGroup(String clusterName, String id, ServerWebExchange exchange) { return consumerGroupService.deleteConsumerGroupById(getCluster(clusterName), id) - .map(ResponseEntity::ok); + .thenReturn(ResponseEntity.ok().build()); } @Override public Mono> getConsumerGroup( String clusterName, String consumerGroupId, ServerWebExchange exchange) { return consumerGroupService.getConsumerGroupDetail(getCluster(clusterName), consumerGroupId) + .map(ConsumerGroupMapper::toDetailsDto) .map(ResponseEntity::ok); } @@ -47,8 +56,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons @Override public Mono>> getConsumerGroups(String clusterName, ServerWebExchange exchange) { - return consumerGroupService.getConsumerGroups(getCluster(clusterName)) + return consumerGroupService.getAllConsumerGroups(getCluster(clusterName)) .map(Flux::fromIterable) + .map(f -> f.map(ConsumerGroupMapper::toDto)) .map(ResponseEntity::ok) .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); } @@ -56,13 +66,41 @@ public class ConsumerGroupsController extends AbstractController implements Cons @Override public Mono>> getTopicConsumerGroups( String clusterName, String topicName, ServerWebExchange exchange) { - return consumerGroupService.getConsumerGroups( - getCluster(clusterName), Optional.of(topicName)) + return consumerGroupService.getConsumerGroupsForTopic(getCluster(clusterName), topicName) .map(Flux::fromIterable) + .map(f -> f.map(ConsumerGroupMapper::toDto)) .map(ResponseEntity::ok) .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); } + @Override + public Mono> getConsumerGroupsPage( + String clusterName, + Integer page, + Integer perPage, + String search, + ConsumerGroupOrderingDTO orderBy, + ServerWebExchange exchange) { + return consumerGroupService.getConsumerGroupsPage( + getCluster(clusterName), + Optional.ofNullable(page).filter(i -> i > 0).orElse(1), + Optional.ofNullable(perPage).filter(i -> i > 0).orElse(defaultConsumerGroupsPageSize), + search, + Optional.ofNullable(orderBy).orElse(ConsumerGroupOrderingDTO.NAME) + ) + .map(this::convertPage) + .map(ResponseEntity::ok); + } + + private ConsumerGroupsPageResponseDTO convertPage(ConsumerGroupService.ConsumerGroupsPage + consumerGroupConsumerGroupsPage) { + return new ConsumerGroupsPageResponseDTO() + .pageCount(consumerGroupConsumerGroupsPage.getTotalPages()) + .consumerGroups(consumerGroupConsumerGroupsPage.getConsumerGroups() + .stream() + .map(ConsumerGroupMapper::toDto) + .collect(Collectors.toList())); + } @Override public Mono> resetConsumerGroupOffsets(String clusterName, String group, diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java new file mode 100644 index 0000000000..b821d920cf --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ConsumerGroupMapper.java @@ -0,0 +1,112 @@ +package com.provectus.kafka.ui.mapper; + +import com.provectus.kafka.ui.model.BrokerDTO; +import com.provectus.kafka.ui.model.ConsumerGroupDTO; +import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO; +import com.provectus.kafka.ui.model.ConsumerGroupStateDTO; +import com.provectus.kafka.ui.model.ConsumerGroupTopicPartitionDTO; +import com.provectus.kafka.ui.model.InternalConsumerGroup; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; + +public class ConsumerGroupMapper { + + public static ConsumerGroupDTO toDto(InternalConsumerGroup c) { + return convertToConsumerGroup(c, new ConsumerGroupDTO()); + } + + public static ConsumerGroupDetailsDTO toDetailsDto(InternalConsumerGroup g) { + ConsumerGroupDetailsDTO details = convertToConsumerGroup(g, new ConsumerGroupDetailsDTO()); + Map partitionMap = new HashMap<>(); + + for (Map.Entry entry : g.getOffsets().entrySet()) { + ConsumerGroupTopicPartitionDTO partition = new ConsumerGroupTopicPartitionDTO(); + partition.setTopic(entry.getKey().topic()); + partition.setPartition(entry.getKey().partition()); + partition.setCurrentOffset(entry.getValue()); + + final Optional endOffset = Optional.ofNullable(g.getEndOffsets()) + .map(o -> o.get(entry.getKey())); + + final Long behind = endOffset.map(o -> o - entry.getValue()) + .orElse(0L); + + partition.setEndOffset(endOffset.orElse(0L)); + partition.setMessagesBehind(behind); + + partitionMap.put(entry.getKey(), partition); + } + + for (InternalConsumerGroup.InternalMember member : g.getMembers()) { + for (TopicPartition topicPartition : member.getAssignment()) { + final ConsumerGroupTopicPartitionDTO partition = partitionMap.computeIfAbsent( + topicPartition, + (tp) -> new ConsumerGroupTopicPartitionDTO() + .topic(tp.topic()) + .partition(tp.partition()) + ); + partition.setHost(member.getHost()); + partition.setConsumerId(member.getConsumerId()); + partitionMap.put(topicPartition, partition); + } + } + details.setPartitions(new ArrayList<>(partitionMap.values())); + return details; + } + + private static T convertToConsumerGroup( + InternalConsumerGroup c, T consumerGroup) { + consumerGroup.setGroupId(c.getGroupId()); + consumerGroup.setMembers(c.getMembers().size()); + + int numTopics = Stream.concat( + c.getOffsets().keySet().stream().map(TopicPartition::topic), + c.getMembers().stream() + .flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic)) + ).collect(Collectors.toSet()).size(); + + long messagesBehind = c.getOffsets().entrySet().stream() + .mapToLong(e -> + Optional.ofNullable(c.getEndOffsets()) + .map(o -> o.get(e.getKey())) + .map(o -> o - e.getValue()) + .orElse(0L) + ).sum(); + + consumerGroup.setMessagesBehind(messagesBehind); + consumerGroup.setTopics(numTopics); + consumerGroup.setSimple(c.isSimple()); + + Optional.ofNullable(c.getState()) + .ifPresent(s -> consumerGroup.setState(mapConsumerGroupState(s))); + Optional.ofNullable(c.getCoordinator()) + .ifPresent(cd -> consumerGroup.setCoordinator(mapCoordinator(cd))); + + consumerGroup.setPartitionAssignor(c.getPartitionAssignor()); + return consumerGroup; + } + + private static BrokerDTO mapCoordinator(Node node) { + return new BrokerDTO().host(node.host()).id(node.id()); + } + + private static ConsumerGroupStateDTO mapConsumerGroupState( + org.apache.kafka.common.ConsumerGroupState state) { + switch (state) { + case DEAD: return ConsumerGroupStateDTO.DEAD; + case EMPTY: return ConsumerGroupStateDTO.EMPTY; + case STABLE: return ConsumerGroupStateDTO.STABLE; + case PREPARING_REBALANCE: return ConsumerGroupStateDTO.PREPARING_REBALANCE; + case COMPLETING_REBALANCE: return ConsumerGroupStateDTO.COMPLETING_REBALANCE; + default: return ConsumerGroupStateDTO.UNKNOWN; + } + } + + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java index 0cc7c54671..2a16bea48a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java @@ -2,10 +2,13 @@ package com.provectus.kafka.ui.model; import java.util.Collection; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; import lombok.Builder; import lombok.Data; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -16,7 +19,7 @@ public class InternalConsumerGroup { private final String groupId; private final boolean simple; private final Collection members; - private final Map offsets; + private final Map offsets; private final Map endOffsets; private final String partitionAssignor; private final ConsumerGroupState state; @@ -31,4 +34,61 @@ public class InternalConsumerGroup { private final String host; private final Set assignment; } + + public static InternalConsumerGroup create( + ConsumerGroupDescription description, + Map groupOffsets, + Map topicEndOffsets) { + var builder = InternalConsumerGroup.builder(); + builder.groupId(description.groupId()); + builder.simple(description.isSimpleConsumerGroup()); + builder.state(description.state()); + builder.partitionAssignor(description.partitionAssignor()); + builder.members( + description.members().stream() + .map(m -> + InternalConsumerGroup.InternalMember.builder() + .assignment(m.assignment().topicPartitions()) + .clientId(m.clientId()) + .groupInstanceId(m.groupInstanceId().orElse("")) + .consumerId(m.consumerId()) + .clientId(m.clientId()) + .host(m.host()) + .build() + ).collect(Collectors.toList()) + ); + builder.offsets(groupOffsets); + builder.endOffsets(topicEndOffsets); + Optional.ofNullable(description.coordinator()).ifPresent(builder::coordinator); + return builder.build(); + } + + // removes data for all partitions that are not fit filter + public InternalConsumerGroup retainDataForPartitions(Predicate partitionsFilter) { + var offsets = getOffsets().entrySet().stream() + .filter(e -> partitionsFilter.test(e.getKey())) + .collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue + )); + + var members = getMembers().stream() + .map(m -> filterConsumerMemberTopic(m, partitionsFilter)) + .filter(m -> !m.getAssignment().isEmpty()) + .collect(Collectors.toList()); + + return toBuilder() + .offsets(offsets) + .members(members) + .build(); + } + + private InternalConsumerGroup.InternalMember filterConsumerMemberTopic( + InternalConsumerGroup.InternalMember member, Predicate partitionsFilter) { + var topicPartitions = member.getAssignment() + .stream() + .filter(partitionsFilter) + .collect(Collectors.toSet()); + return member.toBuilder().assignment(topicPartitions).build(); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java index 009df43f05..a0e67950a2 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java @@ -1,21 +1,22 @@ package com.provectus.kafka.ui.service; -import com.provectus.kafka.ui.model.ConsumerGroupDTO; -import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO; +import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO; import com.provectus.kafka.ui.model.InternalConsumerGroup; import com.provectus.kafka.ui.model.KafkaCluster; -import com.provectus.kafka.ui.util.ClusterUtil; -import java.util.Collection; -import java.util.Collections; +import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Properties; -import java.util.Set; import java.util.UUID; +import java.util.function.Function; import java.util.stream.Collectors; +import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; +import lombok.Value; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -25,6 +26,8 @@ import org.apache.kafka.common.utils.Bytes; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; @Service @@ -33,88 +36,145 @@ public class ConsumerGroupService { private final AdminClientService adminClientService; - private Mono> getConsumerGroupsInternal(KafkaCluster cluster) { - return adminClientService.get(cluster).flatMap(ac -> - ac.listConsumerGroups() - .flatMap(groupIds -> getConsumerGroupsInternal(cluster, groupIds))); + private Mono> getConsumerGroups( + ReactiveAdminClient ac, + List descriptions) { + return Flux.fromIterable(descriptions) + // 1. getting committed offsets for all groups + .flatMap(desc -> ac.listConsumerGroupOffsets(desc.groupId()) + .map(offsets -> Tuples.of(desc, offsets))) + .collectMap(Tuple2::getT1, Tuple2::getT2) + .flatMap((Map> groupOffsetsMap) -> { + var tpsFromGroupOffsets = groupOffsetsMap.values().stream() + .flatMap(v -> v.keySet().stream()) + .collect(Collectors.toSet()); + // 2. getting end offsets for partitions with in committed offsets + return ac.listOffsets(tpsFromGroupOffsets, OffsetSpec.latest()) + .map(endOffsets -> + descriptions.stream() + .map(desc -> { + var groupOffsets = groupOffsetsMap.get(desc); + var endOffsetsForGroup = new HashMap<>(endOffsets); + endOffsetsForGroup.keySet().retainAll(groupOffsets.keySet()); + // 3. gathering description & offsets + return InternalConsumerGroup.create(desc, groupOffsets, endOffsetsForGroup); + }) + .collect(Collectors.toList())); + }); } - private Mono> getConsumerGroupsInternal(KafkaCluster cluster, - List groupIds) { - return adminClientService.get(cluster).flatMap(ac -> - ac.describeConsumerGroups(groupIds) - .map(Map::values) - .flatMap(descriptions -> - Flux.fromIterable(descriptions) - .parallel() - .flatMap(d -> - ac.listConsumerGroupOffsets(d.groupId()) - .map(offsets -> ClusterUtil.convertToInternalConsumerGroup(d, offsets)) - ) - .sequential() - .collectList())); + @Deprecated // need to migrate to pagination + public Mono> getAllConsumerGroups(KafkaCluster cluster) { + return adminClientService.get(cluster) + .flatMap(ac -> describeConsumerGroups(ac, null) + .flatMap(descriptions -> getConsumerGroups(ac, descriptions))); } - public Mono> getConsumerGroups( - KafkaCluster cluster, Optional topic, List groupIds) { - final Mono> consumerGroups; + public Mono> getConsumerGroupsForTopic(KafkaCluster cluster, + String topic) { + return adminClientService.get(cluster) + // 1. getting topic's end offsets + .flatMap(ac -> ac.listOffsets(topic, OffsetSpec.latest()) + .flatMap(endOffsets -> { + var tps = new ArrayList<>(endOffsets.keySet()); + // 2. getting all consumer groups + return ac.listConsumerGroups() + .flatMap((List groups) -> + Flux.fromIterable(groups) + // 3. for each group trying to find committed offsets for topic + .flatMap(g -> + ac.listConsumerGroupOffsets(g, tps) + .map(offsets -> Tuples.of(g, offsets))) + .filter(t -> !t.getT2().isEmpty()) + .collectMap(Tuple2::getT1, Tuple2::getT2) + ) + .flatMap((Map> groupOffsets) -> + // 4. getting description for groups with non-emtpy offsets + ac.describeConsumerGroups(new ArrayList<>(groupOffsets.keySet())) + .map((Map descriptions) -> + descriptions.values().stream().map(desc -> + // 5. gathering and filter non-target-topic data + InternalConsumerGroup.create( + desc, groupOffsets.get(desc.groupId()), endOffsets) + .retainDataForPartitions(p -> p.topic().equals(topic)) + ) + .collect(Collectors.toList()))); + })); + } - if (groupIds.isEmpty()) { - consumerGroups = getConsumerGroupsInternal(cluster); - } else { - consumerGroups = getConsumerGroupsInternal(cluster, groupIds); + @Value + public static class ConsumerGroupsPage { + List consumerGroups; + int totalPages; + } + + public Mono getConsumerGroupsPage( + KafkaCluster cluster, + int page, + int perPage, + @Nullable String search, + ConsumerGroupOrderingDTO orderBy) { + return adminClientService.get(cluster).flatMap(ac -> + describeConsumerGroups(ac, search).flatMap(descriptions -> + getConsumerGroups( + ac, + descriptions.stream() + .sorted(getPaginationComparator(orderBy)) + .skip((long) (page - 1) * perPage) + .limit(perPage) + .collect(Collectors.toList()) + ).map(cgs -> new ConsumerGroupsPage( + cgs, + (descriptions.size() / perPage) + (descriptions.size() % perPage == 0 ? 0 : 1)))) + ); + } + + private Comparator getPaginationComparator(ConsumerGroupOrderingDTO + orderBy) { + switch (orderBy) { + case NAME: + return Comparator.comparing(ConsumerGroupDescription::groupId); + case STATE: + Function statesPriorities = cg -> { + switch (cg.state()) { + case STABLE: return 0; + case COMPLETING_REBALANCE: return 1; + case PREPARING_REBALANCE: return 2; + case EMPTY: return 3; + case DEAD: return 4; + case UNKNOWN: return 5; + default: return 100; + } + }; + return Comparator.comparingInt(statesPriorities::apply); + case MEMBERS: + return Comparator.comparingInt(cg -> -cg.members().size()); + default: + throw new IllegalStateException("Unsupported order by: " + orderBy); } - - return consumerGroups.flatMap(c -> { - final List groups = c.stream() - .map(d -> ClusterUtil.filterConsumerGroupTopic(d, topic)) - .filter(Optional::isPresent) - .map(Optional::get) - .collect(Collectors.toList()); - - final Set topicPartitions = - groups.stream().flatMap(g -> g.getOffsets().keySet().stream()) - .collect(Collectors.toSet()); - - return topicPartitionsEndOffsets(cluster, topicPartitions).map(offsets -> - groups.stream().map(g -> { - Map offsetsCopy = new HashMap<>(offsets); - offsetsCopy.keySet().retainAll(g.getOffsets().keySet()); - return g.toBuilder().endOffsets(offsetsCopy).build(); - }).collect(Collectors.toList()) - ); - }); } - public Mono> getConsumerGroups(KafkaCluster cluster) { - return getConsumerGroups(cluster, Optional.empty()); + private Mono> describeConsumerGroups(ReactiveAdminClient ac, + @Nullable String search) { + return ac.listConsumerGroups() + .map(groupIds -> groupIds + .stream() + .filter(groupId -> search == null || StringUtils.containsIgnoreCase(groupId, search)) + .collect(Collectors.toList())) + .flatMap(ac::describeConsumerGroups) + .map(cgs -> new ArrayList<>(cgs.values())); } - public Mono> getConsumerGroups(KafkaCluster cluster, - Optional topic) { - return getConsumerGroups(cluster, topic, Collections.emptyList()) - .map(c -> - c.stream().map(ClusterUtil::convertToConsumerGroup).collect(Collectors.toList()) - ); - } - - private Mono> topicPartitionsEndOffsets( - KafkaCluster cluster, Collection topicPartitions) { - - return adminClientService.get(cluster).flatMap(ac -> - ac.listOffsets(topicPartitions, OffsetSpec.latest()) - ); - } - - public Mono getConsumerGroupDetail(KafkaCluster cluster, - String consumerGroupId) { - return getConsumerGroups( - cluster, - Optional.empty(), - Collections.singletonList(consumerGroupId) - ).filter(groups -> !groups.isEmpty()).map(groups -> groups.get(0)).map( - ClusterUtil::convertToConsumerGroupDetails - ); + public Mono getConsumerGroupDetail(KafkaCluster cluster, + String consumerGroupId) { + return adminClientService.get(cluster) + .flatMap(ac -> ac.describeConsumerGroups(List.of(consumerGroupId)) + .filter(m -> m.containsKey(consumerGroupId)) + .map(r -> r.get(consumerGroupId)) + .flatMap(descr -> + getConsumerGroups(ac, List.of(descr)) + .filter(groups -> !groups.isEmpty()) + .map(groups -> groups.get(0)))); } public Mono deleteConsumerGroupById(KafkaCluster cluster, diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index a16e15dfd8..b333d3ecba 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -31,6 +31,7 @@ import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.admin.DescribeConfigsOptions; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions; import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.NewPartitionReassignment; import org.apache.kafka.clients.admin.NewPartitions; @@ -293,9 +294,23 @@ public class ReactiveAdminClient implements Closeable { return toMono(client.describeConsumerGroups(groupIds).all()); } - public Mono> listConsumerGroupOffsets(String groupId) { - return toMono(client.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata()) - .map(MapUtil::removeNullValues); + public Mono> listConsumerGroupOffsets(String groupId) { + return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions()); + } + + public Mono> listConsumerGroupOffsets( + String groupId, List partitions) { + return listConsumerGroupOffsets(groupId, + new ListConsumerGroupOffsetsOptions().topicPartitions(partitions)); + } + + private Mono> listConsumerGroupOffsets( + String groupId, ListConsumerGroupOffsetsOptions options) { + return toMono(client.listConsumerGroupOffsets(groupId, options).partitionsToOffsetAndMetadata()) + .map(MapUtil::removeNullValues) + .map(m -> m.entrySet().stream() + .map(e -> Tuples.of(e.getKey(), e.getValue().offset())) + .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2))); } public Mono alterConsumerGroupOffsets(String groupId, Map offsets) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java index 7a075e29f7..c39135c49e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java @@ -1,11 +1,5 @@ package com.provectus.kafka.ui.util; -import com.provectus.kafka.ui.model.BrokerDTO; -import com.provectus.kafka.ui.model.ConsumerGroupDTO; -import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO; -import com.provectus.kafka.ui.model.ConsumerGroupStateDTO; -import com.provectus.kafka.ui.model.ConsumerGroupTopicPartitionDTO; -import com.provectus.kafka.ui.model.InternalConsumerGroup; import com.provectus.kafka.ui.model.MessageFormatDTO; import com.provectus.kafka.ui.model.ServerStatusDTO; import com.provectus.kafka.ui.model.TopicMessageDTO; @@ -13,20 +7,10 @@ import com.provectus.kafka.ui.serde.RecordSerDe; import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.Bytes; @@ -36,123 +20,6 @@ public class ClusterUtil { private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); - public static InternalConsumerGroup convertToInternalConsumerGroup( - ConsumerGroupDescription description, Map offsets) { - - var builder = InternalConsumerGroup.builder(); - builder.groupId(description.groupId()); - builder.simple(description.isSimpleConsumerGroup()); - builder.state(description.state()); - builder.partitionAssignor(description.partitionAssignor()); - builder.members( - description.members().stream() - .map(m -> - InternalConsumerGroup.InternalMember.builder() - .assignment(m.assignment().topicPartitions()) - .clientId(m.clientId()) - .groupInstanceId(m.groupInstanceId().orElse("")) - .consumerId(m.consumerId()) - .clientId(m.clientId()) - .host(m.host()) - .build() - ).collect(Collectors.toList()) - ); - builder.offsets(offsets); - Optional.ofNullable(description.coordinator()).ifPresent(builder::coordinator); - return builder.build(); - } - - public static ConsumerGroupDTO convertToConsumerGroup(InternalConsumerGroup c) { - return convertToConsumerGroup(c, new ConsumerGroupDTO()); - } - - public static T convertToConsumerGroup( - InternalConsumerGroup c, T consumerGroup) { - consumerGroup.setGroupId(c.getGroupId()); - consumerGroup.setMembers(c.getMembers().size()); - - int numTopics = Stream.concat( - c.getOffsets().keySet().stream().map(TopicPartition::topic), - c.getMembers().stream() - .flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic)) - ).collect(Collectors.toSet()).size(); - - long messagesBehind = c.getOffsets().entrySet().stream() - .mapToLong(e -> - Optional.ofNullable(c.getEndOffsets()) - .map(o -> o.get(e.getKey())) - .map(o -> o - e.getValue().offset()) - .orElse(0L) - ).sum(); - - consumerGroup.setMessagesBehind(messagesBehind); - consumerGroup.setTopics(numTopics); - consumerGroup.setSimple(c.isSimple()); - - Optional.ofNullable(c.getState()) - .ifPresent(s -> consumerGroup.setState(mapConsumerGroupState(s))); - Optional.ofNullable(c.getCoordinator()) - .ifPresent(cd -> consumerGroup.setCoordinator(mapCoordinator(cd))); - - consumerGroup.setPartitionAssignor(c.getPartitionAssignor()); - return consumerGroup; - } - - public static ConsumerGroupDetailsDTO convertToConsumerGroupDetails(InternalConsumerGroup g) { - ConsumerGroupDetailsDTO details = convertToConsumerGroup(g, new ConsumerGroupDetailsDTO()); - Map partitionMap = new HashMap<>(); - - for (Map.Entry entry : g.getOffsets().entrySet()) { - ConsumerGroupTopicPartitionDTO partition = new ConsumerGroupTopicPartitionDTO(); - partition.setTopic(entry.getKey().topic()); - partition.setPartition(entry.getKey().partition()); - partition.setCurrentOffset(entry.getValue().offset()); - - final Optional endOffset = Optional.ofNullable(g.getEndOffsets()) - .map(o -> o.get(entry.getKey())); - - final Long behind = endOffset.map(o -> o - entry.getValue().offset()) - .orElse(0L); - - partition.setEndOffset(endOffset.orElse(0L)); - partition.setMessagesBehind(behind); - - partitionMap.put(entry.getKey(), partition); - } - - for (InternalConsumerGroup.InternalMember member : g.getMembers()) { - for (TopicPartition topicPartition : member.getAssignment()) { - final ConsumerGroupTopicPartitionDTO partition = partitionMap.computeIfAbsent( - topicPartition, - (tp) -> new ConsumerGroupTopicPartitionDTO() - .topic(tp.topic()) - .partition(tp.partition()) - ); - partition.setHost(member.getHost()); - partition.setConsumerId(member.getConsumerId()); - partitionMap.put(topicPartition, partition); - } - } - details.setPartitions(new ArrayList<>(partitionMap.values())); - return details; - } - - private static BrokerDTO mapCoordinator(Node node) { - return new BrokerDTO().host(node.host()).id(node.id()); - } - - private static ConsumerGroupStateDTO mapConsumerGroupState( - org.apache.kafka.common.ConsumerGroupState state) { - switch (state) { - case DEAD: return ConsumerGroupStateDTO.DEAD; - case EMPTY: return ConsumerGroupStateDTO.EMPTY; - case STABLE: return ConsumerGroupStateDTO.STABLE; - case PREPARING_REBALANCE: return ConsumerGroupStateDTO.PREPARING_REBALANCE; - case COMPLETING_REBALANCE: return ConsumerGroupStateDTO.COMPLETING_REBALANCE; - default: return ConsumerGroupStateDTO.UNKNOWN; - } - } - public static int convertToIntServerStatus(ServerStatusDTO serverStatus) { return serverStatus.equals(ServerStatusDTO.ONLINE) ? 1 : 0; } @@ -211,41 +78,4 @@ public class ClusterUtil { throw new IllegalArgumentException("Unknown timestampType: " + timestampType); } } - - public static Optional filterConsumerGroupTopic( - InternalConsumerGroup consumerGroup, Optional topic) { - - final Map offsets = - consumerGroup.getOffsets().entrySet().stream() - .filter(e -> topic.isEmpty() || e.getKey().topic().equals(topic.get())) - .collect(Collectors.toMap( - Map.Entry::getKey, - Map.Entry::getValue - )); - - final Collection members = - consumerGroup.getMembers().stream() - .map(m -> filterConsumerMemberTopic(m, topic)) - .filter(m -> !m.getAssignment().isEmpty()) - .collect(Collectors.toList()); - - if (!members.isEmpty() || !offsets.isEmpty()) { - return Optional.of( - consumerGroup.toBuilder() - .offsets(offsets) - .members(members) - .build() - ); - } else { - return Optional.empty(); - } - } - - public static InternalConsumerGroup.InternalMember filterConsumerMemberTopic( - InternalConsumerGroup.InternalMember member, Optional topic) { - final Set topicPartitions = member.getAssignment() - .stream().filter(tp -> topic.isEmpty() || tp.topic().equals(topic.get())) - .collect(Collectors.toSet()); - return member.toBuilder().assignment(topicPartitions).build(); - } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerGroupTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerGroupTests.java index 90e64e6216..16b2a14036 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerGroupTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerGroupTests.java @@ -1,9 +1,17 @@ package com.provectus.kafka.ui; +import static org.assertj.core.api.Assertions.assertThat; + +import com.provectus.kafka.ui.model.ConsumerGroupDTO; +import com.provectus.kafka.ui.model.ConsumerGroupsPageResponseDTO; +import java.io.Closeable; import java.time.Duration; +import java.util.Comparator; import java.util.List; import java.util.Properties; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; import lombok.val; import org.apache.kafka.clients.admin.NewTopic; @@ -78,8 +86,59 @@ public class KafkaConsumerGroupTests extends AbstractBaseTest { .isBadRequest(); } + + @Test + void shouldReturnConsumerGroupsWithPagination() throws Exception { + try (var groups1 = startConsumerGroups(3, "cgPageTest1"); + var groups2 = startConsumerGroups(2, "cgPageTest2")) { + webTestClient + .get() + .uri("/api/clusters/{clusterName}/consumer-groups/paged?perPage=3&search=cgPageTest", LOCAL) + .exchange() + .expectStatus() + .isOk() + .expectBody(ConsumerGroupsPageResponseDTO.class) + .value(page -> { + assertThat(page.getPageCount()).isEqualTo(2); + assertThat(page.getConsumerGroups().size()).isEqualTo(3); + }); + + webTestClient + .get() + .uri("/api/clusters/{clusterName}/consumer-groups/paged?perPage=10&search=cgPageTest", LOCAL) + .exchange() + .expectStatus() + .isOk() + .expectBody(ConsumerGroupsPageResponseDTO.class) + .value(page -> { + assertThat(page.getPageCount()).isEqualTo(1); + assertThat(page.getConsumerGroups().size()).isEqualTo(5); + assertThat(page.getConsumerGroups()) + .isSortedAccordingTo(Comparator.comparing(ConsumerGroupDTO::getGroupId)); + }); + } + } + + private Closeable startConsumerGroups(int count, String consumerGroupPrefix) { + String topicName = createTopicWithRandomName(); + var consumers = + Stream.generate(() -> { + String groupId = consumerGroupPrefix + UUID.randomUUID(); + val consumer = createTestConsumerWithGroupId(groupId); + consumer.subscribe(List.of(topicName)); + consumer.poll(Duration.ofMillis(100)); + return consumer; + }) + .limit(count) + .collect(Collectors.toList()); + return () -> { + consumers.forEach(KafkaConsumer::close); + deleteTopic(topicName); + }; + } + private String createTopicWithRandomName() { - String topicName = UUID.randomUUID().toString(); + String topicName = getClass().getSimpleName() + "-" + UUID.randomUUID(); short replicationFactor = 1; int partitions = 1; createTopic(new NewTopic(topicName, partitions, replicationFactor)); diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 5c46e0be74..70af448952 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -615,6 +615,46 @@ paths: items: $ref: '#/components/schemas/ConsumerGroup' + /api/clusters/{clusterName}/consumer-groups/paged: + get: + tags: + - Consumer Groups + summary: Get consumer croups with paging support + operationId: getConsumerGroupsPage + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: page + in: query + required: false + schema: + type: integer + - name: perPage + in: query + required: false + schema: + type: integer + - name: search + in: query + required: false + schema: + type: string + - name: orderBy + in: query + required: false + schema: + $ref: '#/components/schemas/ConsumerGroupOrdering' + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ConsumerGroupsPageResponse' + /api/clusters/{clusterName}/consumer-groups/{id}: get: @@ -1863,6 +1903,23 @@ components: required: - groupId + ConsumerGroupOrdering: + type: string + enum: + - NAME + - MEMBERS + - STATE + + ConsumerGroupsPageResponse: + type: object + properties: + pageCount: + type: integer + consumerGroups: + type: array + items: + $ref: '#/components/schemas/ConsumerGroup' + CreateTopicMessage: type: object properties: