فهرست منبع

Moved to gettopicdetails

Roman Nedzvetskiy 5 سال پیش
والد
کامیت
e2325b9f72

+ 45 - 41
docker/kafka-clusters-only.yaml

@@ -23,35 +23,39 @@ services:
       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:9092,PLAINTEXT_HOST://localhost:29091 #,PLAIN://kafka0:29090
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT #,PLAIN:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
-      JMX_PORT: 9997
-      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
-
-  kafka01:
-    image: confluentinc/cp-kafka:5.1.0
-    depends_on:
-      - zookeeper0
-    ports:
-      - 29093:29093
-      - 9999:9999
-    environment:
-      KAFKA_BROKER_ID: 2
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
-      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       JMX_PORT: 9997
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
+#
+#  kafka01:
+#    image: confluentinc/cp-kafka:5.1.0
+#    depends_on:
+#      - zookeeper0
+#    ports:
+#      - 29093:29093
+#      - 9999:9999
+#    environment:
+#      KAFKA_BROKER_ID: 2
+#      KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
+#      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090
+#      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
+#      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+#      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
+#      JMX_PORT: 9997
+#      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
 
   kafka-init-topics0:
     image: confluentinc/cp-kafka:5.1.0
     depends_on:
       - kafka0
-    command: "bash -c 'echo Waiting for Kafka to be ready... && \
-                cub kafka-ready -b kafka0:9092 1 20 && \
-                kafka-topics --create --topic users --partitions 2 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183 && \
-                kafka-topics --create --topic messages --partitions 3 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183'"
+      - kafka1
+    command:
+                "kafka-console-producer --broker-list kafka1:9092 --topic secondUsers && \
+                This is message 1 && \
+                This is message 2 && \
+                This is message 3 && \
+                Message 4 && \
+                Message 5"
     environment:
       KAFKA_BROKER_ID: ignored
       KAFKA_ZOOKEEPER_CONNECT: ignored
@@ -79,7 +83,7 @@ services:
       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29092,PLAIN://localhost:29090
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       JMX_PORT: 9998
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9998
 
@@ -97,21 +101,21 @@ services:
     networks:
      - default
 
-  schemaregistry0:
-    image: confluentinc/cp-schema-registry:5.1.0
-    depends_on:
-      - zookeeper0
-      - kafka0
-      - kafka01
-    ports:
-      - 8085:8085
-    environment:
-      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092
-      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183
-      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
-      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
-      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085
-
-      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
-      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
-      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
+#  schemaregistry0:
+#    image: confluentinc/cp-schema-registry:5.1.0
+#    depends_on:
+#      - zookeeper0
+#      - kafka0
+#      - kafka01
+#    ports:
+#      - 8085:8085
+#    environment:
+#      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092
+#      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183
+#      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
+#      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
+#      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085
+#
+#      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
+#      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+#      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.cluster.model;
 
+import com.provectus.kafka.ui.model.TopicPartitionDto;
 import lombok.Builder;
 import lombok.Data;
 import org.apache.kafka.common.TopicPartition;
@@ -25,4 +26,5 @@ public class InternalTopic {
     private final long segmentSize;
     private final int segmentCount;
     private final Map<TopicPartition, Long> partitionSegmentSize;
+    private final List<TopicPartitionDto> offsets;
 }

+ 6 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -9,18 +9,15 @@ import com.provectus.kafka.ui.kafka.KafkaService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.math.BigDecimal;
 import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -56,11 +53,13 @@ public class ClusterService {
                 ).orElse(Collections.emptyList());
     }
 
-    public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
+    public Optional<TopicDetails> getTopicDetails(String name, String topicName) { ;
         return clustersStorage.getClusterByName(name)
-                .map(KafkaCluster::getTopics)
-                .map(t -> t.get(topicName))
-                .map(clusterMapper::toTopicDetails);
+                .map(c -> {
+                 var topic = c.getTopics().get(topicName);
+                 var internalTopic = kafkaService.fillOffsets(topic, c);
+                 return clusterMapper.toTopicDetails(internalTopic);
+                });
     }
                                                                            
     public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
@@ -156,9 +155,4 @@ public class ClusterService {
                 .orElse(Flux.empty());
 
     }
-
-    public Mono<Map<String, BigDecimal>> getOffsets(String clusterName) {
-        return clustersStorage.getClusterByName(clusterName)
-                .map(c -> kafkaService.getOffsets( c))).orElseThrow();
-    }
 }

+ 19 - 37
kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

@@ -3,10 +3,7 @@ package com.provectus.kafka.ui.kafka;
 import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.cluster.util.JmxClusterUtil;
-import com.provectus.kafka.ui.model.ConsumerGroup;
-import com.provectus.kafka.ui.model.ServerStatus;
-import com.provectus.kafka.ui.model.Topic;
-import com.provectus.kafka.ui.model.TopicFormData;
+import com.provectus.kafka.ui.model.*;
 import com.provectus.kafka.ui.zookeeper.ZookeeperService;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
@@ -26,7 +23,6 @@ import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
 
-import java.math.BigDecimal;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -352,43 +348,29 @@ public class KafkaService {
             );
     }
 
