ISSUE-121 Topic level consumer groups & consumer group details (#360)

This commit is contained in:
German Osin 2021-04-08 11:49:44 +03:00 committed by GitHub
parent 9d75dbdacd
commit 7bfae45162
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 225 additions and 54 deletions

View file

@ -3,6 +3,7 @@ package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.ConsumerGroupsApi; import com.provectus.kafka.ui.api.ConsumerGroupsApi;
import com.provectus.kafka.ui.model.ConsumerGroup; import com.provectus.kafka.ui.model.ConsumerGroup;
import com.provectus.kafka.ui.model.ConsumerGroupDetails; import com.provectus.kafka.ui.model.ConsumerGroupDetails;
import com.provectus.kafka.ui.model.TopicConsumerGroups;
import com.provectus.kafka.ui.service.ClusterService; import com.provectus.kafka.ui.service.ClusterService;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@ -34,4 +35,11 @@ public class ConsumerGroupsController implements ConsumerGroupsApi {
.map(ResponseEntity::ok) .map(ResponseEntity::ok)
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
} }
// Handles GET .../topics/{topicName}/consumergroups: lists the consumer groups
// that are reading the given topic on the given cluster.
@Override
public Mono<ResponseEntity<TopicConsumerGroups>> getTopicConsumerGroups(
String clusterName, String topicName, ServerWebExchange exchange) {
// Delegates to ClusterService; the service signals an unknown cluster via an error Mono.
return clusterService.getTopicConsumerGroupDetail(clusterName, topicName)
.map(ResponseEntity::ok);
}
} }

View file

@ -15,33 +15,28 @@ import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.Topic; import com.provectus.kafka.ui.model.Topic;
import com.provectus.kafka.ui.model.TopicConfig; import com.provectus.kafka.ui.model.TopicConfig;
import com.provectus.kafka.ui.model.TopicConsumerGroups;
import com.provectus.kafka.ui.model.TopicCreation; import com.provectus.kafka.ui.model.TopicCreation;
import com.provectus.kafka.ui.model.TopicDetails; import com.provectus.kafka.ui.model.TopicDetails;
import com.provectus.kafka.ui.model.TopicMessage; import com.provectus.kafka.ui.model.TopicMessage;
import com.provectus.kafka.ui.model.TopicUpdate; import com.provectus.kafka.ui.model.TopicUpdate;
import com.provectus.kafka.ui.model.TopicsResponse; import com.provectus.kafka.ui.model.TopicsResponse;
import com.provectus.kafka.ui.util.ClusterUtil; import com.provectus.kafka.ui.util.ClusterUtil;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@ -142,42 +137,29 @@ public class ClusterService {
return kafkaService.getOrCreateAdminClient(cluster).map(ac -> return kafkaService.getOrCreateAdminClient(cluster).map(ac ->
ac.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).all() ac.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).all()
).flatMap(groups -> ).flatMap(groups ->
groupMetadata(cluster, consumerGroupId) kafkaService.groupMetadata(cluster, consumerGroupId)
.flatMap(offsets -> { .flatMap(offsets -> {
Map<TopicPartition, Long> endOffsets = Map<TopicPartition, Long> endOffsets =
topicPartitionsEndOffsets(cluster, offsets.keySet()); kafkaService.topicPartitionsEndOffsets(cluster, offsets.keySet());
return ClusterUtil.toMono(groups).map(s -> s.get(consumerGroupId).members().stream() return ClusterUtil.toMono(groups).map(s ->
.flatMap(c -> Stream.of(ClusterUtil Tuples.of(
.convertToConsumerTopicPartitionDetails(c, offsets, endOffsets))) s.get(consumerGroupId),
.collect(Collectors.toList()).stream() s.get(consumerGroupId).members().stream()
.flatMap(t -> t.stream().flatMap(Stream::of)).collect(Collectors.toList())); .flatMap(c ->
}) Stream.of(
) ClusterUtil.convertToConsumerTopicPartitionDetails(
.map(c -> new ConsumerGroupDetails().consumers(c).consumerGroupId(consumerGroupId)); c, offsets, endOffsets, consumerGroupId
)
} )
)
public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster cluster, .collect(Collectors.toList()).stream()
String consumerGroupId) { .flatMap(t ->
return t.stream().flatMap(Stream::of)
kafkaService.getOrCreateAdminClient(cluster) ).collect(Collectors.toList())
.map(ac -> ac.getAdminClient().listConsumerGroupOffsets(consumerGroupId) )
.partitionsToOffsetAndMetadata()) );
.flatMap(ClusterUtil::toMono); }).map(c -> ClusterUtil.convertToConsumerGroupDetails(c.getT1(), c.getT2()))
} );
public Map<TopicPartition, Long> topicPartitionsEndOffsets(
KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
Properties properties = new Properties();
properties.putAll(cluster.getProperties());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
return consumer.endOffsets(topicPartitions);
}
} }
public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) { public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
@ -186,6 +168,13 @@ public class ClusterService {
.flatMap(kafkaService::getConsumerGroups); .flatMap(kafkaService::getConsumerGroups);
} }
// Resolves the cluster by name (unknown name -> ClusterNotFoundException)
// and fetches the per-topic consumer group details from KafkaService.
public Mono<TopicConsumerGroups> getTopicConsumerGroupDetail(
String clusterName, String topicName) {
return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
.switchIfEmpty(Mono.error(ClusterNotFoundException::new))
.flatMap(c -> kafkaService.getTopicConsumerGroups(c, topicName));
}
public Flux<Broker> getBrokers(String clusterName) { public Flux<Broker> getBrokers(String clusterName) {
return kafkaService return kafkaService
.getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow()) .getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow())
@ -251,4 +240,6 @@ public class ClusterService {
return consumingService.offsetsForDeletion(cluster, topicName, partitions) return consumingService.offsetsForDeletion(cluster, topicName, partitions)
.flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets)); .flatMap(offsets -> kafkaService.deleteTopicMessages(cluster, offsets));
} }
} }

