Browse Source

changes after review

Roman Nedzvetskiy 5 năm trước cách đây
mục cha
commit
6da3ea77a9

+ 9 - 30
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -2,21 +2,19 @@ package com.provectus.kafka.ui.cluster.service;
 
 import com.provectus.kafka.ui.cluster.model.ClustersStorage;
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
-import org.apache.kafka.common.KafkaFuture;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 @Service
@@ -65,38 +63,19 @@ public class ClusterService {
         return kafkaService.createTopic(cluster, topicFormData);
     }
 
-    private <T> Mono<T> toMono(KafkaFuture<T> future){
-        return Mono.create(sink-> future.whenComplete((res, ex)->{
-            if(ex!=null) {
-                sink.error(ex);
-            } else {
-                sink.success(res);
-            }
-        }));
-    }
-
     @SneakyThrows
     public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup (String clusterName) {
             var cluster = clustersStorage.getClusterByName(clusterName);
-            return toMono(cluster.getAdminClient().listConsumerGroups().all())
-                    .map(s -> cluster.getAdminClient().describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all())
-                    .flatMap(s -> {
-                        return toMono(s).map(c -> {
+            return ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroups().all())
+                    .flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient()
+                            .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
+                    .map(s -> {
                             ArrayList<ConsumerGroup> result = new ArrayList<>();
-                            c.values().forEach(c1 -> {
-                            ConsumerGroup consumerGroup = new ConsumerGroup();
-                            consumerGroup.setClusterId(cluster.getCluster().getId());
-                            consumerGroup.setConsumerGroupId(c1.groupId());
-                            consumerGroup.setNumConsumers(c1.members().size());
-                            Set<String> topics = new HashSet<>();
-                            c1.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic())));
-                            consumerGroup.setNumTopics(topics.size());
+                            s.values().forEach(c -> {
+                            ConsumerGroup consumerGroup = ClusterUtil.convertToConsumerGroup(c, cluster);
                             result.add(consumerGroup);
-                        });
+                            });
                             return result;
-                        });
-                    }).map(s -> {
-                        return ResponseEntity.ok(Flux.fromIterable(s));
-                    });
+                        }).map(s -> ResponseEntity.ok(Flux.fromIterable(s)));
     }
 }

+ 34 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -0,0 +1,34 @@
+package com.provectus.kafka.ui.cluster.util;
+
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.model.ConsumerGroup;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.common.KafkaFuture;
+import reactor.core.publisher.Mono;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class ClusterUtil {
+
+    public static <T> Mono<T> toMono(KafkaFuture<T> future){
+        return Mono.create(sink -> future.whenComplete((res, ex)->{
+            if (ex!=null) {
+                sink.error(ex);
+            } else {
+                sink.success(res);
+            }
+        }));
+    }
+
+    public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, KafkaCluster cluster) {
+        ConsumerGroup consumerGroup = new ConsumerGroup();
+        consumerGroup.setClusterId(cluster.getCluster().getId());
+        consumerGroup.setConsumerGroupId(c.groupId());
+        consumerGroup.setNumConsumers(c.members().size());
+        Set<String> topics = new HashSet<>();
+        c.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic())));
+        consumerGroup.setNumTopics(topics.size());
+        return consumerGroup;
+    }
+}