Browse Source

added endpoint for group consumers

Roman Nedzvetskiy 5 năm trước cách đây
mục cha
commit
0915be8c2e

+ 33 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -5,13 +5,25 @@ import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.util.List;
+import javax.annotation.PostConstruct;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 @Service
 @RequiredArgsConstructor
@@ -58,4 +70,24 @@ public class ClusterService {
         if (cluster == null) return null;
         return kafkaService.createTopic(cluster, topicFormData);
     }
+
+    @SneakyThrows
+    public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup (String clusterName) {
+        KafkaCluster cluster = clustersStorage.getClusterByName(clusterName);
+        List<ConsumerGroup> consumerGroups = new ArrayList<>();
+        List<String> stringIds = cluster.getAdminClient().listConsumerGroups().all().get().stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList());
+        System.out.println(cluster.getAdminClient().listConsumerGroupOffsets(stringIds.get(0)));
+        Map<String, ConsumerGroupDescription> consumerGroupDescription = cluster.getAdminClient().describeConsumerGroups(stringIds).all().get();
+        consumerGroupDescription.entrySet().forEach(s -> {
+            ConsumerGroup consumerGroup = new ConsumerGroup();
+            consumerGroup.setClusterId(cluster.getCluster().getId());
+            consumerGroup.setConsumerGroupId(s.getValue().groupId());
+            consumerGroup.setNumConsumers(s.getValue().members().size());
+            Set<String> topics = new HashSet<>();
+            s.getValue().members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic())));
+            consumerGroup.setNumTopics(topics.size());
+            consumerGroups.add(consumerGroup);
+        });
+        return Mono.just(ResponseEntity.ok(Flux.fromIterable(consumerGroups)));
+    }
 }

+ 6 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -4,6 +4,7 @@ import com.provectus.kafka.ui.api.ApiClustersApi;
 import com.provectus.kafka.ui.cluster.service.ClusterService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
@@ -53,4 +54,9 @@ public class MetricsRestController implements ApiClustersApi {
     public Mono<ResponseEntity<Flux<Broker>>> getBrokers(String clusterId, ServerWebExchange exchange) {
         return Mono.just(ResponseEntity.ok(Flux.fromIterable(new ArrayList<>())));
     }
+
+    @Override
+    public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup(String clusterName, ServerWebExchange exchange) {
+        return clusterService.getConsumerGroup(clusterName);
+    }
 }

+ 35 - 1
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -169,6 +169,28 @@ paths:
                 items:
                   $ref: '#/components/schemas/TopicConfig'
 
+  /api/clusters/{clusterName}/consumerGroups:
+    get:
+      tags:
+        - /api/clusters
+      summary: getConsumerGroup
+      operationId: getConsumerGroup
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/ConsumerGroup'
+
 components:
   schemas:
     Cluster:
@@ -307,4 +329,16 @@ components:
       type: object
       properties:
         id:
-          type: string
+          type: string
+
+    ConsumerGroup:
+      type: object
+      properties:
+        clusterId:
+          type: string
+        consumerGroupId:
+            type: string
+        numConsumers:
+            type: integer
+        numTopics:
+            type: integer