View file

@ -12,6 +12,7 @@ import com.provectus.kafka.ui.model.InternalTopicConfig;
import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.Metric; import com.provectus.kafka.ui.model.Metric;
import com.provectus.kafka.ui.model.ServerStatus; import com.provectus.kafka.ui.model.ServerStatus;
import com.provectus.kafka.ui.model.TopicConsumerGroups;
import com.provectus.kafka.ui.model.TopicCreation; import com.provectus.kafka.ui.model.TopicCreation;
import com.provectus.kafka.ui.model.TopicUpdate; import com.provectus.kafka.ui.model.TopicUpdate;
import com.provectus.kafka.ui.util.ClusterUtil; import com.provectus.kafka.ui.util.ClusterUtil;
@ -38,12 +39,14 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete; import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node; import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.ConfigResource;
@ -296,15 +299,71 @@ public class KafkaService {
); );
} }
// Lists all consumer group ids on the cluster and describes each of them,
// yielding the full collection of ConsumerGroupDescription objects.
public Mono<Collection<ConsumerGroupDescription>> getConsumerGroupsInternal(
KafkaCluster cluster) {
return getOrCreateAdminClient(cluster).flatMap(ac ->
// listConsumerGroups returns the ids; describeConsumerGroups resolves the details.
ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all())
.flatMap(s ->
ClusterUtil.toMono(
ac.getAdminClient().describeConsumerGroups(
s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())
).all()
).map(Map::values)
)
);
}
public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) { public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) {
return getOrCreateAdminClient(cluster) return getConsumerGroupsInternal(cluster)
.flatMap(ac -> ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all()) .map(c -> c.stream().map(ClusterUtil::convertToConsumerGroup).collect(Collectors.toList()));
.flatMap(s -> ClusterUtil.toMono(ac.getAdminClient() }
.describeConsumerGroups(
s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())) public Mono<TopicConsumerGroups> getTopicConsumerGroups(KafkaCluster cluster, String topic) {
.all())) final Map<TopicPartition, Long> endOffsets = topicEndOffsets(cluster, topic);
.map(s -> s.values().stream()
.map(ClusterUtil::convertToConsumerGroup).collect(Collectors.toList()))); return getConsumerGroupsInternal(cluster)
.flatMapIterable(c ->
c.stream()
.map(d -> ClusterUtil.filterConsumerGroupTopic(d, topic))
.filter(Optional::isPresent)
.map(Optional::get)
.map(d ->
groupMetadata(cluster, d.groupId())
.flatMapIterable(meta ->
d.members().stream().flatMap(m ->
ClusterUtil.convertToConsumerTopicPartitionDetails(
m, meta, endOffsets, d.groupId()
).stream()
).collect(Collectors.toList())
)
).collect(Collectors.toList())
).flatMap(f -> f).collectList().map(l -> new TopicConsumerGroups().consumers(l));
}
// Fetches the committed offsets (keyed by TopicPartition) for the given
// consumer group id via the admin client.
public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster cluster,
String consumerGroupId) {
return getOrCreateAdminClient(cluster).map(ac ->
ac.getAdminClient()
.listConsumerGroupOffsets(consumerGroupId)
.partitionsToOffsetAndMetadata()
).flatMap(ClusterUtil::toMono);
}
// Returns the end (latest) offset for every partition of the given topic.
// NOTE(review): blocking KafkaConsumer calls on the caller's thread; the
// consumer is closed by try-with-resources.
public Map<TopicPartition, Long> topicEndOffsets(
KafkaCluster cluster, String topic) {
try (KafkaConsumer<Bytes, Bytes> consumer = createConsumer(cluster)) {
// Discover the topic's partitions, then ask for their end offsets.
final List<TopicPartition> topicPartitions = consumer.partitionsFor(topic).stream()
.map(i -> new TopicPartition(i.topic(), i.partition()))
.collect(Collectors.toList());
return consumer.endOffsets(topicPartitions);
}
}
public Map<TopicPartition, Long> topicPartitionsEndOffsets(
KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
try (KafkaConsumer<Bytes, Bytes> consumer = createConsumer(cluster)) {
return consumer.endOffsets(topicPartitions);
}
} }
public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) { public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
@ -571,4 +630,6 @@ public class KafkaService {
return getOrCreateAdminClient(cluster).map(ExtendedAdminClient::getAdminClient) return getOrCreateAdminClient(cluster).map(ExtendedAdminClient::getAdminClient)
.map(ac -> ac.deleteRecords(records)).then(); .map(ac -> ac.deleteRecords(records)).then();
} }
} }

