Roman Nedzvetskiy 5 年 前
コミット
211aa195d8

+ 8 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -72,7 +72,8 @@ public class ClusterService {
                 .flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient()
                         .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
                 .map(s -> {
-                    var consumerDetails = s.get(consumerGroupId).members().stream().map(s1 -> ClusterUtil.partlyConvertToConsumerDetail(s1, consumerGroupId, cluster)).collect(Collectors.toList());
+                    var consumerDetails = s.get(consumerGroupId).members().stream()
+                            .map(s1 -> ClusterUtil.partlyConvertToConsumerDetail(s1, consumerGroupId, cluster)).collect(Collectors.toList());
                     var result = new ConsumerGroupDetails();
                     result.setConsumers(consumerDetails);
                     result.setConsumerGroupId(consumerGroupId);
@@ -80,7 +81,7 @@ public class ClusterService {
                 })
                 .flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata())
                         .map(o -> {
-                            s.getConsumers().forEach(c -> fillOffsetParams(c, o));
+                            s.getConsumers().forEach(с -> generateOffsetParams(с, o));
                             return ResponseEntity.ok(s);
                         }));
     }
@@ -96,14 +97,14 @@ public class ClusterService {
                     .map(s -> ResponseEntity.ok(Flux.fromIterable(s)));
     }
 
-    private void fillOffsetParams(ConsumerDetail consumerDetail, Map<TopicPartition, OffsetAndMetadata> topicMetadata) {
+    private void generateOffsetParams(ConsumerDetail consumerDetail, Map<TopicPartition, OffsetAndMetadata> topicMetadata) {
         List<Long> currentOffsets = new ArrayList<>();
         List<Long> behindMessagesList = new ArrayList<>();
         var endOffsets = consumerDetail.getEndOffset();
-        var topics = consumerDetail.getTopic();
-        var partitions = consumerDetail.getPartition();
-        for (int i = 0; i < consumerDetail.getTopic().size(); i++) {
-            Long currentOffset = topicMetadata.get(new TopicPartition(topics.get(i), partitions.get(i))).offset();
+        var topicPartitionList = consumerDetail.getTopicPartition();
+        for (int i = 0; i < consumerDetail.getTopicPartition().size(); i++) {
+            var topicPartition = new TopicPartition(topicPartitionList.get(i).getTopic(), topicPartitionList.get(i).getPartition());
+            Long currentOffset = topicMetadata.get(topicPartition).offset();
             currentOffsets.add(currentOffset);
             behindMessagesList.add(endOffsets.get(i) - currentOffset);
         }

+ 9 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

@@ -11,6 +11,7 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import reactor.core.publisher.Mono;
+import com.provectus.kafka.ui.model.TopicPartitionDto;
 
 import java.util.*;
 import java.util.stream.Collectors;
@@ -41,8 +42,7 @@ public class ClusterUtil {
     public static ConsumerDetail partlyConvertToConsumerDetail(MemberDescription consumer, String consumerGroupId, KafkaCluster cluster) {
         ConsumerDetail partlyResult = new ConsumerDetail();
         partlyResult.setConsumerId(consumer.consumerId());
-        partlyResult.setPartition((consumer.assignment().topicPartitions().stream().map(TopicPartition::partition).collect(Collectors.toList())));
-        partlyResult.setTopic((consumer.assignment().topicPartitions().stream().map(TopicPartition::topic).collect(Collectors.toList())));
+        partlyResult.setTopicPartition(consumer.assignment().topicPartitions().stream().map(ClusterUtil::toTopicPartitionDto).collect(Collectors.toList()));
         partlyResult.setEndOffset(new ArrayList(getEndOffsets(consumer.assignment().topicPartitions(), consumerGroupId, cluster.getBootstrapServers()).values()));
         return partlyResult;
     }
@@ -59,4 +59,11 @@ public class ClusterUtil {
         }
         return result;
     }
+
+    private static TopicPartitionDto toTopicPartitionDto(TopicPartition topicPartition) {
+        TopicPartitionDto result = new TopicPartitionDto();
+        result.setTopic(topicPartition.topic());
+        result.setPartition(topicPartition.partition());
+        return result;
+    }
 }

+ 13 - 6
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -368,19 +368,26 @@ components:
         numTopics:
           type: integer
 
+    TopicPartitionDto:
+      type: object
+      properties:
+        topic:
+          type: string
+        partition:
+          type: integer
+      required:
+        - topic
+        - partition
+
     ConsumerDetail:
       type: object
       properties:
         consumerId:
           type: string
-        topic:
-          type: array
-          items:
-            type: string
-        partition:
+        topicPartition:
           type: array
           items:
-            type: integer
+            $ref: '#/components/schemas/TopicPartitionDto'
         messagesBehind:
           type: array
           items: