Explorar o código

Added begin and end offset param

Roman Nedzvetskiy %!s(int64=5) %!d(string=hai) anos
pai
achega
cd5720114d

+ 9 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -9,15 +9,18 @@ import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.math.BigDecimal;
 import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -153,4 +156,10 @@ public class ClusterService {
                 .orElse(Flux.empty());
 
     }
+
+    public Mono<Map<String, BigDecimal>> getOffsets(String clusterName) {
+        return clustersStorage.getClusterByName(clusterName)
+                .map(c -> kafkaService.getOrCreateAdminClient(c)
+                        .flatMap(a -> kafkaService.getOffsets(a.getAdminClient(), c))).orElseThrow();
+    }
 }

+ 36 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -351,4 +351,40 @@ public class KafkaService {
                 })
             );
     }
+
+    public Mono<Map<String, BigDecimal>> getOffsets (AdminClient ac, KafkaCluster c) {
+        return getTopicPartitionList(ac)
+                .map(s -> {
+                    var tps = s.stream()
+                            .map(tp ->
+                                    tp.entrySet().stream()
+                                            .map(tp1 -> tp1.getValue().stream()
+                                                    .map(tt -> new TopicPartition(tp1.getKey(), tt))
+                                                    .collect(Collectors.toList()))
+                                            .collect(Collectors.toList()))
+                            .collect(Collectors.toList())
+                            .stream().flatMap(List::stream).collect(Collectors.toList())
+                            .stream().flatMap(List::stream).collect(Collectors.toList());
+                    return getTopicPartitionOffset(c, tps);
+                });
+    }
+
+    private Mono<List<Map<String, List<Integer>>>> getTopicPartitionList(AdminClient ac) {
+        return ClusterUtil.toMono(ac.listTopics().names())
+                .flatMap(l -> ClusterUtil.toMono(ac.describeTopics(l).all()))
+                .map(t -> t.entrySet().stream()
+                        .map(e -> Map.of(e.getKey(), e.getValue().partitions().stream()
+                                .flatMap(tp -> Stream.of(tp.partition()))
+                                .collect(Collectors.toList())))
+                        .collect(Collectors.toList()));
+    }
+
+    private Map<String, BigDecimal> getTopicPartitionOffset(KafkaCluster c, List<TopicPartition> topicPartitions )  {
+            var offset = ClusterUtil.toSingleMap(createConsumer(c).beginningOffsets(topicPartitions).entrySet().stream()
+                    .map(e -> Map.of(e.getKey().topic() + '-' + e.getKey().partition() + '-' + "min", new BigDecimal(e.getValue()))));
+            offset.putAll(ClusterUtil.toSingleMap(createConsumer(c).endOffsets(topicPartitions).entrySet().stream()
+                    .map(e -> Map.of(e.getKey().topic() + '-' + e.getKey().partition() + '-' + "min", new BigDecimal(e.getValue())))));
+            return offset;
+
+    }
 }

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -14,8 +14,10 @@ import org.springframework.web.server.ServerWebExchange;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.math.BigDecimal;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
 
 import javax.validation.Valid;
@@ -40,6 +42,11 @@ public class MetricsRestController implements ApiClustersApi {
         );
     }
 
+    @Override
+    public Mono<ResponseEntity<Map<String, BigDecimal>>> getOffsets(String clusterName, ServerWebExchange exchange) {
+        return clusterService.getOffsets(clusterName).map(ResponseEntity::ok);
+    }
+
     @Override
     public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterName, ServerWebExchange exchange) {
         return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName))));

+ 22 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -290,6 +290,28 @@ paths:
                 items:
                   $ref: '#/components/schemas/ConsumerGroup'
 
+  /api/clusters/offsets/{clusterName}:
+    get:
+      tags:
+        - /api/clusters
+      summary: get offsets
+      operationId: getOffsets
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: object
+                additionalProperties:
+                  type: number
+
 components:
   schemas:
     Cluster: