|
@@ -77,26 +77,26 @@ public class ClusterService {
|
|
|
|
|
|
@SneakyThrows
|
|
|
public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup (String clusterName) {
|
|
|
- var cluster = clustersStorage.getClusterByName(clusterName);
|
|
|
- return toMono(cluster.getAdminClient().listConsumerGroups().all())
|
|
|
- .map(s -> cluster.getAdminClient().describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all())
|
|
|
- .flatMap(s -> {
|
|
|
- return toMono(s).map(c -> {
|
|
|
- ArrayList<ConsumerGroup> result = new ArrayList<>();
|
|
|
- c.values().forEach(c1 -> {
|
|
|
- ConsumerGroup consumerGroup = new ConsumerGroup();
|
|
|
- consumerGroup.setClusterId(cluster.getCluster().getId());
|
|
|
- consumerGroup.setConsumerGroupId(c1.groupId());
|
|
|
- consumerGroup.setNumConsumers(c1.members().size());
|
|
|
- Set<String> topics = new HashSet<>();
|
|
|
- c1.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic())));
|
|
|
- consumerGroup.setNumTopics(topics.size());
|
|
|
- result.add(consumerGroup);
|
|
|
- });
|
|
|
- return result;
|
|
|
- });
|
|
|
- }).map(s -> {
|
|
|
- return ResponseEntity.ok(Flux.fromIterable(s));
|
|
|
- });
|
|
|
+ var cluster = clustersStorage.getClusterByName(clusterName);
|
|
|
+ return toMono(cluster.getAdminClient().listConsumerGroups().all())
|
|
|
+ .map(s -> cluster.getAdminClient().describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all())
|
|
|
+ .flatMap(s -> {
|
|
|
+ return toMono(s).map(c -> {
|
|
|
+ ArrayList<ConsumerGroup> result = new ArrayList<>();
|
|
|
+ c.values().forEach(c1 -> {
|
|
|
+ ConsumerGroup consumerGroup = new ConsumerGroup();
|
|
|
+ consumerGroup.setClusterId(cluster.getCluster().getId());
|
|
|
+ consumerGroup.setConsumerGroupId(c1.groupId());
|
|
|
+ consumerGroup.setNumConsumers(c1.members().size());
|
|
|
+ Set<String> topics = new HashSet<>();
|
|
|
+ c1.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic())));
|
|
|
+ consumerGroup.setNumTopics(topics.size());
|
|
|
+ result.add(consumerGroup);
|
|
|
+ });
|
|
|
+ return result;
|
|
|
+ });
|
|
|
+ }).map(s -> {
|
|
|
+ return ResponseEntity.ok(Flux.fromIterable(s));
|
|
|
+ });
|
|
|
}
|
|
|
}
|