|
@@ -2,9 +2,12 @@ package com.provectus.kafka.ui.cluster.service;
|
|
|
|
|
|
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
|
|
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
|
|
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
|
|
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
|
|
|
|
+import com.provectus.kafka.ui.cluster.util.ClusterUtil;
|
|
import com.provectus.kafka.ui.kafka.KafkaService;
|
|
import com.provectus.kafka.ui.kafka.KafkaService;
|
|
import com.provectus.kafka.ui.model.*;
|
|
import com.provectus.kafka.ui.model.*;
|
|
import lombok.RequiredArgsConstructor;
|
|
import lombok.RequiredArgsConstructor;
|
|
|
|
+import lombok.SneakyThrows;
|
|
|
|
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
|
|
import org.springframework.http.ResponseEntity;
|
|
import org.springframework.http.ResponseEntity;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Flux;
|
|
@@ -58,4 +61,15 @@ public class ClusterService {
|
|
if (cluster == null) return null;
|
|
if (cluster == null) return null;
|
|
return kafkaService.createTopic(cluster, topicFormData);
|
|
return kafkaService.createTopic(cluster, topicFormData);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @SneakyThrows
|
|
|
|
+ public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup (String clusterName) {
|
|
|
|
+ var cluster = clustersStorage.getClusterByName(clusterName);
|
|
|
|
+ return ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroups().all())
|
|
|
|
+ .flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient()
|
|
|
|
+ .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
|
|
|
|
+ .map(s -> s.values().stream()
|
|
|
|
+ .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()))
|
|
|
|
+ .map(s -> ResponseEntity.ok(Flux.fromIterable(s)));
|
|
|
|
+ }
|
|
}
|
|
}
|