From 13463fe95f8c6c375b09d47e7da41a87b8a47134 Mon Sep 17 00:00:00 2001 From: German Osin Date: Fri, 9 Jul 2021 15:55:43 +0300 Subject: [PATCH] Refactor consumer groups requests (#636) * Refactor consumer groups requests * Fixed offsets tests * Moved state enum to separate class * Adjust frontend for the new API Co-authored-by: Alexander --- .../controller/ConsumerGroupsController.java | 11 +- .../kafka/ui/model/InternalConsumerGroup.java | 34 +++ .../kafka/ui/service/ClusterService.java | 49 +--- .../kafka/ui/service/KafkaService.java | 78 ++++--- .../kafka/ui/service/OffsetsResetService.java | 9 +- .../provectus/kafka/ui/util/ClusterUtil.java | 217 +++++++++++------- .../main/resources/swagger/kafka-ui-api.yaml | 87 ++++--- .../ConsumerGroups/Details/Details.tsx | 15 +- .../Details/DetailsContainer.ts | 1 - .../ConsumerGroups/Details/ListItem.tsx | 4 +- .../Details/__tests__/Details.spec.tsx | 5 +- .../__snapshots__/Details.spec.tsx.snap | 2 - .../components/ConsumerGroups/List/List.tsx | 9 +- .../ConsumerGroups/List/ListItem.tsx | 14 +- .../ConsumerGroups/TopicConsumerGroups.tsx | 41 ++-- .../__test__/TopicConsumerGroups.spec.tsx | 35 ++- .../ConsumerGroupStateTag.tsx | 40 ++++ .../actions/__test__/thunks/topics.spec.ts | 4 +- .../src/redux/interfaces/consumerGroup.ts | 6 +- .../src/redux/interfaces/topic.ts | 3 +- .../consumerGroups/__test__/reducer.spec.ts | 3 +- .../redux/reducers/consumerGroups/reducer.ts | 6 +- .../src/redux/reducers/topics/selectors.ts | 2 +- 23 files changed, 395 insertions(+), 280 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java create mode 100644 kafka-ui-react-app/src/components/common/ConsumerGroupState/ConsumerGroupStateTag.tsx diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java index add1508348..b0bac49169 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java @@ -9,11 +9,11 @@ import com.provectus.kafka.ui.model.ConsumerGroup; import com.provectus.kafka.ui.model.ConsumerGroupDetails; import com.provectus.kafka.ui.model.ConsumerGroupOffsetsReset; import com.provectus.kafka.ui.model.PartitionOffset; -import com.provectus.kafka.ui.model.TopicConsumerGroups; import com.provectus.kafka.ui.service.ClusterService; import com.provectus.kafka.ui.service.ClustersStorage; import com.provectus.kafka.ui.service.OffsetsResetService; import java.util.Map; +import java.util.Optional; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.springframework.http.ResponseEntity; @@ -56,12 +56,15 @@ public class ConsumerGroupsController implements ConsumerGroupsApi { } @Override - public Mono> getTopicConsumerGroups( + public Mono>> getTopicConsumerGroups( String clusterName, String topicName, ServerWebExchange exchange) { - return clusterService.getTopicConsumerGroupDetail(clusterName, topicName) - .map(ResponseEntity::ok); + return clusterService.getConsumerGroups(clusterName, Optional.of(topicName)) + .map(Flux::fromIterable) + .map(ResponseEntity::ok) + .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); } + @Override public Mono> resetConsumerGroupOffsets(String clusterName, String group, Mono diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java new file mode 100644 index 0000000000..0cc7c54671 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalConsumerGroup.java @@ -0,0 +1,34 @@ +package com.provectus.kafka.ui.model; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import lombok.Builder; +import lombok.Data; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; + +@Data +@Builder(toBuilder = true) +public class InternalConsumerGroup { + private final String groupId; + private final boolean simple; + private final Collection members; + private final Map offsets; + private final Map endOffsets; + private final String partitionAssignor; + private final ConsumerGroupState state; + private final Node coordinator; + + @Data + @Builder(toBuilder = true) + public static class InternalMember { + private final String consumerId; + private final String groupInstanceId; + private final String clientId; + private final String host; + private final Set assignment; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java index bfbbef5186..57faf2d5e4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java @@ -25,7 +25,6 @@ import com.provectus.kafka.ui.model.ReplicationFactorChangeResponse; import com.provectus.kafka.ui.model.Topic; import com.provectus.kafka.ui.model.TopicColumnsToSort; import com.provectus.kafka.ui.model.TopicConfig; -import com.provectus.kafka.ui.model.TopicConsumerGroups; import com.provectus.kafka.ui.model.TopicCreation; import com.provectus.kafka.ui.model.TopicDetails; import com.provectus.kafka.ui.model.TopicMessage; @@ -37,24 +36,20 @@ import com.provectus.kafka.ui.util.ClusterUtil; import java.util.Collections; import java.util.Comparator; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.function.Predicate; import java.util.stream.Collectors; -import java.util.stream.Stream; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.util.function.Tuples; @Service @RequiredArgsConstructor @@ -190,46 +185,26 @@ public class ClusterService { public Mono getConsumerGroupDetail(String clusterName, String consumerGroupId) { var cluster = clustersStorage.getClusterByName(clusterName).orElseThrow(Throwable::new); - - return kafkaService.getOrCreateAdminClient(cluster).map(ac -> - ac.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).all() - ).flatMap(groups -> - kafkaService.groupMetadata(cluster, consumerGroupId) - .flatMap(offsets -> { - Map endOffsets = - kafkaService.topicPartitionsEndOffsets(cluster, offsets.keySet()); - return ClusterUtil.toMono(groups).map(s -> - Tuples.of( - s.get(consumerGroupId), - s.get(consumerGroupId).members().stream() - .flatMap(c -> - Stream.of( - ClusterUtil.convertToConsumerTopicPartitionDetails( - c, offsets, endOffsets, consumerGroupId - ) - ) - ) - .collect(Collectors.toList()).stream() - .flatMap(t -> - t.stream().flatMap(Stream::of) - ).collect(Collectors.toList()) - ) - ); - }).map(c -> ClusterUtil.convertToConsumerGroupDetails(c.getT1(), c.getT2())) + return kafkaService.getConsumerGroups( + cluster, + Optional.empty(), + Collections.singletonList(consumerGroupId) + ).filter(groups -> !groups.isEmpty()).map(groups -> groups.get(0)).map( + ClusterUtil::convertToConsumerGroupDetails ); } public Mono> getConsumerGroups(String clusterName) { - return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName)) - .switchIfEmpty(Mono.error(ClusterNotFoundException::new)) - .flatMap(kafkaService::getConsumerGroups); + return getConsumerGroups(clusterName, Optional.empty()); } - public Mono getTopicConsumerGroupDetail( - String clusterName, String topicName) { + public Mono> getConsumerGroups(String clusterName, Optional topic) { return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName)) .switchIfEmpty(Mono.error(ClusterNotFoundException::new)) - .flatMap(c -> kafkaService.getTopicConsumerGroups(c, topicName)); + .flatMap(c -> kafkaService.getConsumerGroups(c, topic, Collections.emptyList())) + .map(c -> + c.stream().map(ClusterUtil::convertToConsumerGroup).collect(Collectors.toList()) + ); } public Flux getBrokers(String clusterName) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java index 445b8ea422..d61f807c15 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java @@ -1,12 +1,12 @@ package com.provectus.kafka.ui.service; import com.provectus.kafka.ui.exception.ValidationException; -import com.provectus.kafka.ui.model.ConsumerGroup; import com.provectus.kafka.ui.model.CreateTopicMessage; import com.provectus.kafka.ui.model.ExtendedAdminClient; import com.provectus.kafka.ui.model.InternalBrokerDiskUsage; import com.provectus.kafka.ui.model.InternalBrokerMetrics; import com.provectus.kafka.ui.model.InternalClusterMetrics; +import com.provectus.kafka.ui.model.InternalConsumerGroup; import com.provectus.kafka.ui.model.InternalPartition; import com.provectus.kafka.ui.model.InternalReplica; import com.provectus.kafka.ui.model.InternalSegmentSizeDto; @@ -17,7 +17,6 @@ import com.provectus.kafka.ui.model.Metric; import com.provectus.kafka.ui.model.PartitionsIncrease; import com.provectus.kafka.ui.model.ReplicationFactorChange; import com.provectus.kafka.ui.model.ServerStatus; -import com.provectus.kafka.ui.model.TopicConsumerGroups; import com.provectus.kafka.ui.model.TopicCreation; import com.provectus.kafka.ui.model.TopicUpdate; import com.provectus.kafka.ui.serde.DeserializationService; @@ -51,7 +50,6 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; -import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.NewPartitionReassignment; @@ -327,45 +325,59 @@ public class KafkaService { ); } - public Mono> getConsumerGroupsInternal( + public Mono> getConsumerGroupsInternal( KafkaCluster cluster) { return getOrCreateAdminClient(cluster).flatMap(ac -> ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all()) .flatMap(s -> - ClusterUtil.toMono( - ac.getAdminClient().describeConsumerGroups( - s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList()) - ).all() - ).map(Map::values) + getConsumerGroupsInternal( + cluster, + s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())) + ) + ); + } + + public Mono> getConsumerGroupsInternal( + KafkaCluster cluster, List groupIds) { + + return getOrCreateAdminClient(cluster).flatMap(ac -> + ClusterUtil.toMono( + ac.getAdminClient().describeConsumerGroups(groupIds).all() + ).map(Map::values) + ).flatMap(descriptions -> + Flux.fromIterable(descriptions) + .parallel() + .flatMap(d -> + groupMetadata(cluster, d.groupId()) + .map(offsets -> ClusterUtil.convertToInternalConsumerGroup(d, offsets)) ) + .sequential() + .collectList() ); } - public Mono> getConsumerGroups(KafkaCluster cluster) { - return getConsumerGroupsInternal(cluster) - .map(c -> c.stream().map(ClusterUtil::convertToConsumerGroup).collect(Collectors.toList())); - } + public Mono> getConsumerGroups( + KafkaCluster cluster, Optional topic, List groupIds) { + final Mono> consumerGroups; - public Mono getTopicConsumerGroups(KafkaCluster cluster, String topic) { - final Map endOffsets = topicEndOffsets(cluster, topic); + if (groupIds.isEmpty()) { + consumerGroups = getConsumerGroupsInternal(cluster); + } else { + consumerGroups = getConsumerGroupsInternal(cluster, groupIds); + } - return getConsumerGroupsInternal(cluster) - .flatMapIterable(c -> + return consumerGroups.map(c -> c.stream() .map(d -> ClusterUtil.filterConsumerGroupTopic(d, topic)) .filter(Optional::isPresent) .map(Optional::get) - .map(d -> - groupMetadata(cluster, d.groupId()) - .flatMapIterable(meta -> - d.members().stream().flatMap(m -> - ClusterUtil.convertToConsumerTopicPartitionDetails( - m, meta, endOffsets, d.groupId() - ).stream() - ).collect(Collectors.toList()) - ) - ).collect(Collectors.toList()) - ).flatMap(f -> f).collectList().map(l -> new TopicConsumerGroups().consumers(l)); + .map(g -> + g.toBuilder().endOffsets( + topicPartitionsEndOffsets(cluster, g.getOffsets().keySet()) + ).build() + ) + .collect(Collectors.toList()) + ); } public Mono> groupMetadata(KafkaCluster cluster, @@ -377,16 +389,6 @@ public class KafkaService { ).flatMap(ClusterUtil::toMono); } - public Map topicEndOffsets( - KafkaCluster cluster, String topic) { - try (KafkaConsumer consumer = createConsumer(cluster)) { - final List topicPartitions = consumer.partitionsFor(topic).stream() - .map(i -> new TopicPartition(i.topic(), i.partition())) - .collect(Collectors.toList()); - return consumer.endOffsets(topicPartitions); - } - } - public Map topicPartitionsEndOffsets( KafkaCluster cluster, Collection topicPartitions) { try (KafkaConsumer consumer = createConsumer(cluster)) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java index d04d649989..73d86cecb3 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java @@ -9,6 +9,7 @@ import static org.apache.kafka.common.ConsumerGroupState.EMPTY; import com.google.common.collect.Sets; import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.model.InternalConsumerGroup; import com.provectus.kafka.ui.model.KafkaCluster; import java.util.Collection; import java.util.HashMap; @@ -78,20 +79,20 @@ public class OffsetsResetService { } private void checkGroupCondition(KafkaCluster cluster, String groupId) { - ConsumerGroupDescription description = + InternalConsumerGroup description = kafkaService.getConsumerGroupsInternal(cluster) .blockOptional() .stream() .flatMap(Collection::stream) - .filter(cgd -> cgd.groupId().equals(groupId)) + .filter(cgd -> cgd.getGroupId().equals(groupId)) .findAny() .orElseThrow(() -> new NotFoundException("Consumer group not found")); - if (!Set.of(DEAD, EMPTY).contains(description.state())) { + if (!Set.of(DEAD, EMPTY).contains(description.getState())) { throw new ValidationException( String.format( "Group's offsets can be reset only if group is inactive, but group is in %s state", - description.state())); + description.getState())); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java index fd50b44745..8248372b05 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java @@ -3,10 +3,13 @@ package com.provectus.kafka.ui.util; import static com.provectus.kafka.ui.util.KafkaConstants.TOPIC_DEFAULT_CONFIGS; import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG; +import com.provectus.kafka.ui.model.Broker; import com.provectus.kafka.ui.model.ConsumerGroup; import com.provectus.kafka.ui.model.ConsumerGroupDetails; -import com.provectus.kafka.ui.model.ConsumerTopicPartitionDetail; +import com.provectus.kafka.ui.model.ConsumerGroupState; +import com.provectus.kafka.ui.model.ConsumerGroupTopicPartition; import com.provectus.kafka.ui.model.ExtendedAdminClient; +import com.provectus.kafka.ui.model.InternalConsumerGroup; import com.provectus.kafka.ui.model.InternalPartition; import com.provectus.kafka.ui.model.InternalReplica; import com.provectus.kafka.ui.model.InternalTopic; @@ -17,6 +20,7 @@ import com.provectus.kafka.ui.serde.RecordSerDe; import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -31,8 +35,6 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConsumerGroupDescription; -import org.apache.kafka.clients.admin.MemberAssignment; -import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -72,57 +74,120 @@ public class ClusterUtil { })); } - public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c) { - ConsumerGroup consumerGroup = new ConsumerGroup(); - consumerGroup.setConsumerGroupId(c.groupId()); - consumerGroup.setNumConsumers(c.members().size()); - int numTopics = c.members().stream() - .flatMap(m -> m.assignment().topicPartitions().stream().flatMap(t -> Stream.of(t.topic()))) - .collect(Collectors.toSet()).size(); - consumerGroup.setNumTopics(numTopics); - consumerGroup.setSimple(c.isSimpleConsumerGroup()); - Optional.ofNullable(c.state()) - .ifPresent(s -> consumerGroup.setState(s.name())); - Optional.ofNullable(c.coordinator()) - .ifPresent(coord -> consumerGroup.setCoordintor(coord.host())); - consumerGroup.setPartitionAssignor(c.partitionAssignor()); + public static InternalConsumerGroup convertToInternalConsumerGroup( + ConsumerGroupDescription description, Map offsets) { + + var builder = InternalConsumerGroup.builder(); + builder.groupId(description.groupId()); + builder.simple(description.isSimpleConsumerGroup()); + builder.state(description.state()); + builder.partitionAssignor(description.partitionAssignor()); + builder.members( + description.members().stream() + .map(m -> + InternalConsumerGroup.InternalMember.builder() + .assignment(m.assignment().topicPartitions()) + .clientId(m.clientId()) + .groupInstanceId(m.groupInstanceId().orElse("")) + .consumerId(m.consumerId()) + .clientId(m.clientId()) + .host(m.host()) + .build() + ).collect(Collectors.toList()) + ); + builder.offsets(offsets); + Optional.ofNullable(description.coordinator()).ifPresent(builder::coordinator); + return builder.build(); + } + + public static ConsumerGroup convertToConsumerGroup(InternalConsumerGroup c) { + return convertToConsumerGroup(c, new ConsumerGroup()); + } + + public static T convertToConsumerGroup( + InternalConsumerGroup c, T consumerGroup) { + consumerGroup.setGroupId(c.getGroupId()); + consumerGroup.setMembers(c.getMembers().size()); + + int numTopics = Stream.concat( + c.getOffsets().keySet().stream().map(TopicPartition::topic), + c.getMembers().stream() + .flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic)) + ).collect(Collectors.toSet()).size(); + + long messagesBehind = c.getOffsets().entrySet().stream() + .mapToLong(e -> + Optional.ofNullable(c.getEndOffsets()) + .map(o -> o.get(e.getKey())) + .map(o -> o - e.getValue().offset()) + .orElse(0L) + ).sum(); + + consumerGroup.setMessagesBehind(messagesBehind); + consumerGroup.setTopics(numTopics); + consumerGroup.setSimple(c.isSimple()); + + Optional.ofNullable(c.getState()) + .ifPresent(s -> consumerGroup.setState(mapConsumerGroupState(s))); + Optional.ofNullable(c.getCoordinator()) + .ifPresent(cd -> consumerGroup.setCoordinator(mapCoordinator(cd))); + + consumerGroup.setPartitionAssignor(c.getPartitionAssignor()); return consumerGroup; } - public static ConsumerGroupDetails convertToConsumerGroupDetails( - ConsumerGroupDescription desc, List consumers - ) { - return new ConsumerGroupDetails() - .consumers(consumers) - .consumerGroupId(desc.groupId()) - .simple(desc.isSimpleConsumerGroup()) - .coordintor(Optional.ofNullable(desc.coordinator()).map(Node::host).orElse("")) - .state(Optional.ofNullable(desc.state()).map(Enum::name).orElse("")) - .partitionAssignor(desc.partitionAssignor()); + public static ConsumerGroupDetails convertToConsumerGroupDetails(InternalConsumerGroup g) { + final ConsumerGroupDetails details = convertToConsumerGroup(g, new ConsumerGroupDetails()); + Map partitionMap = new HashMap<>(); + + for (Map.Entry entry : g.getOffsets().entrySet()) { + ConsumerGroupTopicPartition partition = new ConsumerGroupTopicPartition(); + partition.setTopic(entry.getKey().topic()); + partition.setPartition(entry.getKey().partition()); + partition.setCurrentOffset(entry.getValue().offset()); + + final Optional endOffset = Optional.ofNullable(g.getEndOffsets()) + .map(o -> o.get(entry.getKey())); + + final Long behind = endOffset.map(o -> o - entry.getValue().offset()) + .orElse(0L); + + partition.setEndOffset(endOffset.orElse(0L)); + partition.setMessagesBehind(behind); + + partitionMap.put(entry.getKey(), partition); + } + + for (InternalConsumerGroup.InternalMember member : g.getMembers()) { + for (TopicPartition topicPartition : member.getAssignment()) { + final ConsumerGroupTopicPartition partition = partitionMap.computeIfAbsent(topicPartition, + (tp) -> new ConsumerGroupTopicPartition() + .topic(tp.topic()) + .partition(tp.partition()) + ); + partition.setHost(member.getHost()); + partition.setConsumerId(member.getConsumerId()); + partitionMap.put(topicPartition, partition); + } + } + details.setPartitions(new ArrayList<>(partitionMap.values())); + return details; } - public static List convertToConsumerTopicPartitionDetails( - MemberDescription consumer, - Map groupOffsets, - Map endOffsets, - String groupId - ) { - return consumer.assignment().topicPartitions().stream() - .map(tp -> { - long currentOffset = Optional.ofNullable(groupOffsets.get(tp)) - .map(OffsetAndMetadata::offset).orElse(0L); - long endOffset = Optional.ofNullable(endOffsets.get(tp)).orElse(0L); - ConsumerTopicPartitionDetail cd = new ConsumerTopicPartitionDetail(); - cd.setGroupId(groupId); - cd.setConsumerId(consumer.consumerId()); - cd.setHost(consumer.host()); - cd.setTopic(tp.topic()); - cd.setPartition(tp.partition()); - cd.setCurrentOffset(currentOffset); - cd.setEndOffset(endOffset); - cd.setMessagesBehind(endOffset - currentOffset); - return cd; - }).collect(Collectors.toList()); + private static Broker mapCoordinator(Node node) { + return new Broker().host(node.host()).id(node.id()); + } + + private static ConsumerGroupState mapConsumerGroupState( + org.apache.kafka.common.ConsumerGroupState state) { + switch (state) { + case DEAD: return ConsumerGroupState.DEAD; + case EMPTY: return ConsumerGroupState.EMPTY; + case STABLE: return ConsumerGroupState.STABLE; + case PREPARING_REBALANCE: return ConsumerGroupState.PREPARING_REBALANCE; + case COMPLETING_REBALANCE: return ConsumerGroupState.COMPLETING_REBALANCE; + default: return ConsumerGroupState.UNKNOWN; + } } @@ -282,42 +347,40 @@ public class ClusterUtil { .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow(); } - public static Optional filterConsumerGroupTopic( - ConsumerGroupDescription description, String topic) { - final List members = description.members().stream() - .map(m -> filterConsumerMemberTopic(m, topic)) - .filter(m -> !m.assignment().topicPartitions().isEmpty()) - .collect(Collectors.toList()); + public static Optional filterConsumerGroupTopic( + InternalConsumerGroup consumerGroup, Optional topic) { - if (!members.isEmpty()) { + final Map offsets = + consumerGroup.getOffsets().entrySet().stream() + .filter(e -> topic.isEmpty() || e.getKey().topic().equals(topic.get())) + .collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue + )); + + final Collection members = + consumerGroup.getMembers().stream() + .map(m -> filterConsumerMemberTopic(m, topic)) + .filter(m -> !m.getAssignment().isEmpty()) + .collect(Collectors.toList()); + + if (!members.isEmpty() || !offsets.isEmpty()) { return Optional.of( - new ConsumerGroupDescription( - description.groupId(), - description.isSimpleConsumerGroup(), - members, - description.partitionAssignor(), - description.state(), - description.coordinator() - ) + consumerGroup.toBuilder() + .offsets(offsets) + .members(members) + .build() ); } else { return Optional.empty(); } } - public static MemberDescription filterConsumerMemberTopic( - MemberDescription description, String topic) { - final Set topicPartitions = description.assignment().topicPartitions() - .stream().filter(tp -> tp.topic().equals(topic)) + public static InternalConsumerGroup.InternalMember filterConsumerMemberTopic( + InternalConsumerGroup.InternalMember member, Optional topic) { + final Set topicPartitions = member.getAssignment() + .stream().filter(tp -> topic.isEmpty() || tp.topic().equals(topic.get())) .collect(Collectors.toSet()); - MemberAssignment assignment = new MemberAssignment(topicPartitions); - return new MemberDescription( - description.consumerId(), - description.groupInstanceId(), - description.clientId(), - description.host(), - assignment - ); + return member.toBuilder().assignment(topicPartitions).build(); } - } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 2049fdb746..488566c166 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -472,7 +472,7 @@ paths: schema: $ref: '#/components/schemas/TopicMessageSchema' - /api/clusters/{clusterName}/topics/{topicName}/consumergroups: + /api/clusters/{clusterName}/topics/{topicName}/consumer-groups: get: tags: - Consumer Groups @@ -495,7 +495,10 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/TopicConsumerGroups' + type: array + items: + $ref: '#/components/schemas/ConsumerGroup' + /api/clusters/{clusterName}/consumer-groups/{id}: get: @@ -542,7 +545,7 @@ paths: 200: description: OK - /api/clusters/{clusterName}/consumerGroups: + /api/clusters/{clusterName}/consumer-groups: get: tags: - Consumer Groups @@ -1580,28 +1583,38 @@ components: required: - id + ConsumerGroupState: + type: string + enum: + - UNKNOWN + - PREPARING_REBALANCE + - COMPLETING_REBALANCE + - STABLE + - DEAD + - EMPTY + ConsumerGroup: type: object properties: - clusterId: + groupId: type: string - consumerGroupId: - type: string - numConsumers: + members: type: integer - numTopics: + topics: type: integer simple: type: boolean partitionAssignor: type: string state: - type: string - coordintor: - type: string + $ref: "#/components/schemas/ConsumerGroupState" + coordinator: + $ref: "#/components/schemas/Broker" + messagesBehind: + type: integer + format: int64 required: - - clusterId - - consumerGroupId + - groupId CreateTopicMessage: type: object @@ -1713,17 +1726,11 @@ components: - offsetMax - offsetMin - ConsumerTopicPartitionDetail: + ConsumerGroupTopicPartition: type: object properties: - groupId: - type: string - consumerId: - type: string topic: type: string - host: - type: string partition: type: integer currentOffset: @@ -1735,36 +1742,24 @@ components: messagesBehind: type: integer format: int64 + consumerId: + type: string + host: + type: string required: - - consumerId + - topic + - partition - TopicConsumerGroups: - type: object - properties: - consumers: - type: array - items: - $ref: '#/components/schemas/ConsumerTopicPartitionDetail' ConsumerGroupDetails: - type: object - properties: - consumerGroupId: - type: string - simple: - type: boolean - partitionAssignor: - type: string - state: - type: string - coordintor: - type: string - consumers: - type: array - items: - $ref: '#/components/schemas/ConsumerTopicPartitionDetail' - required: - - consumerGroupId + allOf: + - $ref: '#/components/schemas/ConsumerGroup' + - type: object + properties: + partitions: + type: array + items: + $ref: '#/components/schemas/ConsumerGroupTopicPartition' Metric: type: object diff --git a/kafka-ui-react-app/src/components/ConsumerGroups/Details/Details.tsx b/kafka-ui-react-app/src/components/ConsumerGroups/Details/Details.tsx index 50c4a65f56..dfb3834b47 100644 --- a/kafka-ui-react-app/src/components/ConsumerGroups/Details/Details.tsx +++ b/kafka-ui-react-app/src/components/ConsumerGroups/Details/Details.tsx @@ -6,7 +6,7 @@ import { ConsumerGroupID } from 'redux/interfaces/consumerGroup'; import { ConsumerGroup, ConsumerGroupDetails, - ConsumerTopicPartitionDetail, + ConsumerGroupTopicPartition, } from 'generated-sources'; import PageLoader from 'components/common/PageLoader/PageLoader'; import ConfirmationModal from 'components/common/ConfirmationModal/ConfirmationModal'; @@ -16,8 +16,7 @@ import ListItem from './ListItem'; export interface Props extends ConsumerGroup, ConsumerGroupDetails { clusterName: ClusterName; - consumerGroupId: ConsumerGroupID; - consumers?: ConsumerTopicPartitionDetail[]; + consumers?: ConsumerGroupTopicPartition[]; isFetched: boolean; isDeleted: boolean; fetchConsumerGroupDetails: ( @@ -29,7 +28,7 @@ export interface Props extends ConsumerGroup, ConsumerGroupDetails { const Details: React.FC = ({ clusterName, - consumerGroupId, + groupId, consumers, isFetched, isDeleted, @@ -37,8 +36,8 @@ const Details: React.FC = ({ deleteConsumerGroup, }) => { React.useEffect(() => { - fetchConsumerGroupDetails(clusterName, consumerGroupId); - }, [fetchConsumerGroupDetails, clusterName, consumerGroupId]); + fetchConsumerGroupDetails(clusterName, groupId); + }, [fetchConsumerGroupDetails, clusterName, groupId]); const items = consumers || []; const [isConfirmationModelVisible, setIsConfirmationModelVisible] = React.useState(false); @@ -46,7 +45,7 @@ const Details: React.FC = ({ const onDelete = () => { setIsConfirmationModelVisible(false); - deleteConsumerGroup(clusterName, consumerGroupId); + deleteConsumerGroup(clusterName, groupId); }; React.useEffect(() => { if (isDeleted) { @@ -66,7 +65,7 @@ const Details: React.FC = ({ }, ]} > - {consumerGroupId} + {groupId} diff --git a/kafka-ui-react-app/src/components/ConsumerGroups/Details/DetailsContainer.ts b/kafka-ui-react-app/src/components/ConsumerGroups/Details/DetailsContainer.ts index ebe186d091..9e0cbf397c 100644 --- a/kafka-ui-react-app/src/components/ConsumerGroups/Details/DetailsContainer.ts +++ b/kafka-ui-react-app/src/components/ConsumerGroups/Details/DetailsContainer.ts @@ -30,7 +30,6 @@ const mapStateToProps = ( }: OwnProps ) => ({ clusterName, - consumerGroupID, isFetched: getIsConsumerGroupDetailsFetched(state), isDeleted: getIsConsumerGroupsDeleted(state), ...getConsumerGroupByID(state, consumerGroupID), diff --git a/kafka-ui-react-app/src/components/ConsumerGroups/Details/ListItem.tsx b/kafka-ui-react-app/src/components/ConsumerGroups/Details/ListItem.tsx index 372ab4ac11..58cd9369f5 100644 --- a/kafka-ui-react-app/src/components/ConsumerGroups/Details/ListItem.tsx +++ b/kafka-ui-react-app/src/components/ConsumerGroups/Details/ListItem.tsx @@ -1,11 +1,11 @@ import React from 'react'; -import { ConsumerTopicPartitionDetail } from 'generated-sources'; +import { ConsumerGroupTopicPartition } from 'generated-sources'; import { NavLink } from 'react-router-dom'; import { ClusterName } from 'redux/interfaces/cluster'; interface Props { clusterName: ClusterName; - consumer: ConsumerTopicPartitionDetail; + consumer: ConsumerGroupTopicPartition; } const ListItem: React.FC = ({ clusterName, consumer }) => { diff --git a/kafka-ui-react-app/src/components/ConsumerGroups/Details/__tests__/Details.spec.tsx b/kafka-ui-react-app/src/components/ConsumerGroups/Details/__tests__/Details.spec.tsx index d129453dd4..ab359c1628 100644 --- a/kafka-ui-react-app/src/components/ConsumerGroups/Details/__tests__/Details.spec.tsx +++ b/kafka-ui-react-app/src/components/ConsumerGroups/Details/__tests__/Details.spec.tsx @@ -15,15 +15,13 @@ describe('Details component', () => { const setupWrapper = (props?: Partial) => (
{ messagesBehind: 0, }, { - groupId: 'messages-consumer', consumerId: 'consumer-messages-consumer-1-122fbf98-643b-491d-8aec-c0641d2513d1', topic: 'messages', diff --git a/kafka-ui-react-app/src/components/ConsumerGroups/Details/__tests__/__snapshots__/Details.spec.tsx.snap b/kafka-ui-react-app/src/components/ConsumerGroups/Details/__tests__/__snapshots__/Details.spec.tsx.snap index d8db8f1f57..18ef613e0b 100644 --- a/kafka-ui-react-app/src/components/ConsumerGroups/Details/__tests__/__snapshots__/Details.spec.tsx.snap +++ b/kafka-ui-react-app/src/components/ConsumerGroups/Details/__tests__/__snapshots__/Details.spec.tsx.snap @@ -113,7 +113,6 @@ exports[`Details component when consumer gruops are fetched Matches the snapshot "consumerId": "consumer-messages-consumer-1-122fbf98-643b-491d-8aec-c0641d2513d0", "currentOffset": 394, "endOffset": 394, - "groupId": "messages-consumer", "host": "/172.31.9.153", "messagesBehind": 0, "partition": 6, @@ -129,7 +128,6 @@ exports[`Details component when consumer gruops are fetched Matches the snapshot "consumerId": "consumer-messages-consumer-1-122fbf98-643b-491d-8aec-c0641d2513d1", "currentOffset": 384, "endOffset": 384, - "groupId": "messages-consumer", "host": "/172.31.9.153", "messagesBehind": 0, "partition": 7, diff --git a/kafka-ui-react-app/src/components/ConsumerGroups/List/List.tsx b/kafka-ui-react-app/src/components/ConsumerGroups/List/List.tsx index 0dc5c84adb..66614c1d08 100644 --- a/kafka-ui-react-app/src/components/ConsumerGroups/List/List.tsx +++ b/kafka-ui-react-app/src/components/ConsumerGroups/List/List.tsx @@ -41,8 +41,11 @@ const List: React.FC = ({ consumerGroups }) => { Consumer group ID - Num of consumers + Num of members Num of topics + Messages behind + Coordinator + State @@ -50,11 +53,11 @@ const List: React.FC = ({ consumerGroups }) => { .filter( (consumerGroup) => !searchText || - consumerGroup?.consumerGroupId?.indexOf(searchText) >= 0 + consumerGroup?.groupId?.indexOf(searchText) >= 0 ) .map((consumerGroup) => ( ))} diff --git a/kafka-ui-react-app/src/components/ConsumerGroups/List/ListItem.tsx b/kafka-ui-react-app/src/components/ConsumerGroups/List/ListItem.tsx index 8dadd5c806..7ca09bcb38 100644 --- a/kafka-ui-react-app/src/components/ConsumerGroups/List/ListItem.tsx +++ b/kafka-ui-react-app/src/components/ConsumerGroups/List/ListItem.tsx @@ -1,6 +1,7 @@ import React from 'react'; import { useHistory } from 'react-router-dom'; import { ConsumerGroup } from 'generated-sources'; +import ConsumerGroupStateTag from 'components/common/ConsumerGroupState/ConsumerGroupStateTag'; const ListItem: React.FC<{ consumerGroup: ConsumerGroup }> = ({ consumerGroup, @@ -8,14 +9,19 @@ const ListItem: React.FC<{ consumerGroup: ConsumerGroup }> = ({ const history = useHistory(); function goToConsumerGroupDetails() { - history.push(`consumer-groups/${consumerGroup.consumerGroupId}`); + history.push(`consumer-groups/${consumerGroup.groupId}`); } return ( - {consumerGroup.consumerGroupId} - {consumerGroup.numConsumers} - {consumerGroup.numTopics} + {consumerGroup.groupId} + {consumerGroup.members} + {consumerGroup.topics} + {consumerGroup.messagesBehind} + {consumerGroup.coordinator?.id} + + + ); }; diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/ConsumerGroups/TopicConsumerGroups.tsx b/kafka-ui-react-app/src/components/Topics/Topic/Details/ConsumerGroups/TopicConsumerGroups.tsx index 309dfc2c43..4fa5ac5bf6 100644 --- a/kafka-ui-react-app/src/components/Topics/Topic/Details/ConsumerGroups/TopicConsumerGroups.tsx +++ b/kafka-ui-react-app/src/components/Topics/Topic/Details/ConsumerGroups/TopicConsumerGroups.tsx @@ -1,15 +1,13 @@ import React from 'react'; -import { - Topic, - TopicDetails, - ConsumerTopicPartitionDetail, -} from 'generated-sources'; +import { Topic, TopicDetails, ConsumerGroup } from 'generated-sources'; import { ClusterName, TopicName } from 'redux/interfaces'; +import ConsumerGroupStateTag from 'components/common/ConsumerGroupState/ConsumerGroupStateTag'; +import { useHistory } from 'react-router'; interface Props extends Topic, TopicDetails { clusterName: ClusterName; topicName: TopicName; - consumerGroups: ConsumerTopicPartitionDetail[]; + consumerGroups: ConsumerGroup[]; fetchTopicConsumerGroups( clusterName: ClusterName, topicName: TopicName @@ -26,31 +24,38 @@ const TopicConsumerGroups: React.FC = ({ fetchTopicConsumerGroups(clusterName, topicName); }, []); + const history = useHistory(); + function goToConsumerGroupDetails(consumer: ConsumerGroup) { + history.push(`consumer-groups/${consumer.groupId}`); + } + return (
{consumerGroups.length > 0 ? ( - - - - + + - - + + {consumerGroups.map((consumer) => ( - + goToConsumerGroupDetails(consumer)} + > - - - + - - + + ))} diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Details/ConsumerGroups/__test__/TopicConsumerGroups.spec.tsx b/kafka-ui-react-app/src/components/Topics/Topic/Details/ConsumerGroups/__test__/TopicConsumerGroups.spec.tsx index 17ad0c969e..f20454fc3d 100644 --- a/kafka-ui-react-app/src/components/Topics/Topic/Details/ConsumerGroups/__test__/TopicConsumerGroups.spec.tsx +++ b/kafka-ui-react-app/src/components/Topics/Topic/Details/ConsumerGroups/__test__/TopicConsumerGroups.spec.tsx @@ -1,6 +1,7 @@ import React from 'react'; import { shallow } from 'enzyme'; import ConsumerGroups from 'components/Topics/Topic/Details/ConsumerGroups/TopicConsumerGroups'; +import { ConsumerGroupState } from 'generated-sources'; describe('Details', () => { const mockFn = jest.fn(); @@ -8,26 +9,24 @@ describe('Details', () => { const mockTopicName = 'local'; const mockWithConsumerGroup = [ { - groupId: 'messages-consumer', - consumerId: - 'consumer-messages-consumer-1-122fbf98-643b-491d-8aec-c0641d2513d0', - topic: 'messages', - host: '/172.31.9.153', - partition: 6, - currentOffset: 394, - endOffset: 394, - messagesBehind: 0, + groupId: 'amazon.msk.canary.group.broker-7', + topics: 0, + members: 0, + simple: false, + partitionAssignor: '', + state: ConsumerGroupState.UNKNOWN, + coordinator: { id: 1 }, + messagesBehind: 9, }, { - groupId: 'messages-consumer', - consumerId: - 'consumer-messages-consumer-1-122fbf98-643b-491d-8aec-c0641d2513d0', - topic: 'messages', - host: '/172.31.9.153', - partition: 7, - currentOffset: 384, - endOffset: 384, - messagesBehind: 0, + groupId: 'amazon.msk.canary.group.broker-4', + topics: 0, + members: 0, + simple: false, + partitionAssignor: '', + state: ConsumerGroupState.COMPLETING_REBALANCE, + coordinator: { id: 1 }, + messagesBehind: 9, }, ]; diff --git a/kafka-ui-react-app/src/components/common/ConsumerGroupState/ConsumerGroupStateTag.tsx b/kafka-ui-react-app/src/components/common/ConsumerGroupState/ConsumerGroupStateTag.tsx new file mode 100644 index 0000000000..92ddff0b2c --- /dev/null +++ b/kafka-ui-react-app/src/components/common/ConsumerGroupState/ConsumerGroupStateTag.tsx @@ -0,0 +1,40 @@ +import { ConsumerGroupState } from 'generated-sources'; +import React from 'react'; + +interface Props { + state?: ConsumerGroupState; +} + +const ConsumerGroupStateTag: React.FC = ({ state }) => { + let classes: string; + switch (state) { + case ConsumerGroupState.DEAD: + classes = 'is-danger'; + break; + case ConsumerGroupState.EMPTY: + classes = 'is-info'; + break; + case ConsumerGroupState.PREPARING_REBALANCE: + classes = 'is-warning'; + break; + case ConsumerGroupState.COMPLETING_REBALANCE: + classes = 'is-success'; + break; + case ConsumerGroupState.STABLE: + classes = 'is-primary'; + break; + case ConsumerGroupState.UNKNOWN: + classes = 'is-light'; + break; + default: + classes = 'is-danger'; + } + + if (!state) { + return Unknown; + } + + return {state}; +}; + +export default ConsumerGroupStateTag; diff --git a/kafka-ui-react-app/src/redux/actions/__test__/thunks/topics.spec.ts b/kafka-ui-react-app/src/redux/actions/__test__/thunks/topics.spec.ts index 4b81d22a61..7a1a2db89a 100644 --- a/kafka-ui-react-app/src/redux/actions/__test__/thunks/topics.spec.ts +++ b/kafka-ui-react-app/src/redux/actions/__test__/thunks/topics.spec.ts @@ -98,7 +98,7 @@ describe('Thunks', () => { describe('fetchTopicConsumerGroups', () => { it('GET_TOPIC_CONSUMER_GROUPS__FAILURE', async () => { fetchMock.getOnce( - `api/clusters/${clusterName}/topics/${topicName}/consumergroups`, + `api/clusters/${clusterName}/topics/${topicName}/consumer-groups`, 404 ); try { @@ -116,7 +116,7 @@ describe('Thunks', () => { it('GET_TOPIC_CONSUMER_GROUPS__SUCCESS', async () => { fetchMock.getOnce( - `api/clusters/${clusterName}/topics/${topicName}/consumergroups`, + `api/clusters/${clusterName}/topics/${topicName}/consumer-groups`, 200 ); try { diff --git a/kafka-ui-react-app/src/redux/interfaces/consumerGroup.ts b/kafka-ui-react-app/src/redux/interfaces/consumerGroup.ts index 4a5b733469..3fec64a9b6 100644 --- a/kafka-ui-react-app/src/redux/interfaces/consumerGroup.ts +++ b/kafka-ui-react-app/src/redux/interfaces/consumerGroup.ts @@ -1,10 +1,8 @@ import { ConsumerGroup, ConsumerGroupDetails } from 'generated-sources'; -export type ConsumerGroupID = ConsumerGroup['consumerGroupId']; +export type ConsumerGroupID = ConsumerGroup['groupId']; -export interface ConsumerGroupDetailedInfo - extends ConsumerGroup, - ConsumerGroupDetails {} +export type ConsumerGroupDetailedInfo = ConsumerGroupDetails; export interface ConsumerGroupsState { byID: { [consumerGroupID: string]: ConsumerGroupDetailedInfo }; diff --git a/kafka-ui-react-app/src/redux/interfaces/topic.ts b/kafka-ui-react-app/src/redux/interfaces/topic.ts index 277c00e0c1..9f7195b61d 100644 --- a/kafka-ui-react-app/src/redux/interfaces/topic.ts +++ b/kafka-ui-react-app/src/redux/interfaces/topic.ts @@ -7,7 +7,6 @@ import { GetTopicMessagesRequest, ConsumerGroup, TopicColumnsToSort, - TopicConsumerGroups, } from 'generated-sources'; export type TopicName = Topic['name']; @@ -42,7 +41,7 @@ export interface TopicFormCustomParams { export interface TopicWithDetailedInfo extends Topic, TopicDetails { config?: TopicConfig[]; - consumerGroups?: TopicConsumerGroups; + consumerGroups?: ConsumerGroup[]; } export interface TopicsState { diff --git a/kafka-ui-react-app/src/redux/reducers/consumerGroups/__test__/reducer.spec.ts b/kafka-ui-react-app/src/redux/reducers/consumerGroups/__test__/reducer.spec.ts index cef1e80bf0..c9bfa71807 100644 --- a/kafka-ui-react-app/src/redux/reducers/consumerGroups/__test__/reducer.spec.ts +++ b/kafka-ui-react-app/src/redux/reducers/consumerGroups/__test__/reducer.spec.ts @@ -5,8 +5,7 @@ import * as actions from 'redux/actions'; const state: ConsumerGroupsState = { byID: { test: { - clusterId: 'local', - consumerGroupId: 'test', + groupId: 'test', }, }, allIDs: ['test'], diff --git a/kafka-ui-react-app/src/redux/reducers/consumerGroups/reducer.ts b/kafka-ui-react-app/src/redux/reducers/consumerGroups/reducer.ts index 2328f36ee0..ab0be7fe35 100644 --- a/kafka-ui-react-app/src/redux/reducers/consumerGroups/reducer.ts +++ b/kafka-ui-react-app/src/redux/reducers/consumerGroups/reducer.ts @@ -22,12 +22,12 @@ const updateConsumerGroupsList = ( ...memo, byID: { ...memo.byID, - [consumerGroup.consumerGroupId]: { - ...memo.byID[consumerGroup.consumerGroupId], + [consumerGroup.groupId]: { + ...memo.byID[consumerGroup.groupId], ...consumerGroup, }, }, - allIDs: [...memo.allIDs, consumerGroup.consumerGroupId], + allIDs: [...memo.allIDs, consumerGroup.groupId], }), initialMemo ); diff --git a/kafka-ui-react-app/src/redux/reducers/topics/selectors.ts b/kafka-ui-react-app/src/redux/reducers/topics/selectors.ts index 2936d1c2ac..99075fd531 100644 --- a/kafka-ui-react-app/src/redux/reducers/topics/selectors.ts +++ b/kafka-ui-react-app/src/redux/reducers/topics/selectors.ts @@ -140,5 +140,5 @@ export const getIsTopicInternal = createSelector( export const getTopicConsumerGroups = createSelector( getTopicMap, getTopicName, - (topics, topicName) => topics[topicName].consumerGroups?.consumers || [] + (topics, topicName) => topics[topicName].consumerGroups || [] );
Group IDConsumer IDHostPartitionConsumer group IDNum of members Messages behindCurrent offsetEnd offsetCoordinatorState
{consumer.groupId}{consumer.consumerId}{consumer.host}{consumer.partition}{consumer.members} {consumer.messagesBehind}{consumer.currentOffset}{consumer.endOffset}{consumer.coordinator?.id} + +