-    public Mono<Map<String, BigDecimal>> getOffsets (KafkaCluster c) {
-        return getOrCreateAdminClient(c).flatMap(ac ->
-                getTopicPartitionList(ac.getAdminClient())
-                .map(s -> {
-                    var tps = s.stream()
-                            .map(tp ->
-                                    tp.entrySet().stream()
-                                            .map(tp1 -> tp1.getValue().stream()
-                                                    .map(tt -> new TopicPartition(tp1.getKey(), tt))
-                                                    .collect(Collectors.toList()))
-                                            .collect(Collectors.toList()))
-                            .collect(Collectors.toList())
-                            .stream().flatMap(List::stream).collect(Collectors.toList())
-                            .stream().flatMap(List::stream).collect(Collectors.toList());
-                    return getTopicPartitionOffset(c, tps);
-                }));
+    public InternalTopic fillOffsets (InternalTopic topic, KafkaCluster cluster) {
+        var topicPartitions = topic.getPartitions().stream().map(t -> new TopicPartition(topic.getName(), t.getPartition())).collect(Collectors.toList());
+        return topic.toBuilder().offsets(getTopicPartitionOffset(cluster, topicPartitions)).build();
     }
 
-    private Mono<List<Map<String, List<Integer>>>> getTopicPartitionList(AdminClient ac) {
-        return ClusterUtil.toMono(ac.listTopics().names())
-                .flatMap(l -> ClusterUtil.toMono(ac.describeTopics(l).all()))
-                .map(t -> t.entrySet().stream()
-                        .map(e -> Map.of(e.getKey(), e.getValue().partitions().stream()
-                                .flatMap(tp -> Stream.of(tp.partition()))
-                                .collect(Collectors.toList())))
-                        .collect(Collectors.toList()));
-    }
-
-    private Map<String, BigDecimal> getTopicPartitionOffset(KafkaCluster c, List<TopicPartition> topicPartitions )  {
+    private List<TopicPartitionDto> getTopicPartitionOffset(KafkaCluster c, List<TopicPartition> topicPartitions )  {
         try (var consumer = createConsumer(c)) {
-            var offset = ClusterUtil.toSingleMap(consumer.beginningOffsets(topicPartitions).entrySet().stream()
-                    .map(e -> Map.of(e.getKey().topic() + '-' + e.getKey().partition() + '-' + "min", new BigDecimal(e.getValue()))));
-            offset.putAll(ClusterUtil.toSingleMap(consumer.endOffsets(topicPartitions).entrySet().stream()
-                    .map(e -> Map.of(e.getKey().topic() + '-' + e.getKey().partition() + '-' + "min", new BigDecimal(e.getValue())))));
+            var offset = consumer.beginningOffsets(topicPartitions).entrySet().stream()
+                    .map(e -> {
+                        var tempResult = new TopicPartitionDto();
+                        tempResult.setTopic(e.getKey().topic());
+                        tempResult.setPartition(e.getKey().partition());
+                        tempResult.setOffsetMin(e.getValue().intValue());
+                        return tempResult;
+                    }).map(o -> consumer.endOffsets(topicPartitions).entrySet()
+                            .stream().filter(max -> o.getTopic().equals(max.getKey().topic()))
+                            .map(max -> {
+                                o.setOffsetMax(max.getValue().intValue());
+                                return o;
+                            }).findFirst().orElse(o)).collect(Collectors.toList());
             return offset;
         } catch (Exception e) {
-            return Collections.emptyMap();
+            return Collections.emptyList();
         }
     }
 }

+ 1 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -5,7 +5,6 @@ import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
 import com.provectus.kafka.ui.cluster.service.ClusterService;
 import com.provectus.kafka.ui.model.*;
 import lombok.RequiredArgsConstructor;
-
 import org.apache.commons.lang3.tuple.Pair;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
@@ -14,14 +13,11 @@ import org.springframework.web.server.ServerWebExchange;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.math.BigDecimal;
+import javax.validation.Valid;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.function.Function;
 
-import javax.validation.Valid;
-
 @RestController
 @RequiredArgsConstructor
 public class MetricsRestController implements ApiClustersApi {
@@ -42,11 +38,6 @@ public class MetricsRestController implements ApiClustersApi {
         );
     }
 
-    @Override
-    public Mono<ResponseEntity<Map<String, BigDecimal>>> getOffsets(String clusterName, ServerWebExchange exchange) {
-        return clusterService.getOffsets(clusterName).map(ResponseEntity::ok);
-    }
-
     @Override
     public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterName, ServerWebExchange exchange) {
         return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName))));

+ 10 - 22
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -290,28 +290,6 @@ paths:
                 items:
                   $ref: '#/components/schemas/ConsumerGroup'
 
-  /api/clusters/offsets/{clusterName}:
-    get:
-      tags:
-        - /api/clusters
-      summary: get offsets
-      operationId: getOffsets
-      parameters:
-        - name: clusterName
-          in: path
-          required: true
-          schema:
-            type: string
-      responses:
-        200:
-          description: OK
-          content:
-            application/json:
-              schema:
-                type: object
-                additionalProperties:
-                  type: number
-
 components:
   schemas:
     Cluster:
@@ -405,6 +383,10 @@ components:
     TopicDetails:
       type: object
       properties:
+        offsets:
+          type: array
+          items:
+            $ref: "#/components/schemas/TopicPartitionDto"
         partitionCount:
           type: integer
         replicationFactor:
@@ -508,9 +490,15 @@ components:
           type: string
         partition:
           type: integer
+        offsetMax:
+          type: integer
+        offsetMin:
+          type: integer
       required:
         - topic
         - partition
+        - offsetMax
+        - offsetMin
 
     ConsumerTopicPartitionDetail:
       type: object