Browse Source

consumer group details api done

Roman Nedzvetskiy 5 năm trước cách đây
mục cha
commit
28853ef6af

+ 5 - 0
kafka-ui-api/pom.xml

@@ -49,6 +49,11 @@
             <artifactId>kafka-clients</artifactId>
             <version>${kafka-clients.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.13</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.101tec</groupId>
             <artifactId>zkclient</artifactId>

+ 31 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -8,11 +8,13 @@ import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -62,19 +64,36 @@ public class ClusterService {
         return kafkaService.createTopic(cluster, topicFormData);
     }
 
-    public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroupDetail(String topicId, String clusterName) {
+    public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroupDetail(String clusterName, String consumerGroupId) {
         KafkaCluster cluster = clustersStorage.getClusterByName(clusterName);
-        var partition = cluster.getTopicDetailsMap().get(topicId).getPartitionCount();
-//        ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroups().all())
-//                .flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient()
-//                        .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
-        cluster.getAdminClient().describeConsumerGroups().all().get().get("").members()
-                .forEach(s -> {
-                    s.assignment().topicPartitions().forEach(t -> t.partition()); //partition
-                    s.assignment().topicPartitions().forEach(t -> t.topic());//topic
-                });
-        cluster.getAdminClient().describeTopics().all().get().get("").partitions().get(0).;
-        cluster.getAdminClient().listConsumerGroupOffsets("").partitionsToOffsetAndMetadata().get().get("").offset();
+        ConsumerGroupDetails result = new ConsumerGroupDetails();
+        result.setConsumerGroupId(consumerGroupId);
+        result.setConsumers(new ArrayList<>());
+        return ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroups().all())
+                .flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient()
+                        .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
+                .map(s -> {
+                        s.get(consumerGroupId).members().forEach(s1 -> {
+                            ConsumerDetail partlyResult = ClusterUtil.partlyConvertToConsumerDetail(s1, consumerGroupId, cluster);
+                            result.getConsumers().add(partlyResult);
+                    });
+                    return result;
+                })
+                .flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata())
+                        .map(o -> {
+                            s.getConsumers().forEach(c -> {
+                                List<Long> currentOffsets = new ArrayList<>();
+                                List<Long> behindMessagesList = new ArrayList<>();
+                                for (int i = 0; i < c.getTopic().size(); i++) {
+                                    Long currentOffset = o.get(new TopicPartition(c.getTopic().get(i), c.getPartition().get(i))).offset();
+                                    currentOffsets.add(currentOffset);
+                                    behindMessagesList.add(c.getEndOffset().get(i) - currentOffset);
+                                }
+                                c.setCurrentOffset(currentOffsets);
+                                c.setMessagesBehind(behindMessagesList);
+                            });
+                            return ResponseEntity.ok(s);
+                        }));
     }
 
     @SneakyThrows

+ 30 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -1,13 +1,19 @@
 package com.provectus.kafka.ui.cluster.util;
 
 import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.model.ConsumerDetail;
 import com.provectus.kafka.ui.model.ConsumerGroup;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import reactor.core.publisher.Mono;
 
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Collectors;
 
 public class ClusterUtil {
 
@@ -31,4 +37,26 @@ public class ClusterUtil {
         consumerGroup.setNumTopics(topics.size());
         return consumerGroup;
     }
+
+    public static ConsumerDetail partlyConvertToConsumerDetail(MemberDescription s1, String consumerGroupId, KafkaCluster cluster) {
+        ConsumerDetail partlyResult = new ConsumerDetail();
+        partlyResult.setConsumerId(s1.consumerId());
+        partlyResult.setPartition((s1.assignment().topicPartitions().stream().map(TopicPartition::partition).collect(Collectors.toList())));
+        partlyResult.setTopic((s1.assignment().topicPartitions().stream().map(TopicPartition::topic).collect(Collectors.toList())));
+        partlyResult.setEndOffset(new ArrayList(getEndOffsets(s1.assignment().topicPartitions(), consumerGroupId, cluster.getBootstrapServers()).values()));
+        return partlyResult;
+    }
+
+    private static Map<TopicPartition, Long> getEndOffsets(Set<TopicPartition> topicPartition, String groupId, String bootstrapServers) {
+        Map<TopicPartition, Long> result;
+        Properties properties = new Properties();
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
+            result = consumer.endOffsets(topicPartition);
+        }
+        return result;
+    }
 }

+ 2 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -4,7 +4,6 @@ import com.provectus.kafka.ui.api.ApiClustersApi;
 import com.provectus.kafka.ui.cluster.service.ClusterService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
-import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
@@ -61,7 +60,7 @@ public class MetricsRestController implements ApiClustersApi {
     }
 
     @Override
-    public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroupDetail(String consumerGroupId, ServerWebExchange exchange) {
-        return Mono.just(ResponseEntity.ok(new ConsumerGroupDetails()));
+    public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroupDetail(String clusterName, String consumerGroupId, ServerWebExchange exchange) {
+        return clusterService.getConsumerGroupDetail(clusterName, consumerGroupId);
     }
 }

+ 30 - 15
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -169,13 +169,18 @@ paths:
                 items:
                   $ref: '#/components/schemas/TopicConfig'
 
-  /api/clusters/consumer-groups/{id}:
+  /api/clusters/{clusterName}/consumer-groups/{id}:
     get:
       tags:
         - /api/clusters
       summary: getConsumerGroupDetail
       operationId: getConsumerGroupDetail
       parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
         - name: id
           in: path
           required: true
@@ -351,33 +356,43 @@ components:
         id:
           type: string
 
-    ConsumerDetail:
+    ConsumerGroup:
       type: object
       properties:
-        consumerId:
-          type: string
-        topic:
+        clusterId:
           type: string
-        partition:
+        consumerGroupId:
           type: string
+        numConsumers:
+          type: integer
+        numTopics:
+          type: integer
 
-    ConsumerGroup:
+    ConsumerDetail:
       type: object
       properties:
-        clusterId:
+        consumerId:
           type: string
-        consumerGroupId:
+        topic:
+          type: array
+          items:
             type: string
-        numConsumers:
-            type: integer
-        numTopics:
+        partition:
+          type: array
+          items:
             type: integer
         messagesBehind:
-          type: integer
+          type: array
+          items:
+            type: long
         currentOffset:
-          type: integer
+          type: array
+          items:
+            type: long
         endOffset:
-          type: integer
+          type: array
+          items:
+            type: long
 
     ConsumerGroupDetails:
       type: object

+ 1 - 0
pom.xml

@@ -27,6 +27,7 @@
 		<openapi-generator-maven-plugin.version>4.2.2</openapi-generator-maven-plugin.version>
 		<swagger-annotations.version>1.6.0</swagger-annotations.version>
 		<springdoc-openapi-webflux-ui.version>1.2.32</springdoc-openapi-webflux-ui.version>
+		<kafka.version>2.4.1</kafka.version>
 	</properties>
 
 	<groupId>com.provectus</groupId>