View file

@ -5,6 +5,7 @@ import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_
import com.provectus.kafka.ui.deserialization.RecordDeserializer; import com.provectus.kafka.ui.deserialization.RecordDeserializer;
import com.provectus.kafka.ui.model.ConsumerGroup; import com.provectus.kafka.ui.model.ConsumerGroup;
import com.provectus.kafka.ui.model.ConsumerGroupDetails;
import com.provectus.kafka.ui.model.ConsumerTopicPartitionDetail; import com.provectus.kafka.ui.model.ConsumerTopicPartitionDetail;
import com.provectus.kafka.ui.model.ExtendedAdminClient; import com.provectus.kafka.ui.model.ExtendedAdminClient;
import com.provectus.kafka.ui.model.InternalPartition; import com.provectus.kafka.ui.model.InternalPartition;
@ -30,6 +31,7 @@ import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -77,20 +79,40 @@ public class ClusterUtil {
.flatMap(m -> m.assignment().topicPartitions().stream().flatMap(t -> Stream.of(t.topic()))) .flatMap(m -> m.assignment().topicPartitions().stream().flatMap(t -> Stream.of(t.topic())))
.collect(Collectors.toSet()).size(); .collect(Collectors.toSet()).size();
consumerGroup.setNumTopics(numTopics); consumerGroup.setNumTopics(numTopics);
consumerGroup.setSimple(c.isSimpleConsumerGroup());
Optional.ofNullable(c.state())
.ifPresent(s -> consumerGroup.setState(s.name()));
Optional.ofNullable(c.coordinator())
.ifPresent(coord -> consumerGroup.setCoordintor(coord.host()));
consumerGroup.setPartitionAssignor(c.partitionAssignor());
return consumerGroup; return consumerGroup;
} }
// Maps a Kafka ConsumerGroupDescription plus its per-partition consumer rows
// into the API-level ConsumerGroupDetails model. Nullable coordinator/state
// fall back to empty strings.
public static ConsumerGroupDetails convertToConsumerGroupDetails(
ConsumerGroupDescription desc, List<ConsumerTopicPartitionDetail> consumers
) {
return new ConsumerGroupDetails()
.consumers(consumers)
.consumerGroupId(desc.groupId())
.simple(desc.isSimpleConsumerGroup())
// "coordintor" matches the (misspelled) generated API property name — do not "fix" here.
.coordintor(Optional.ofNullable(desc.coordinator()).map(Node::host).orElse(""))
.state(Optional.ofNullable(desc.state()).map(Enum::name).orElse(""))
.partitionAssignor(desc.partitionAssignor());
}
public static List<ConsumerTopicPartitionDetail> convertToConsumerTopicPartitionDetails( public static List<ConsumerTopicPartitionDetail> convertToConsumerTopicPartitionDetails(
MemberDescription consumer, MemberDescription consumer,
Map<TopicPartition, OffsetAndMetadata> groupOffsets, Map<TopicPartition, OffsetAndMetadata> groupOffsets,
Map<TopicPartition, Long> endOffsets Map<TopicPartition, Long> endOffsets,
String groupId
) { ) {
return consumer.assignment().topicPartitions().stream() return consumer.assignment().topicPartitions().stream()
.map(tp -> { .map(tp -> {
Long currentOffset = Optional.ofNullable( long currentOffset = Optional.ofNullable(groupOffsets.get(tp))
groupOffsets.get(tp)).map(o -> o.offset()).orElse(0L); .map(OffsetAndMetadata::offset).orElse(0L);
Long endOffset = Optional.ofNullable(endOffsets.get(tp)).orElse(0L); long endOffset = Optional.ofNullable(endOffsets.get(tp)).orElse(0L);
ConsumerTopicPartitionDetail cd = new ConsumerTopicPartitionDetail(); ConsumerTopicPartitionDetail cd = new ConsumerTopicPartitionDetail();
cd.setGroupId(groupId);
cd.setConsumerId(consumer.consumerId()); cd.setConsumerId(consumer.consumerId());
cd.setHost(consumer.host()); cd.setHost(consumer.host());
cd.setTopic(tp.topic()); cd.setTopic(tp.topic());
@ -250,4 +272,42 @@ public class ClusterUtil {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow(); .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
} }
// Restricts a group description to the members assigned to the given topic.
// Returns Optional.empty() when no member consumes the topic.
public static Optional<ConsumerGroupDescription> filterConsumerGroupTopic(
ConsumerGroupDescription description, String topic) {
// Keep only members that still have at least one partition of the topic
// after their assignment has been filtered.
final List<MemberDescription> members = description.members().stream()
.map(m -> filterConsumerMemberTopic(m, topic))
.filter(m -> !m.assignment().topicPartitions().isEmpty())
.collect(Collectors.toList());
if (!members.isEmpty()) {
// Rebuild the description with only the matching members; every other
// attribute is carried over unchanged.
return Optional.of(
new ConsumerGroupDescription(
description.groupId(),
description.isSimpleConsumerGroup(),
members,
description.partitionAssignor(),
description.state(),
description.coordinator()
)
);
} else {
return Optional.empty();
}
}
// Returns a copy of the member description whose assignment contains only
// partitions of the given topic (the filtered assignment may be empty).
public static MemberDescription filterConsumerMemberTopic(
MemberDescription description, String topic) {
final Set<TopicPartition> topicPartitions = description.assignment().topicPartitions()
.stream().filter(tp -> tp.topic().equals(topic))
.collect(Collectors.toSet());
MemberAssignment assignment = new MemberAssignment(topicPartitions);
// Rebuild the member with the narrowed assignment; identity fields are preserved.
return new MemberDescription(
description.consumerId(),
description.groupInstanceId(),
description.clientId(),
description.host(),
assignment
);
}
} }

View file

@ -346,6 +346,31 @@ paths:
404: 404:
description: Not found description: Not found
/api/clusters/{clusterName}/topics/{topicName}/consumergroups:
get:
tags:
- Consumer Groups
summary: get Consumer Groups By Topics
operationId: getTopicConsumerGroups
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: topicName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/TopicConsumerGroups'
/api/clusters/{clusterName}/consumer-groups/{id}: /api/clusters/{clusterName}/consumer-groups/{id}:
get: get:
tags: tags:
@ -1330,6 +1355,14 @@ components:
type: integer type: integer
numTopics: numTopics:
type: integer type: integer
simple:
type: boolean
partitionAssignor:
type: string
state:
type: string
coordintor:
type: string
required: required:
- clusterId - clusterId
- consumerGroupId - consumerGroupId
@ -1397,6 +1430,8 @@ components:
ConsumerTopicPartitionDetail: ConsumerTopicPartitionDetail:
type: object type: object
properties: properties:
groupId:
type: string
consumerId: consumerId:
type: string type: string
topic: topic:
@ -1416,12 +1451,28 @@ components:
format: int64 format: int64
required: required:
- consumerId - consumerId
TopicConsumerGroups:
type: object
properties:
consumers:
type: array
items:
$ref: '#/components/schemas/ConsumerTopicPartitionDetail'
ConsumerGroupDetails: ConsumerGroupDetails:
type: object type: object
properties: properties:
consumerGroupId: consumerGroupId:
type: string type: string
simple:
type: boolean
partitionAssignor:
type: string
state:
type: string
coordintor:
type: string
consumers: consumers:
type: array type: array
items: items: