From 66afaa49713c8897209a78c68652a0c7fd228a1f Mon Sep 17 00:00:00 2001 From: Roman Nedzvetskiy Date: Fri, 24 Jul 2020 14:16:49 +0300 Subject: [PATCH] Added begin and end offset param (#78) * Added begin and end offset param * moved consumer to try with resources block * Fixed some problems * Moved to gettopicdetails * Cleanup code Co-authored-by: Roman Nedzvetskiy Co-authored-by: German Osin --- docker/kafka-clusters-only.yaml | 86 ++++++++++--------- .../kafka/ui/cluster/model/InternalTopic.java | 1 + .../ui/cluster/service/ClusterService.java | 9 +- .../kafka/ui/kafka/KafkaService.java | 28 ++++-- .../kafka/ui/rest/MetricsRestController.java | 4 +- .../main/resources/swagger/kafka-ui-api.yaml | 12 +++ 6 files changed, 88 insertions(+), 52 deletions(-) diff --git a/docker/kafka-clusters-only.yaml b/docker/kafka-clusters-only.yaml index f37569d4d7..ab58d52765 100644 --- a/docker/kafka-clusters-only.yaml +++ b/docker/kafka-clusters-only.yaml @@ -23,35 +23,39 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:9092,PLAINTEXT_HOST://localhost:29091 #,PLAIN://kafka0:29090 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT #,PLAIN:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2 - JMX_PORT: 9997 - KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997 - - kafka01: - image: confluentinc/cp-kafka:5.1.0 - depends_on: - - zookeeper0 - ports: - - 29093:29093 - - 9999:9999 - environment: - KAFKA_BROKER_ID: 2 - KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 JMX_PORT: 9997 KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997 +# +# kafka01: +# image: confluentinc/cp-kafka:5.1.0 +# depends_on: +# - zookeeper0 +# ports: +# - 29093:29093 +# - 9999:9999 +# environment: +# KAFKA_BROKER_ID: 2 +# KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183 +# KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090 +# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT +# KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT +# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2 +# JMX_PORT: 9997 +# KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997 kafka-init-topics0: image: confluentinc/cp-kafka:5.1.0 depends_on: - kafka0 - command: "bash -c 'echo Waiting for Kafka to be ready... && \ - cub kafka-ready -b kafka0:9092 1 20 && \ - kafka-topics --create --topic users --partitions 2 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183 && \ - kafka-topics --create --topic messages --partitions 3 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183'" + - kafka1 + command: + "kafka-console-producer --broker-list kafka1:9092 --topic secondUsers && \ + This is message 1 && \ + This is message 2 && \ + This is message 3 && \ + Message 4 && \ + Message 5" environment: KAFKA_BROKER_ID: ignored KAFKA_ZOOKEEPER_CONNECT: ignored @@ -79,7 +83,7 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29092,PLAIN://localhost:29090 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 JMX_PORT: 9998 KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9998 @@ -97,21 +101,21 @@ services: networks: - default - schemaregistry0: - image: confluentinc/cp-schema-registry:5.1.0 - depends_on: - - zookeeper0 - - kafka0 - - kafka01 - ports: - - 8085:8085 - environment: - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092 - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183 - SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT - SCHEMA_REGISTRY_HOST_NAME: schemaregistry - SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085 - - SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" - SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO - SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas +# schemaregistry0: +# image: confluentinc/cp-schema-registry:5.1.0 +# depends_on: +# - zookeeper0 +# - kafka0 +# - kafka01 +# ports: +# - 8085:8085 +# environment: +# SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092 +# SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183 +# SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT +# SCHEMA_REGISTRY_HOST_NAME: schemaregistry +# SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085 +# +# SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" +# SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO +# SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java index 95783e5b3f..7967f178ab 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java @@ -1,5 +1,6 @@ package com.provectus.kafka.ui.cluster.model; +import com.provectus.kafka.ui.model.TopicPartitionDto; import lombok.Builder; import lombok.Data; import org.apache.kafka.common.TopicPartition; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index 7812e59914..6107839322 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -55,9 +55,12 @@ public class ClusterService { public Optional getTopicDetails(String name, String topicName) { return clustersStorage.getClusterByName(name) - .map(KafkaCluster::getTopics) - .map(t -> t.get(topicName)) - .map(clusterMapper::toTopicDetails); + .map(c -> { + var topic = c.getTopics().get(topicName); + return clusterMapper + .toTopicDetails(topic) + .partitions(kafkaService.partitionDtoList(topic, c)); + }); } public Optional> getTopicConfigs(String name, String topicName) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java index 5d100894ee..5d6529acd8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java @@ -3,10 +3,7 @@ package com.provectus.kafka.ui.kafka; import com.provectus.kafka.ui.cluster.model.*; import com.provectus.kafka.ui.cluster.util.ClusterUtil; import com.provectus.kafka.ui.cluster.util.JmxClusterUtil; -import com.provectus.kafka.ui.model.ConsumerGroup; -import com.provectus.kafka.ui.model.ServerStatus; -import com.provectus.kafka.ui.model.Topic; -import com.provectus.kafka.ui.model.TopicFormData; +import com.provectus.kafka.ui.model.*; import com.provectus.kafka.ui.zookeeper.ZookeeperService; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -26,7 +23,6 @@ import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; -import java.math.BigDecimal; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -351,4 +347,26 @@ public class KafkaService { }) ); } + + public List partitionDtoList (InternalTopic topic, KafkaCluster cluster) { + var topicPartitions = topic.getPartitions().stream().map(t -> new TopicPartition(topic.getName(), t.getPartition())).collect(Collectors.toList()); + return getTopicPartitionOffset(cluster, topicPartitions); + } + + private List getTopicPartitionOffset(KafkaCluster c, List topicPartitions ) { + try (var consumer = createConsumer(c)) { + final Map earliest = consumer.beginningOffsets(topicPartitions); + final Map latest = consumer.endOffsets(topicPartitions); + + return topicPartitions.stream() + .map( tp -> new TopicPartitionDto() + .topic(tp.topic()) + .partition(tp.partition()) + .offsetMin(Optional.ofNullable(earliest.get(tp)).orElse(0L)) + .offsetMax(Optional.ofNullable(latest.get(tp)).orElse(0L)) + ).collect(Collectors.toList()); + } catch (Exception e) { + return Collections.emptyList(); + } + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index 10c559ae64..e566ee3738 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -5,7 +5,6 @@ import com.provectus.kafka.ui.cluster.model.ConsumerPosition; import com.provectus.kafka.ui.cluster.service.ClusterService; import com.provectus.kafka.ui.model.*; import lombok.RequiredArgsConstructor; - import org.apache.commons.lang3.tuple.Pair; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; @@ -14,12 +13,11 @@ import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import javax.validation.Valid; import java.util.Collections; import java.util.List; import java.util.function.Function; -import javax.validation.Valid; - @RestController @RequiredArgsConstructor public class MetricsRestController implements ApiClustersApi { diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 18f92deabd..6fe0f3a091 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -383,6 +383,10 @@ components: TopicDetails: type: object properties: + partitions: + type: array + items: + $ref: "#/components/schemas/TopicPartitionDto" partitionCount: type: integer replicationFactor: @@ -486,9 +490,17 @@ components: type: string partition: type: integer + offsetMax: + type: integer + format: int64 + offsetMin: + type: integer + format: int64 required: - topic - partition + - offsetMax + - offsetMin ConsumerTopicPartitionDetail: type: object