Kaynağa Gözat

temp result commit

Roman Nedzvetskiy 5 yıl önce
ebeveyn
işleme
7d8773f633

+ 20 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -5,6 +5,11 @@ import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.zookeeper.ZooKeeper;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
@@ -58,4 +63,19 @@ public class ClusterService {
         if (cluster == null) return null;
         return kafkaService.createTopic(cluster, topicFormData);
     }
+
+    public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroupDetail(String topicId, String clusterName) {
+        KafkaCluster cluster = clustersStorage.getClusterByName(clusterName);
+        var partition = cluster.getTopicDetailsMap().get(topicId).getPartitionCount();
+//        ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroups().all())
+//                .flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient()
+//                        .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
+        cluster.getAdminClient().describeConsumerGroups().all().get().get("").members()
+                .forEach(s -> {
+                    s.assignment().topicPartitions().forEach(t -> t.partition()); //partition
+                    s.assignment().topicPartitions().forEach(t -> t.topic());//topic
+                });
+        cluster.getAdminClient().describeTopics().all().get().get("").partitions().get(0).;
+        cluster.getAdminClient().listConsumerGroupOffsets("").partitionsToOffsetAndMetadata().get().get("").offset();
+    }
 }

+ 5 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -53,4 +53,9 @@ public class MetricsRestController implements ApiClustersApi {
     public Mono<ResponseEntity<Flux<Broker>>> getBrokers(String clusterId, ServerWebExchange exchange) {
         return Mono.just(ResponseEntity.ok(Flux.fromIterable(new ArrayList<>())));
     }
+
+    @Override
+    public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroupDetail(String consumerGroupId, ServerWebExchange exchange) {
+        return Mono.just(ResponseEntity.ok(new ConsumerGroupDetails()));
+    }
 }

+ 47 - 1
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -169,6 +169,26 @@ paths:
                 items:
                   $ref: '#/components/schemas/TopicConfig'
 
+  /api/clusters/consumer-groups/{id}:
+    get:
+      tags:
+        - /api/clusters
+      summary: getConsumerGroupDetail
+      operationId: getConsumerGroupDetail
+      parameters:
+        - name: id
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ConsumerGroupDetails'
+
 components:
   schemas:
     Cluster:
@@ -307,4 +327,30 @@ components:
       type: object
       properties:
         id:
-          type: string
+          type: string
+
+    ConsumerDetail:
+      type: object
+      properties:
+        consumerId:
+          type: string
+        topic:
+          type: string
+        partition:
+          type: string
+        messagesBehind:
+          type: integer
+        currentOffset:
+          type: integer
+        endOffset:
+          type: integer
+
+    ConsumerGroupDetails:
+      type: object
+      properties:
+        consumerGroupId:
+          type: string
+        consumers:
+          type: array
+          items:
+            $ref: '#/components/schemas/ConsumerDetail'