|
@@ -70,15 +70,10 @@ public class ClusterService {
|
|
public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroupDetail(String clusterName, String consumerGroupId) {
|
|
public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroupDetail(String clusterName, String consumerGroupId) {
|
|
KafkaCluster cluster = clustersStorage.getClusterByName(clusterName);
|
|
KafkaCluster cluster = clustersStorage.getClusterByName(clusterName);
|
|
|
|
|
|
- return ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroups().all())
|
|
|
|
- .flatMap(groups ->
|
|
|
|
- ClusterUtil.toMono(
|
|
|
|
- cluster.getAdminClient()
|
|
|
|
- .describeConsumerGroups(groups.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList()))
|
|
|
|
- .all()
|
|
|
|
- )
|
|
|
|
- )
|
|
|
|
- .flatMap(groups ->
|
|
|
|
|
|
+ return ClusterUtil.toMono(
|
|
|
|
+ cluster.getAdminClient()
|
|
|
|
+ .describeConsumerGroups(Collections.singletonList(consumerGroupId)).all()
|
|
|
|
+ ).flatMap(groups ->
|
|
groupMetadata(cluster, consumerGroupId).map(
|
|
groupMetadata(cluster, consumerGroupId).map(
|
|
offsets -> {
|
|
offsets -> {
|
|
Map<TopicPartition, Long> endOffsets = topicPartitionsEndOffsets(cluster, offsets.keySet());
|
|
Map<TopicPartition, Long> endOffsets = topicPartitionsEndOffsets(cluster, offsets.keySet());
|