Refactor consumer groups requests (#636)

* Refactor consumer groups requests

* Fixed offsets tests

* Moved state enum to separate class

* Adjust frontend for the new API

Co-authored-by: Alexander <mr.afigitelniychuvak@gmail.com>
This commit is contained in:
German Osin 2021-07-09 15:55:43 +03:00 committed by GitHub
parent dd19cc2eec
commit 13463fe95f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 395 additions and 280 deletions

View file

@ -9,11 +9,11 @@ import com.provectus.kafka.ui.model.ConsumerGroup;
import com.provectus.kafka.ui.model.ConsumerGroupDetails; import com.provectus.kafka.ui.model.ConsumerGroupDetails;
import com.provectus.kafka.ui.model.ConsumerGroupOffsetsReset; import com.provectus.kafka.ui.model.ConsumerGroupOffsetsReset;
import com.provectus.kafka.ui.model.PartitionOffset; import com.provectus.kafka.ui.model.PartitionOffset;
import com.provectus.kafka.ui.model.TopicConsumerGroups;
import com.provectus.kafka.ui.service.ClusterService; import com.provectus.kafka.ui.service.ClusterService;
import com.provectus.kafka.ui.service.ClustersStorage; import com.provectus.kafka.ui.service.ClustersStorage;
import com.provectus.kafka.ui.service.OffsetsResetService; import com.provectus.kafka.ui.service.OffsetsResetService;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
@ -56,12 +56,15 @@ public class ConsumerGroupsController implements ConsumerGroupsApi {
} }
@Override @Override
public Mono<ResponseEntity<TopicConsumerGroups>> getTopicConsumerGroups( public Mono<ResponseEntity<Flux<ConsumerGroup>>> getTopicConsumerGroups(
String clusterName, String topicName, ServerWebExchange exchange) { String clusterName, String topicName, ServerWebExchange exchange) {
return clusterService.getTopicConsumerGroupDetail(clusterName, topicName) return clusterService.getConsumerGroups(clusterName, Optional.of(topicName))
.map(ResponseEntity::ok); .map(Flux::fromIterable)
.map(ResponseEntity::ok)
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
} }
@Override @Override
public Mono<ResponseEntity<Void>> resetConsumerGroupOffsets(String clusterName, String group, public Mono<ResponseEntity<Void>> resetConsumerGroupOffsets(String clusterName, String group,
Mono<ConsumerGroupOffsetsReset> Mono<ConsumerGroupOffsetsReset>

View file

@ -0,0 +1,34 @@
package com.provectus.kafka.ui.model;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import lombok.Builder;
import lombok.Data;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@Data
@Builder(toBuilder = true)
public class InternalConsumerGroup {
private final String groupId;
private final boolean simple;
private final Collection<InternalMember> members;
private final Map<TopicPartition, OffsetAndMetadata> offsets;
private final Map<TopicPartition, Long> endOffsets;
private final String partitionAssignor;
private final ConsumerGroupState state;
private final Node coordinator;
@Data
@Builder(toBuilder = true)
public static class InternalMember {
private final String consumerId;
private final String groupInstanceId;
private final String clientId;
private final String host;
private final Set<TopicPartition> assignment;
}
}

View file

@ -25,7 +25,6 @@ import com.provectus.kafka.ui.model.ReplicationFactorChangeResponse;
import com.provectus.kafka.ui.model.Topic; import com.provectus.kafka.ui.model.Topic;
import com.provectus.kafka.ui.model.TopicColumnsToSort; import com.provectus.kafka.ui.model.TopicColumnsToSort;
import com.provectus.kafka.ui.model.TopicConfig; import com.provectus.kafka.ui.model.TopicConfig;
import com.provectus.kafka.ui.model.TopicConsumerGroups;
import com.provectus.kafka.ui.model.TopicCreation; import com.provectus.kafka.ui.model.TopicCreation;
import com.provectus.kafka.ui.model.TopicDetails; import com.provectus.kafka.ui.model.TopicDetails;
import com.provectus.kafka.ui.model.TopicMessage; import com.provectus.kafka.ui.model.TopicMessage;
@ -37,24 +36,20 @@ import com.provectus.kafka.ui.util.ClusterUtil;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult; import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@ -190,46 +185,26 @@ public class ClusterService {
public Mono<ConsumerGroupDetails> getConsumerGroupDetail(String clusterName, public Mono<ConsumerGroupDetails> getConsumerGroupDetail(String clusterName,
String consumerGroupId) { String consumerGroupId) {
var cluster = clustersStorage.getClusterByName(clusterName).orElseThrow(Throwable::new); var cluster = clustersStorage.getClusterByName(clusterName).orElseThrow(Throwable::new);
return kafkaService.getConsumerGroups(
return kafkaService.getOrCreateAdminClient(cluster).map(ac -> cluster,
ac.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).all() Optional.empty(),
).flatMap(groups -> Collections.singletonList(consumerGroupId)
kafkaService.groupMetadata(cluster, consumerGroupId) ).filter(groups -> !groups.isEmpty()).map(groups -> groups.get(0)).map(
.flatMap(offsets -> { ClusterUtil::convertToConsumerGroupDetails
Map<TopicPartition, Long> endOffsets =
kafkaService.topicPartitionsEndOffsets(cluster, offsets.keySet());
return ClusterUtil.toMono(groups).map(s ->
Tuples.of(
s.get(consumerGroupId),
s.get(consumerGroupId).members().stream()
.flatMap(c ->
Stream.of(
ClusterUtil.convertToConsumerTopicPartitionDetails(
c, offsets, endOffsets, consumerGroupId
)
)
)
.collect(Collectors.toList()).stream()
.flatMap(t ->
t.stream().flatMap(Stream::of)
).collect(Collectors.toList())
)
);
}).map(c -> ClusterUtil.convertToConsumerGroupDetails(c.getT1(), c.getT2()))
); );
} }
public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) { public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName)) return getConsumerGroups(clusterName, Optional.empty());
.switchIfEmpty(Mono.error(ClusterNotFoundException::new))
.flatMap(kafkaService::getConsumerGroups);
} }
public Mono<TopicConsumerGroups> getTopicConsumerGroupDetail( public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName, Optional<String> topic) {
String clusterName, String topicName) {
return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName)) return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
.switchIfEmpty(Mono.error(ClusterNotFoundException::new)) .switchIfEmpty(Mono.error(ClusterNotFoundException::new))
.flatMap(c -> kafkaService.getTopicConsumerGroups(c, topicName)); .flatMap(c -> kafkaService.getConsumerGroups(c, topic, Collections.emptyList()))
.map(c ->
c.stream().map(ClusterUtil::convertToConsumerGroup).collect(Collectors.toList())
);
} }
public Flux<Broker> getBrokers(String clusterName) { public Flux<Broker> getBrokers(String clusterName) {

View file

@ -1,12 +1,12 @@
package com.provectus.kafka.ui.service; package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerGroup;
import com.provectus.kafka.ui.model.CreateTopicMessage; import com.provectus.kafka.ui.model.CreateTopicMessage;
import com.provectus.kafka.ui.model.ExtendedAdminClient; import com.provectus.kafka.ui.model.ExtendedAdminClient;
import com.provectus.kafka.ui.model.InternalBrokerDiskUsage; import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
import com.provectus.kafka.ui.model.InternalBrokerMetrics; import com.provectus.kafka.ui.model.InternalBrokerMetrics;
import com.provectus.kafka.ui.model.InternalClusterMetrics; import com.provectus.kafka.ui.model.InternalClusterMetrics;
import com.provectus.kafka.ui.model.InternalConsumerGroup;
import com.provectus.kafka.ui.model.InternalPartition; import com.provectus.kafka.ui.model.InternalPartition;
import com.provectus.kafka.ui.model.InternalReplica; import com.provectus.kafka.ui.model.InternalReplica;
import com.provectus.kafka.ui.model.InternalSegmentSizeDto; import com.provectus.kafka.ui.model.InternalSegmentSizeDto;
@ -17,7 +17,6 @@ import com.provectus.kafka.ui.model.Metric;
import com.provectus.kafka.ui.model.PartitionsIncrease; import com.provectus.kafka.ui.model.PartitionsIncrease;
import com.provectus.kafka.ui.model.ReplicationFactorChange; import com.provectus.kafka.ui.model.ReplicationFactorChange;
import com.provectus.kafka.ui.model.ServerStatus; import com.provectus.kafka.ui.model.ServerStatus;
import com.provectus.kafka.ui.model.TopicConsumerGroups;
import com.provectus.kafka.ui.model.TopicCreation; import com.provectus.kafka.ui.model.TopicCreation;
import com.provectus.kafka.ui.model.TopicUpdate; import com.provectus.kafka.ui.model.TopicUpdate;
import com.provectus.kafka.ui.serde.DeserializationService; import com.provectus.kafka.ui.serde.DeserializationService;
@ -51,7 +50,6 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitionReassignment; import org.apache.kafka.clients.admin.NewPartitionReassignment;
@ -327,45 +325,59 @@ public class KafkaService {
); );
} }
public Mono<Collection<ConsumerGroupDescription>> getConsumerGroupsInternal( public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(
KafkaCluster cluster) { KafkaCluster cluster) {
return getOrCreateAdminClient(cluster).flatMap(ac -> return getOrCreateAdminClient(cluster).flatMap(ac ->
ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all()) ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all())
.flatMap(s -> .flatMap(s ->
ClusterUtil.toMono( getConsumerGroupsInternal(
ac.getAdminClient().describeConsumerGroups( cluster,
s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList()) s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList()))
).all() )
).map(Map::values) );
}
public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(
KafkaCluster cluster, List<String> groupIds) {
return getOrCreateAdminClient(cluster).flatMap(ac ->
ClusterUtil.toMono(
ac.getAdminClient().describeConsumerGroups(groupIds).all()
).map(Map::values)
).flatMap(descriptions ->
Flux.fromIterable(descriptions)
.parallel()
.flatMap(d ->
groupMetadata(cluster, d.groupId())
.map(offsets -> ClusterUtil.convertToInternalConsumerGroup(d, offsets))
) )
.sequential()
.collectList()
); );
} }
public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) { public Mono<List<InternalConsumerGroup>> getConsumerGroups(
return getConsumerGroupsInternal(cluster) KafkaCluster cluster, Optional<String> topic, List<String> groupIds) {
.map(c -> c.stream().map(ClusterUtil::convertToConsumerGroup).collect(Collectors.toList())); final Mono<List<InternalConsumerGroup>> consumerGroups;
}
public Mono<TopicConsumerGroups> getTopicConsumerGroups(KafkaCluster cluster, String topic) { if (groupIds.isEmpty()) {
final Map<TopicPartition, Long> endOffsets = topicEndOffsets(cluster, topic); consumerGroups = getConsumerGroupsInternal(cluster);
} else {
consumerGroups = getConsumerGroupsInternal(cluster, groupIds);
}
return getConsumerGroupsInternal(cluster) return consumerGroups.map(c ->
.flatMapIterable(c ->
c.stream() c.stream()
.map(d -> ClusterUtil.filterConsumerGroupTopic(d, topic)) .map(d -> ClusterUtil.filterConsumerGroupTopic(d, topic))
.filter(Optional::isPresent) .filter(Optional::isPresent)
.map(Optional::get) .map(Optional::get)
.map(d -> .map(g ->
groupMetadata(cluster, d.groupId()) g.toBuilder().endOffsets(
.flatMapIterable(meta -> topicPartitionsEndOffsets(cluster, g.getOffsets().keySet())
d.members().stream().flatMap(m -> ).build()
ClusterUtil.convertToConsumerTopicPartitionDetails( )
m, meta, endOffsets, d.groupId() .collect(Collectors.toList())
).stream() );
).collect(Collectors.toList())
)
).collect(Collectors.toList())
).flatMap(f -> f).collectList().map(l -> new TopicConsumerGroups().consumers(l));
} }
public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster cluster, public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster cluster,
@ -377,16 +389,6 @@ public class KafkaService {
).flatMap(ClusterUtil::toMono); ).flatMap(ClusterUtil::toMono);
} }
public Map<TopicPartition, Long> topicEndOffsets(
KafkaCluster cluster, String topic) {
try (KafkaConsumer<Bytes, Bytes> consumer = createConsumer(cluster)) {
final List<TopicPartition> topicPartitions = consumer.partitionsFor(topic).stream()
.map(i -> new TopicPartition(i.topic(), i.partition()))
.collect(Collectors.toList());
return consumer.endOffsets(topicPartitions);
}
}
public Map<TopicPartition, Long> topicPartitionsEndOffsets( public Map<TopicPartition, Long> topicPartitionsEndOffsets(
KafkaCluster cluster, Collection<TopicPartition> topicPartitions) { KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
try (KafkaConsumer<Bytes, Bytes> consumer = createConsumer(cluster)) { try (KafkaConsumer<Bytes, Bytes> consumer = createConsumer(cluster)) {

View file

@ -9,6 +9,7 @@ import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.InternalConsumerGroup;
import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KafkaCluster;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
@ -78,20 +79,20 @@ public class OffsetsResetService {
} }
private void checkGroupCondition(KafkaCluster cluster, String groupId) { private void checkGroupCondition(KafkaCluster cluster, String groupId) {
ConsumerGroupDescription description = InternalConsumerGroup description =
kafkaService.getConsumerGroupsInternal(cluster) kafkaService.getConsumerGroupsInternal(cluster)
.blockOptional() .blockOptional()
.stream() .stream()
.flatMap(Collection::stream) .flatMap(Collection::stream)
.filter(cgd -> cgd.groupId().equals(groupId)) .filter(cgd -> cgd.getGroupId().equals(groupId))
.findAny() .findAny()
.orElseThrow(() -> new NotFoundException("Consumer group not found")); .orElseThrow(() -> new NotFoundException("Consumer group not found"));
if (!Set.of(DEAD, EMPTY).contains(description.state())) { if (!Set.of(DEAD, EMPTY).contains(description.getState())) {
throw new ValidationException( throw new ValidationException(
String.format( String.format(
"Group's offsets can be reset only if group is inactive, but group is in %s state", "Group's offsets can be reset only if group is inactive, but group is in %s state",
description.state())); description.getState()));
} }
} }

View file

@ -3,10 +3,13 @@ package com.provectus.kafka.ui.util;
import static com.provectus.kafka.ui.util.KafkaConstants.TOPIC_DEFAULT_CONFIGS; import static com.provectus.kafka.ui.util.KafkaConstants.TOPIC_DEFAULT_CONFIGS;
import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG; import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
import com.provectus.kafka.ui.model.Broker;
import com.provectus.kafka.ui.model.ConsumerGroup; import com.provectus.kafka.ui.model.ConsumerGroup;
import com.provectus.kafka.ui.model.ConsumerGroupDetails; import com.provectus.kafka.ui.model.ConsumerGroupDetails;
import com.provectus.kafka.ui.model.ConsumerTopicPartitionDetail; import com.provectus.kafka.ui.model.ConsumerGroupState;
import com.provectus.kafka.ui.model.ConsumerGroupTopicPartition;
import com.provectus.kafka.ui.model.ExtendedAdminClient; import com.provectus.kafka.ui.model.ExtendedAdminClient;
import com.provectus.kafka.ui.model.InternalConsumerGroup;
import com.provectus.kafka.ui.model.InternalPartition; import com.provectus.kafka.ui.model.InternalPartition;
import com.provectus.kafka.ui.model.InternalReplica; import com.provectus.kafka.ui.model.InternalReplica;
import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.InternalTopic;
@ -17,6 +20,7 @@ import com.provectus.kafka.ui.serde.RecordSerDe;
import java.time.Instant; import java.time.Instant;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -31,8 +35,6 @@ import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@ -72,57 +74,120 @@ public class ClusterUtil {
})); }));
} }
public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c) { public static InternalConsumerGroup convertToInternalConsumerGroup(
ConsumerGroup consumerGroup = new ConsumerGroup(); ConsumerGroupDescription description, Map<TopicPartition, OffsetAndMetadata> offsets) {
consumerGroup.setConsumerGroupId(c.groupId());
consumerGroup.setNumConsumers(c.members().size()); var builder = InternalConsumerGroup.builder();
int numTopics = c.members().stream() builder.groupId(description.groupId());
.flatMap(m -> m.assignment().topicPartitions().stream().flatMap(t -> Stream.of(t.topic()))) builder.simple(description.isSimpleConsumerGroup());
.collect(Collectors.toSet()).size(); builder.state(description.state());
consumerGroup.setNumTopics(numTopics); builder.partitionAssignor(description.partitionAssignor());
consumerGroup.setSimple(c.isSimpleConsumerGroup()); builder.members(
Optional.ofNullable(c.state()) description.members().stream()
.ifPresent(s -> consumerGroup.setState(s.name())); .map(m ->
Optional.ofNullable(c.coordinator()) InternalConsumerGroup.InternalMember.builder()
.ifPresent(coord -> consumerGroup.setCoordintor(coord.host())); .assignment(m.assignment().topicPartitions())
consumerGroup.setPartitionAssignor(c.partitionAssignor()); .clientId(m.clientId())
.groupInstanceId(m.groupInstanceId().orElse(""))
.consumerId(m.consumerId())
.clientId(m.clientId())
.host(m.host())
.build()
).collect(Collectors.toList())
);
builder.offsets(offsets);
Optional.ofNullable(description.coordinator()).ifPresent(builder::coordinator);
return builder.build();
}
public static ConsumerGroup convertToConsumerGroup(InternalConsumerGroup c) {
return convertToConsumerGroup(c, new ConsumerGroup());
}
public static <T extends ConsumerGroup> T convertToConsumerGroup(
InternalConsumerGroup c, T consumerGroup) {
consumerGroup.setGroupId(c.getGroupId());
consumerGroup.setMembers(c.getMembers().size());
int numTopics = Stream.concat(
c.getOffsets().keySet().stream().map(TopicPartition::topic),
c.getMembers().stream()
.flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic))
).collect(Collectors.toSet()).size();
long messagesBehind = c.getOffsets().entrySet().stream()
.mapToLong(e ->
Optional.ofNullable(c.getEndOffsets())
.map(o -> o.get(e.getKey()))
.map(o -> o - e.getValue().offset())
.orElse(0L)
).sum();
consumerGroup.setMessagesBehind(messagesBehind);
consumerGroup.setTopics(numTopics);
consumerGroup.setSimple(c.isSimple());
Optional.ofNullable(c.getState())
.ifPresent(s -> consumerGroup.setState(mapConsumerGroupState(s)));
Optional.ofNullable(c.getCoordinator())
.ifPresent(cd -> consumerGroup.setCoordinator(mapCoordinator(cd)));
consumerGroup.setPartitionAssignor(c.getPartitionAssignor());
return consumerGroup; return consumerGroup;
} }
public static ConsumerGroupDetails convertToConsumerGroupDetails( public static ConsumerGroupDetails convertToConsumerGroupDetails(InternalConsumerGroup g) {
ConsumerGroupDescription desc, List<ConsumerTopicPartitionDetail> consumers final ConsumerGroupDetails details = convertToConsumerGroup(g, new ConsumerGroupDetails());
) { Map<TopicPartition, ConsumerGroupTopicPartition> partitionMap = new HashMap<>();
return new ConsumerGroupDetails()
.consumers(consumers) for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : g.getOffsets().entrySet()) {
.consumerGroupId(desc.groupId()) ConsumerGroupTopicPartition partition = new ConsumerGroupTopicPartition();
.simple(desc.isSimpleConsumerGroup()) partition.setTopic(entry.getKey().topic());
.coordintor(Optional.ofNullable(desc.coordinator()).map(Node::host).orElse("")) partition.setPartition(entry.getKey().partition());
.state(Optional.ofNullable(desc.state()).map(Enum::name).orElse("")) partition.setCurrentOffset(entry.getValue().offset());
.partitionAssignor(desc.partitionAssignor());
final Optional<Long> endOffset = Optional.ofNullable(g.getEndOffsets())
.map(o -> o.get(entry.getKey()));
final Long behind = endOffset.map(o -> o - entry.getValue().offset())
.orElse(0L);
partition.setEndOffset(endOffset.orElse(0L));
partition.setMessagesBehind(behind);
partitionMap.put(entry.getKey(), partition);
}
for (InternalConsumerGroup.InternalMember member : g.getMembers()) {
for (TopicPartition topicPartition : member.getAssignment()) {
final ConsumerGroupTopicPartition partition = partitionMap.computeIfAbsent(topicPartition,
(tp) -> new ConsumerGroupTopicPartition()
.topic(tp.topic())
.partition(tp.partition())
);
partition.setHost(member.getHost());
partition.setConsumerId(member.getConsumerId());
partitionMap.put(topicPartition, partition);
}
}
details.setPartitions(new ArrayList<>(partitionMap.values()));
return details;
} }
public static List<ConsumerTopicPartitionDetail> convertToConsumerTopicPartitionDetails( private static Broker mapCoordinator(Node node) {
MemberDescription consumer, return new Broker().host(node.host()).id(node.id());
Map<TopicPartition, OffsetAndMetadata> groupOffsets, }
Map<TopicPartition, Long> endOffsets,
String groupId private static ConsumerGroupState mapConsumerGroupState(
) { org.apache.kafka.common.ConsumerGroupState state) {
return consumer.assignment().topicPartitions().stream() switch (state) {
.map(tp -> { case DEAD: return ConsumerGroupState.DEAD;
long currentOffset = Optional.ofNullable(groupOffsets.get(tp)) case EMPTY: return ConsumerGroupState.EMPTY;
.map(OffsetAndMetadata::offset).orElse(0L); case STABLE: return ConsumerGroupState.STABLE;
long endOffset = Optional.ofNullable(endOffsets.get(tp)).orElse(0L); case PREPARING_REBALANCE: return ConsumerGroupState.PREPARING_REBALANCE;
ConsumerTopicPartitionDetail cd = new ConsumerTopicPartitionDetail(); case COMPLETING_REBALANCE: return ConsumerGroupState.COMPLETING_REBALANCE;
cd.setGroupId(groupId); default: return ConsumerGroupState.UNKNOWN;
cd.setConsumerId(consumer.consumerId()); }
cd.setHost(consumer.host());
cd.setTopic(tp.topic());
cd.setPartition(tp.partition());
cd.setCurrentOffset(currentOffset);
cd.setEndOffset(endOffset);
cd.setMessagesBehind(endOffset - currentOffset);
return cd;
}).collect(Collectors.toList());
} }
@ -282,42 +347,40 @@ public class ClusterUtil {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow(); .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
} }
public static Optional<ConsumerGroupDescription> filterConsumerGroupTopic( public static Optional<InternalConsumerGroup> filterConsumerGroupTopic(
ConsumerGroupDescription description, String topic) { InternalConsumerGroup consumerGroup, Optional<String> topic) {
final List<MemberDescription> members = description.members().stream()
.map(m -> filterConsumerMemberTopic(m, topic))
.filter(m -> !m.assignment().topicPartitions().isEmpty())
.collect(Collectors.toList());
if (!members.isEmpty()) { final Map<TopicPartition, OffsetAndMetadata> offsets =
consumerGroup.getOffsets().entrySet().stream()
.filter(e -> topic.isEmpty() || e.getKey().topic().equals(topic.get()))
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue
));
final Collection<InternalConsumerGroup.InternalMember> members =
consumerGroup.getMembers().stream()
.map(m -> filterConsumerMemberTopic(m, topic))
.filter(m -> !m.getAssignment().isEmpty())
.collect(Collectors.toList());
if (!members.isEmpty() || !offsets.isEmpty()) {
return Optional.of( return Optional.of(
new ConsumerGroupDescription( consumerGroup.toBuilder()
description.groupId(), .offsets(offsets)
description.isSimpleConsumerGroup(), .members(members)
members, .build()
description.partitionAssignor(),
description.state(),
description.coordinator()
)
); );
} else { } else {
return Optional.empty(); return Optional.empty();
} }
} }
public static MemberDescription filterConsumerMemberTopic( public static InternalConsumerGroup.InternalMember filterConsumerMemberTopic(
MemberDescription description, String topic) { InternalConsumerGroup.InternalMember member, Optional<String> topic) {
final Set<TopicPartition> topicPartitions = description.assignment().topicPartitions() final Set<TopicPartition> topicPartitions = member.getAssignment()
.stream().filter(tp -> tp.topic().equals(topic)) .stream().filter(tp -> topic.isEmpty() || tp.topic().equals(topic.get()))
.collect(Collectors.toSet()); .collect(Collectors.toSet());
MemberAssignment assignment = new MemberAssignment(topicPartitions); return member.toBuilder().assignment(topicPartitions).build();
return new MemberDescription(
description.consumerId(),
description.groupInstanceId(),
description.clientId(),
description.host(),
assignment
);
} }
} }

View file

@ -472,7 +472,7 @@ paths:
schema: schema:
$ref: '#/components/schemas/TopicMessageSchema' $ref: '#/components/schemas/TopicMessageSchema'
/api/clusters/{clusterName}/topics/{topicName}/consumergroups: /api/clusters/{clusterName}/topics/{topicName}/consumer-groups:
get: get:
tags: tags:
- Consumer Groups - Consumer Groups
@ -495,7 +495,10 @@ paths:
content: content:
application/json: application/json:
schema: schema:
$ref: '#/components/schemas/TopicConsumerGroups' type: array
items:
$ref: '#/components/schemas/ConsumerGroup'
/api/clusters/{clusterName}/consumer-groups/{id}: /api/clusters/{clusterName}/consumer-groups/{id}:
get: get:
@ -542,7 +545,7 @@ paths:
200: 200:
description: OK description: OK
/api/clusters/{clusterName}/consumerGroups: /api/clusters/{clusterName}/consumer-groups:
get: get:
tags: tags:
- Consumer Groups - Consumer Groups
@ -1580,28 +1583,38 @@ components:
required: required:
- id - id
ConsumerGroupState:
type: string
enum:
- UNKNOWN
- PREPARING_REBALANCE
- COMPLETING_REBALANCE
- STABLE
- DEAD
- EMPTY
ConsumerGroup: ConsumerGroup:
type: object type: object
properties: properties:
clusterId: groupId:
type: string type: string
consumerGroupId: members:
type: string
numConsumers:
type: integer type: integer
numTopics: topics:
type: integer type: integer
simple: simple:
type: boolean type: boolean
partitionAssignor: partitionAssignor:
type: string type: string
state: state:
type: string $ref: "#/components/schemas/ConsumerGroupState"
coordintor: coordinator:
type: string $ref: "#/components/schemas/Broker"
messagesBehind:
type: integer
format: int64
required: required:
- clusterId - groupId
- consumerGroupId
CreateTopicMessage: CreateTopicMessage:
type: object type: object
@ -1713,17 +1726,11 @@ components:
- offsetMax - offsetMax
- offsetMin - offsetMin
ConsumerTopicPartitionDetail: ConsumerGroupTopicPartition:
type: object type: object
properties: properties:
groupId:
type: string
consumerId:
type: string
topic: topic:
type: string type: string
host:
type: string
partition: partition:
type: integer type: integer
currentOffset: currentOffset:
@ -1735,36 +1742,24 @@ components:
messagesBehind: messagesBehind:
type: integer type: integer
format: int64 format: int64
consumerId:
type: string
host:
type: string
required: required:
- consumerId - topic
- partition
TopicConsumerGroups:
type: object
properties:
consumers:
type: array
items:
$ref: '#/components/schemas/ConsumerTopicPartitionDetail'
ConsumerGroupDetails: ConsumerGroupDetails:
type: object allOf:
properties: - $ref: '#/components/schemas/ConsumerGroup'
consumerGroupId: - type: object
type: string properties:
simple: partitions:
type: boolean type: array
partitionAssignor: items:
type: string $ref: '#/components/schemas/ConsumerGroupTopicPartition'
state:
type: string
coordintor:
type: string
consumers:
type: array
items:
$ref: '#/components/schemas/ConsumerTopicPartitionDetail'
required:
- consumerGroupId
Metric: Metric:
type: object type: object

