|
@@ -8,22 +8,13 @@ import lombok.RequiredArgsConstructor;
|
|
|
import lombok.SneakyThrows;
|
|
|
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
|
|
|
import org.apache.kafka.clients.admin.ConsumerGroupListing;
|
|
|
-import org.apache.kafka.clients.admin.KafkaAdminClient;
|
|
|
-import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
|
|
|
-import org.apache.kafka.clients.consumer.Consumer;
|
|
|
-import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
|
-import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
|
-import org.apache.kafka.common.TopicPartition;
|
|
|
import org.springframework.http.ResponseEntity;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
-import javax.annotation.PostConstruct;
|
|
|
import java.util.*;
|
|
|
-import java.util.concurrent.ExecutionException;
|
|
|
import java.util.stream.Collectors;
|
|
|
-import java.util.stream.Stream;
|
|
|
|
|
|
@Service
|
|
|
@RequiredArgsConstructor
|
|
@@ -76,7 +67,6 @@ public class ClusterService {
|
|
|
KafkaCluster cluster = clustersStorage.getClusterByName(clusterName);
|
|
|
List<ConsumerGroup> consumerGroups = new ArrayList<>();
|
|
|
List<String> stringIds = cluster.getAdminClient().listConsumerGroups().all().get().stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList());
|
|
|
- System.out.println(cluster.getAdminClient().listConsumerGroupOffsets(stringIds.get(0)));
|
|
|
Map<String, ConsumerGroupDescription> consumerGroupDescription = cluster.getAdminClient().describeConsumerGroups(stringIds).all().get();
|
|
|
consumerGroupDescription.entrySet().forEach(s -> {
|
|
|
ConsumerGroup consumerGroup = new ConsumerGroup();
|