浏览代码

changed method to async mono

Roman Nedzvetskiy 5 年之前
父节点
当前提交
556c416432
共有 1 个文件被更改,包括 36 次插入17 次删除
  1. 36 17
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

+ 36 - 17
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -6,14 +6,17 @@ import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
-import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.common.KafkaFuture;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 @Service
@@ -62,22 +65,38 @@ public class ClusterService {
         return kafkaService.createTopic(cluster, topicFormData);
     }
 
+    private <T> Mono<T> toMono(KafkaFuture<T> future){
+        return Mono.create(sink-> future.whenComplete((res, ex)->{
+            if(ex!=null) {
+                sink.error(ex);
+            } else {
+                sink.success(res);
+            }
+        }));
+    }
+
     @SneakyThrows
     public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup (String clusterName) {
-        KafkaCluster cluster = clustersStorage.getClusterByName(clusterName);
-        List<ConsumerGroup> consumerGroups = new ArrayList<>();
-        List<String> stringIds = cluster.getAdminClient().listConsumerGroups().all().get().stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList());
-        Map<String, ConsumerGroupDescription> consumerGroupDescription = cluster.getAdminClient().describeConsumerGroups(stringIds).all().get();
-        consumerGroupDescription.entrySet().forEach(s -> {
-            ConsumerGroup consumerGroup = new ConsumerGroup();
-            consumerGroup.setClusterId(cluster.getCluster().getId());
-            consumerGroup.setConsumerGroupId(s.getValue().groupId());
-            consumerGroup.setNumConsumers(s.getValue().members().size());
-            Set<String> topics = new HashSet<>();
-            s.getValue().members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic())));
-            consumerGroup.setNumTopics(topics.size());
-            consumerGroups.add(consumerGroup);
-        });
-        return Mono.just(ResponseEntity.ok(Flux.fromIterable(consumerGroups)));
+                    var cluster = clustersStorage.getClusterByName(clusterName);
+                    return toMono(cluster.getAdminClient().listConsumerGroups().all())
+                            .map(s -> cluster.getAdminClient().describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all())
+                            .flatMap(s -> {
+                                return toMono(s).map(c -> {
+                                    ArrayList<ConsumerGroup> result = new ArrayList<>();
+                                    c.values().forEach(c1 -> {
+                                    ConsumerGroup consumerGroup = new ConsumerGroup();
+                                    consumerGroup.setClusterId(cluster.getCluster().getId());
+                                    consumerGroup.setConsumerGroupId(c1.groupId());
+                                    consumerGroup.setNumConsumers(c1.members().size());
+                                    Set<String> topics = new HashSet<>();
+                                    c1.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic())));
+                                    consumerGroup.setNumTopics(topics.size());
+                                    result.add(consumerGroup);
+                                });
+                                    return result;
+                                });
+                            }).map(s -> {
+                                return ResponseEntity.ok(Flux.fromIterable(s));
+                            });
     }
 }