View file

@ -6,7 +6,7 @@ import { ConsumerGroupID } from 'redux/interfaces/consumerGroup';
import { import {
ConsumerGroup, ConsumerGroup,
ConsumerGroupDetails, ConsumerGroupDetails,
ConsumerTopicPartitionDetail, ConsumerGroupTopicPartition,
} from 'generated-sources'; } from 'generated-sources';
import PageLoader from 'components/common/PageLoader/PageLoader'; import PageLoader from 'components/common/PageLoader/PageLoader';
import ConfirmationModal from 'components/common/ConfirmationModal/ConfirmationModal'; import ConfirmationModal from 'components/common/ConfirmationModal/ConfirmationModal';
@ -16,8 +16,7 @@ import ListItem from './ListItem';
export interface Props extends ConsumerGroup, ConsumerGroupDetails { export interface Props extends ConsumerGroup, ConsumerGroupDetails {
clusterName: ClusterName; clusterName: ClusterName;
consumerGroupId: ConsumerGroupID; consumers?: ConsumerGroupTopicPartition[];
consumers?: ConsumerTopicPartitionDetail[];
isFetched: boolean; isFetched: boolean;
isDeleted: boolean; isDeleted: boolean;
fetchConsumerGroupDetails: ( fetchConsumerGroupDetails: (
@ -29,7 +28,7 @@ export interface Props extends ConsumerGroup, ConsumerGroupDetails {
const Details: React.FC<Props> = ({ const Details: React.FC<Props> = ({
clusterName, clusterName,
consumerGroupId, groupId,
consumers, consumers,
isFetched, isFetched,
isDeleted, isDeleted,
@ -37,8 +36,8 @@ const Details: React.FC<Props> = ({
deleteConsumerGroup, deleteConsumerGroup,
}) => { }) => {
React.useEffect(() => { React.useEffect(() => {
fetchConsumerGroupDetails(clusterName, consumerGroupId); fetchConsumerGroupDetails(clusterName, groupId);
}, [fetchConsumerGroupDetails, clusterName, consumerGroupId]); }, [fetchConsumerGroupDetails, clusterName, groupId]);
const items = consumers || []; const items = consumers || [];
const [isConfirmationModelVisible, setIsConfirmationModelVisible] = const [isConfirmationModelVisible, setIsConfirmationModelVisible] =
React.useState<boolean>(false); React.useState<boolean>(false);
@ -46,7 +45,7 @@ const Details: React.FC<Props> = ({
const onDelete = () => { const onDelete = () => {
setIsConfirmationModelVisible(false); setIsConfirmationModelVisible(false);
deleteConsumerGroup(clusterName, consumerGroupId); deleteConsumerGroup(clusterName, groupId);
}; };
React.useEffect(() => { React.useEffect(() => {
if (isDeleted) { if (isDeleted) {
@ -66,7 +65,7 @@ const Details: React.FC<Props> = ({
}, },
]} ]}
> >
{consumerGroupId} {groupId}
</Breadcrumb> </Breadcrumb>
</div> </div>
</div> </div>

View file

@ -30,7 +30,6 @@ const mapStateToProps = (
}: OwnProps }: OwnProps
) => ({ ) => ({
clusterName, clusterName,
consumerGroupID,
isFetched: getIsConsumerGroupDetailsFetched(state), isFetched: getIsConsumerGroupDetailsFetched(state),
isDeleted: getIsConsumerGroupsDeleted(state), isDeleted: getIsConsumerGroupsDeleted(state),
...getConsumerGroupByID(state, consumerGroupID), ...getConsumerGroupByID(state, consumerGroupID),

View file

@ -1,11 +1,11 @@
import React from 'react'; import React from 'react';
import { ConsumerTopicPartitionDetail } from 'generated-sources'; import { ConsumerGroupTopicPartition } from 'generated-sources';
import { NavLink } from 'react-router-dom'; import { NavLink } from 'react-router-dom';
import { ClusterName } from 'redux/interfaces/cluster'; import { ClusterName } from 'redux/interfaces/cluster';
interface Props { interface Props {
clusterName: ClusterName; clusterName: ClusterName;
consumer: ConsumerTopicPartitionDetail; consumer: ConsumerGroupTopicPartition;
} }
const ListItem: React.FC<Props> = ({ clusterName, consumer }) => { const ListItem: React.FC<Props> = ({ clusterName, consumer }) => {

View file

@ -15,15 +15,13 @@ describe('Details component', () => {
const setupWrapper = (props?: Partial<Props>) => ( const setupWrapper = (props?: Partial<Props>) => (
<Details <Details
clusterName="local" clusterName="local"
clusterId="local" groupId="test"
consumerGroupId="test"
isFetched isFetched
isDeleted={false} isDeleted={false}
fetchConsumerGroupDetails={jest.fn()} fetchConsumerGroupDetails={jest.fn()}
deleteConsumerGroup={jest.fn()} deleteConsumerGroup={jest.fn()}
consumers={[ consumers={[
{ {
groupId: 'messages-consumer',
consumerId: consumerId:
'consumer-messages-consumer-1-122fbf98-643b-491d-8aec-c0641d2513d0', 'consumer-messages-consumer-1-122fbf98-643b-491d-8aec-c0641d2513d0',
topic: 'messages', topic: 'messages',
@ -34,7 +32,6 @@ describe('Details component', () => {
messagesBehind: 0, messagesBehind: 0,
}, },
{ {
groupId: 'messages-consumer',
consumerId: consumerId:
'consumer-messages-consumer-1-122fbf98-643b-491d-8aec-c0641d2513d1', 'consumer-messages-consumer-1-122fbf98-643b-491d-8aec-c0641d2513d1',
topic: 'messages', topic: 'messages',

View file

@ -113,7 +113,6 @@ exports[`Details component when consumer gruops are fetched Matches the snapshot
"consumerId": "consumer-messages-consumer-1-122fbf98-643b-491d-8aec-c0641d2513d0", "consumerId": "consumer-messages-consumer-1-122fbf98-643b-491d-8aec-c0641d2513d0",
"currentOffset": 394, "currentOffset": 394,
"endOffset": 394, "endOffset": 394,
"groupId": "messages-consumer",
"host": "/172.31.9.153", "host": "/172.31.9.153",
"messagesBehind": 0, "messagesBehind": 0,
"partition": 6, "partition": 6,
@ -129,7 +128,6 @@ exports[`Details component when consumer gruops are fetched Matches the snapshot
"consumerId": "consumer-messages-consumer-1-122fbf98-643b-491d-8aec-c0641d2513d1", "consumerId": "consumer-messages-consumer-1-122fbf98-643b-491d-8aec-c0641d2513d1",
"currentOffset": 384, "currentOffset": 384,
"endOffset": 384, "endOffset": 384,
"groupId": "messages-consumer",
"host": "/172.31.9.153", "host": "/172.31.9.153",
"messagesBehind": 0, "messagesBehind": 0,
"partition": 7, "partition": 7,

View file

@ -41,8 +41,11 @@ const List: React.FC<Props> = ({ consumerGroups }) => {
<thead> <thead>
<tr> <tr>
<th>Consumer group ID</th> <th>Consumer group ID</th>
<th>Num of consumers</th> <th>Num of members</th>
<th>Num of topics</th> <th>Num of topics</th>
<th>Messages behind</th>
<th>Coordinator</th>
<th>State</th>
</tr> </tr>
</thead> </thead>
<tbody> <tbody>
@ -50,11 +53,11 @@ const List: React.FC<Props> = ({ consumerGroups }) => {
.filter( .filter(
(consumerGroup) => (consumerGroup) =>
!searchText || !searchText ||
consumerGroup?.consumerGroupId?.indexOf(searchText) >= 0 consumerGroup?.groupId?.indexOf(searchText) >= 0
) )
.map((consumerGroup) => ( .map((consumerGroup) => (
<ListItem <ListItem
key={consumerGroup.consumerGroupId} key={consumerGroup.groupId}
consumerGroup={consumerGroup} consumerGroup={consumerGroup}
/> />
))} ))}

View file

@ -1,6 +1,7 @@
import React from 'react'; import React from 'react';
import { useHistory } from 'react-router-dom'; import { useHistory } from 'react-router-dom';
import { ConsumerGroup } from 'generated-sources'; import { ConsumerGroup } from 'generated-sources';
import ConsumerGroupStateTag from 'components/common/ConsumerGroupState/ConsumerGroupStateTag';
const ListItem: React.FC<{ consumerGroup: ConsumerGroup }> = ({ const ListItem: React.FC<{ consumerGroup: ConsumerGroup }> = ({
consumerGroup, consumerGroup,
@ -8,14 +9,19 @@ const ListItem: React.FC<{ consumerGroup: ConsumerGroup }> = ({
const history = useHistory(); const history = useHistory();
function goToConsumerGroupDetails() { function goToConsumerGroupDetails() {
history.push(`consumer-groups/${consumerGroup.consumerGroupId}`); history.push(`consumer-groups/${consumerGroup.groupId}`);
} }
return ( return (
<tr className="is-clickable" onClick={goToConsumerGroupDetails}> <tr className="is-clickable" onClick={goToConsumerGroupDetails}>
<td>{consumerGroup.consumerGroupId}</td> <td>{consumerGroup.groupId}</td>
<td>{consumerGroup.numConsumers}</td> <td>{consumerGroup.members}</td>
<td>{consumerGroup.numTopics}</td> <td>{consumerGroup.topics}</td>
<td>{consumerGroup.messagesBehind}</td>
<td>{consumerGroup.coordinator?.id}</td>
<td>
<ConsumerGroupStateTag state={consumerGroup.state} />
</td>
</tr> </tr>
); );
}; };

View file

@ -1,15 +1,13 @@
import React from 'react'; import React from 'react';
import { import { Topic, TopicDetails, ConsumerGroup } from 'generated-sources';
Topic,
TopicDetails,
ConsumerTopicPartitionDetail,
} from 'generated-sources';
import { ClusterName, TopicName } from 'redux/interfaces'; import { ClusterName, TopicName } from 'redux/interfaces';
import ConsumerGroupStateTag from 'components/common/ConsumerGroupState/ConsumerGroupStateTag';
import { useHistory } from 'react-router';
interface Props extends Topic, TopicDetails { interface Props extends Topic, TopicDetails {
clusterName: ClusterName; clusterName: ClusterName;
topicName: TopicName; topicName: TopicName;
consumerGroups: ConsumerTopicPartitionDetail[]; consumerGroups: ConsumerGroup[];
fetchTopicConsumerGroups( fetchTopicConsumerGroups(
clusterName: ClusterName, clusterName: ClusterName,
topicName: TopicName topicName: TopicName
@ -26,31 +24,38 @@ const TopicConsumerGroups: React.FC<Props> = ({
fetchTopicConsumerGroups(clusterName, topicName); fetchTopicConsumerGroups(clusterName, topicName);
}, []); }, []);
const history = useHistory();
function goToConsumerGroupDetails(consumer: ConsumerGroup) {
history.push(`consumer-groups/${consumer.groupId}`);
}
return ( return (
<div className="box"> <div className="box">
{consumerGroups.length > 0 ? ( {consumerGroups.length > 0 ? (
<table className="table is-striped is-fullwidth"> <table className="table is-striped is-fullwidth">
<thead> <thead>
<tr> <tr>
<th>Group ID</th> <th>Consumer group ID</th>
<th>Consumer ID</th> <th>Num of members</th>
<th>Host</th>
<th>Partition</th>
<th>Messages behind</th> <th>Messages behind</th>
<th>Current offset</th> <th>Coordinator</th>
<th>End offset</th> <th>State</th>
</tr> </tr>
</thead> </thead>
<tbody> <tbody>
{consumerGroups.map((consumer) => ( {consumerGroups.map((consumer) => (
<tr key={consumer.consumerId}> <tr
key={consumer.groupId}
className="is-clickable"
onClick={() => goToConsumerGroupDetails(consumer)}
>
<td>{consumer.groupId}</td> <td>{consumer.groupId}</td>
<td>{consumer.consumerId}</td> <td>{consumer.members}</td>
<td>{consumer.host}</td>
<td>{consumer.partition}</td>
<td>{consumer.messagesBehind}</td> <td>{consumer.messagesBehind}</td>
<td>{consumer.currentOffset}</td> <td>{consumer.coordinator?.id}</td>
<td>{consumer.endOffset}</td> <td>
<ConsumerGroupStateTag state={consumer.state} />
</td>
</tr> </tr>
))} ))}
</tbody> </tbody>

View file

@ -1,6 +1,7 @@
import React from 'react'; import React from 'react';
import { shallow } from 'enzyme'; import { shallow } from 'enzyme';
import ConsumerGroups from 'components/Topics/Topic/Details/ConsumerGroups/TopicConsumerGroups'; import ConsumerGroups from 'components/Topics/Topic/Details/ConsumerGroups/TopicConsumerGroups';
import { ConsumerGroupState } from 'generated-sources';
describe('Details', () => { describe('Details', () => {
const mockFn = jest.fn(); const mockFn = jest.fn();
@ -8,26 +9,24 @@ describe('Details', () => {
const mockTopicName = 'local'; const mockTopicName = 'local';
const mockWithConsumerGroup = [ const mockWithConsumerGroup = [
{ {
groupId: 'messages-consumer', groupId: 'amazon.msk.canary.group.broker-7',
consumerId: topics: 0,
'consumer-messages-consumer-1-122fbf98-643b-491d-8aec-c0641d2513d0', members: 0,
topic: 'messages', simple: false,
host: '/172.31.9.153', partitionAssignor: '',
partition: 6, state: ConsumerGroupState.UNKNOWN,
currentOffset: 394, coordinator: { id: 1 },
endOffset: 394, messagesBehind: 9,
messagesBehind: 0,
}, },
{ {
groupId: 'messages-consumer', groupId: 'amazon.msk.canary.group.broker-4',
consumerId: topics: 0,
'consumer-messages-consumer-1-122fbf98-643b-491d-8aec-c0641d2513d0', members: 0,
topic: 'messages', simple: false,
host: '/172.31.9.153', partitionAssignor: '',
partition: 7, state: ConsumerGroupState.COMPLETING_REBALANCE,
currentOffset: 384, coordinator: { id: 1 },
endOffset: 384, messagesBehind: 9,
messagesBehind: 0,
}, },
]; ];

View file

@ -0,0 +1,40 @@
import { ConsumerGroupState } from 'generated-sources';
import React from 'react';
interface Props {
state?: ConsumerGroupState;
}
const ConsumerGroupStateTag: React.FC<Props> = ({ state }) => {
let classes: string;
switch (state) {
case ConsumerGroupState.DEAD:
classes = 'is-danger';
break;
case ConsumerGroupState.EMPTY:
classes = 'is-info';
break;
case ConsumerGroupState.PREPARING_REBALANCE:
classes = 'is-warning';
break;
case ConsumerGroupState.COMPLETING_REBALANCE:
classes = 'is-success';
break;
case ConsumerGroupState.STABLE:
classes = 'is-primary';
break;
case ConsumerGroupState.UNKNOWN:
classes = 'is-light';
break;
default:
classes = 'is-danger';
}
if (!state) {
return <span className="is-tag is-light">Unknown</span>;
}
return <span className={`tag ${classes}`}>{state}</span>;
};
export default ConsumerGroupStateTag;

View file

@ -98,7 +98,7 @@ describe('Thunks', () => {
describe('fetchTopicConsumerGroups', () => { describe('fetchTopicConsumerGroups', () => {
it('GET_TOPIC_CONSUMER_GROUPS__FAILURE', async () => { it('GET_TOPIC_CONSUMER_GROUPS__FAILURE', async () => {
fetchMock.getOnce( fetchMock.getOnce(
`api/clusters/${clusterName}/topics/${topicName}/consumergroups`, `api/clusters/${clusterName}/topics/${topicName}/consumer-groups`,
404 404
); );
try { try {
@ -116,7 +116,7 @@ describe('Thunks', () => {
it('GET_TOPIC_CONSUMER_GROUPS__SUCCESS', async () => { it('GET_TOPIC_CONSUMER_GROUPS__SUCCESS', async () => {
fetchMock.getOnce( fetchMock.getOnce(
`api/clusters/${clusterName}/topics/${topicName}/consumergroups`, `api/clusters/${clusterName}/topics/${topicName}/consumer-groups`,
200 200
); );
try { try {

View file

@ -1,10 +1,8 @@
import { ConsumerGroup, ConsumerGroupDetails } from 'generated-sources'; import { ConsumerGroup, ConsumerGroupDetails } from 'generated-sources';
export type ConsumerGroupID = ConsumerGroup['consumerGroupId']; export type ConsumerGroupID = ConsumerGroup['groupId'];
export interface ConsumerGroupDetailedInfo export type ConsumerGroupDetailedInfo = ConsumerGroupDetails;
extends ConsumerGroup,
ConsumerGroupDetails {}
export interface ConsumerGroupsState { export interface ConsumerGroupsState {
byID: { [consumerGroupID: string]: ConsumerGroupDetailedInfo }; byID: { [consumerGroupID: string]: ConsumerGroupDetailedInfo };

View file

@ -7,7 +7,6 @@ import {
GetTopicMessagesRequest, GetTopicMessagesRequest,
ConsumerGroup, ConsumerGroup,
TopicColumnsToSort, TopicColumnsToSort,
TopicConsumerGroups,
} from 'generated-sources'; } from 'generated-sources';
export type TopicName = Topic['name']; export type TopicName = Topic['name'];
@ -42,7 +41,7 @@ export interface TopicFormCustomParams {
export interface TopicWithDetailedInfo extends Topic, TopicDetails { export interface TopicWithDetailedInfo extends Topic, TopicDetails {
config?: TopicConfig[]; config?: TopicConfig[];
consumerGroups?: TopicConsumerGroups; consumerGroups?: ConsumerGroup[];
} }
export interface TopicsState { export interface TopicsState {

View file

@ -5,8 +5,7 @@ import * as actions from 'redux/actions';
const state: ConsumerGroupsState = { const state: ConsumerGroupsState = {
byID: { byID: {
test: { test: {
clusterId: 'local', groupId: 'test',
consumerGroupId: 'test',
}, },
}, },
allIDs: ['test'], allIDs: ['test'],

View file

@ -22,12 +22,12 @@ const updateConsumerGroupsList = (
...memo, ...memo,
byID: { byID: {
...memo.byID, ...memo.byID,
[consumerGroup.consumerGroupId]: { [consumerGroup.groupId]: {
...memo.byID[consumerGroup.consumerGroupId], ...memo.byID[consumerGroup.groupId],
...consumerGroup, ...consumerGroup,
}, },
}, },
allIDs: [...memo.allIDs, consumerGroup.consumerGroupId], allIDs: [...memo.allIDs, consumerGroup.groupId],
}), }),
initialMemo initialMemo
); );

View file

@ -140,5 +140,5 @@ export const getIsTopicInternal = createSelector(
export const getTopicConsumerGroups = createSelector( export const getTopicConsumerGroups = createSelector(
getTopicMap, getTopicMap,
getTopicName, getTopicName,
(topics, topicName) => topics[topicName].consumerGroups?.consumers || [] (topics, topicName) => topics[topicName].consumerGroups || []
); );