diff --git a/docker/kafka-clusters-only.yaml b/docker/kafka-clusters-only.yaml
index bc98db9e5e..fc51d9d3a1 100644
--- a/docker/kafka-clusters-only.yaml
+++ b/docker/kafka-clusters-only.yaml
@@ -5,10 +5,10 @@ services:
   zookeeper0:
     image: confluentinc/cp-zookeeper:5.1.0
     environment:
-      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_CLIENT_PORT: 2183
       ZOOKEEPER_TICK_TIME: 2000
     ports:
-      - 2181:2181
+      - 2183:2183
 
   kafka0:
     image: confluentinc/cp-kafka:5.1.0
@@ -19,11 +19,28 @@ services:
       - 9997:9997
     environment:
       KAFKA_BROKER_ID: 1
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29091,PLAINTEXT_HOST://localhost:9091,PLAIN://kafka0:29090
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:9092,PLAINTEXT_HOST://localhost:29091 #,PLAIN://kafka0:29090
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT #,PLAIN:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
+      JMX_PORT: 9997
+      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
+
+  kafka01:
+    image: confluentinc/cp-kafka:5.1.0
+    depends_on:
+      - zookeeper0
+    ports:
+      - 29093:29093
+      - 9999:9999
+    environment:
+      KAFKA_BROKER_ID: 2
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
       JMX_PORT: 9997
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
 
@@ -32,9 +49,9 @@ services:
     depends_on:
       - kafka0
     command: "bash -c 'echo Waiting for Kafka to be ready... && \
-               cub kafka-ready -b kafka0:29090 1 20 && \
-               kafka-topics --create --topic users --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181 && \
-               kafka-topics --create --topic messages --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181'"
+               cub kafka-ready -b kafka0:9092 1 20 && \
+               kafka-topics --create --topic users --partitions 2 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183 && \
+               kafka-topics --create --topic messages --partitions 3 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183'"
     environment:
       KAFKA_BROKER_ID: ignored
       KAFKA_ZOOKEEPER_CONNECT: ignored
@@ -44,10 +61,10 @@ services:
   zookeeper1:
     image: confluentinc/cp-zookeeper:5.1.0
     environment:
-      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_CLIENT_PORT: 2182
       ZOOKEEPER_TICK_TIME: 2000
     ports:
-      - 2182:2181
+      - 2182:2182
 
   kafka1:
     image: confluentinc/cp-kafka:5.1.0
@@ -58,24 +75,24 @@ services:
       - 9998:9998
     environment:
       KAFKA_BROKER_ID: 1
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:9092,PLAIN://kafka1:29090
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2182
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29092,PLAIN://localhost:29090
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
       JMX_PORT: 9998
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9998
 
   kafka-init-topics1:
     image: confluentinc/cp-kafka:5.1.0
     depends_on:
-        - kafka1
+      - kafka1
     command: "bash -c 'echo Waiting for Kafka to be ready... && \
-               cub kafka-ready -b kafka1:29090 1 20 && \
-               kafka-topics --create --topic users --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
-               kafka-topics --create --topic messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181'"
+               cub kafka-ready -b kafka1:9092 1 20 && \
+               kafka-topics --create --topic secondUsers --partitions 3 --replication-factor 2 --if-not-exists --zookeeper zookeeper1:2182 && \
+               kafka-topics --create --topic secondMessages --partitions 2 --replication-factor 2 --if-not-exists --zookeeper zookeeper1:2182'"
     environment:
-        KAFKA_BROKER_ID: ignored
-        KAFKA_ZOOKEEPER_CONNECT: ignored
+      KAFKA_BROKER_ID: ignored
+      KAFKA_ZOOKEEPER_CONNECT: ignored
 
 networks:
-  - default
\ No newline at end of file
+  - default
\ No newline at end of file
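Note on the compose changes: the first cluster now has two brokers (kafka0 and kafka01) sharing zookeeper0:2183, which is what allows KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR and the bootstrap topics to move from replication factor 1 to 2. Each broker advertises PLAINTEXT://kafkaN:9092 for clients inside the compose network and PLAINTEXT_HOST://localhost:290xx for clients on the host. A minimal sketch of creating a replicated topic from the host, assuming this stack is running (topic name and partition count are illustrative):

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.List;
    import java.util.Properties;

    public class CreateReplicatedTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Host-side listener of kafka0 (PLAINTEXT_HOST above).
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091");
            try (AdminClient client = AdminClient.create(props)) {
                // Replication factor 2 requires both kafka0 and kafka01 to be up.
                NewTopic topic = new NewTopic("demo-topic", 2, (short) 2);
                client.createTopics(List.of(topic)).all().get();
            }
        }
    }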
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/BrokersMetricsMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/BrokersMetricsMapper.java
deleted file mode 100644
index f78540063e..0000000000
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/BrokersMetricsMapper.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.provectus.kafka.ui.cluster.mapper;
-
-import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
-import com.provectus.kafka.ui.model.BrokersMetrics;
-import org.mapstruct.Mapper;
-
-@Mapper(componentModel = "spring")
-public interface BrokersMetricsMapper {
-
-    InternalClusterMetrics toBrokersMetricsDto (BrokersMetrics brokersMetrics);
-
-    BrokersMetrics toBrokersMetrics (InternalClusterMetrics brokersMetrics);
-}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterDtoMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterDtoMapper.java
deleted file mode 100644
index a8f6bb60cc..0000000000
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterDtoMapper.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package com.provectus.kafka.ui.cluster.mapper;
-
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
-import com.provectus.kafka.ui.model.Cluster;
-import org.mapstruct.Mapper;
-
-@Mapper(componentModel = "spring")
-public interface ClusterDtoMapper {
-
-    KafkaCluster toInternalCluster(Cluster cluster);
-    Cluster toClusterDto(KafkaCluster cluster);
-}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerMetrics.java
new file mode 100644
index 0000000000..734ef31b07
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerMetrics.java
@@ -0,0 +1,10 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder(toBuilder = true)
+public class InternalBrokerMetrics {
+    private final Long segmentSize;
+}
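The two deleted MapStruct interfaces mapped between internal models and generated DTOs; they are dropped because the service layer now builds its internal models directly. The new InternalBrokerMetrics is an immutable per-broker holder, and @Builder(toBuilder = true) lets the code create an empty placeholder first and swap in an enriched copy once segment sizes are computed. A small sketch of that copy-on-write pattern (the 42L value is made up; assumes the project classes are on the classpath):

    import com.provectus.kafka.ui.cluster.model.InternalBrokerMetrics;

    public class BrokerMetricsCopyDemo {
        public static void main(String[] args) {
            // Placeholder created before log-dir sizes are known.
            InternalBrokerMetrics empty = InternalBrokerMetrics.builder().build();
            // toBuilder() copies existing fields; only segmentSize is overridden.
            InternalBrokerMetrics enriched = empty.toBuilder().segmentSize(42L).build();
            System.out.println(enriched.getSegmentSize()); // 42
        }
    }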
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java
index b7baa7c6e3..3fa72b61db 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java
@@ -1,9 +1,10 @@
 package com.provectus.kafka.ui.cluster.model;
 
-import com.provectus.kafka.ui.model.ServerStatus;
 import lombok.Builder;
 import lombok.Data;
 
+import java.util.Map;
+
 @Data
 @Builder(toBuilder = true)
@@ -20,8 +21,9 @@ public class InternalClusterMetrics {
     //TODO: find way to fill
     private final int bytesInPerSec;
     private final int bytesOutPerSec;
-    //TODO: find way to fill
-    private final int segmentSize;
     private final int segmentCount;
+    //TODO: find way to fill
+    private final long segmentSize;
+    private final Map<Integer, InternalBrokerMetrics> internalBrokerMetrics;
     private final int zooKeeperStatus;
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalSegmentSizeDto.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalSegmentSizeDto.java
new file mode 100644
index 0000000000..cba7e031b7
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalSegmentSizeDto.java
@@ -0,0 +1,14 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.util.Map;
+
+@Data
+@Builder(toBuilder = true)
+public class InternalSegmentSizeDto {
+
+    private final Map<String, InternalTopic> internalTopicWithSegmentSize;
+    private final InternalClusterMetrics clusterMetricsWithSegmentSize;
+}
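InternalClusterMetrics now carries a cluster-wide segmentSize (widened from int to long, since byte totals overflow int quickly) plus a per-broker map keyed by broker id, and InternalSegmentSizeDto bundles the size-enriched topic map together with the enriched cluster metrics so both can travel through a single reactive chain. A sketch of how the pieces compose, with made-up sizes:

    import com.provectus.kafka.ui.cluster.model.InternalBrokerMetrics;
    import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;

    import java.util.Map;

    public class ClusterMetricsDemo {
        public static void main(String[] args) {
            Map<Integer, InternalBrokerMetrics> perBroker = Map.of(
                    1, InternalBrokerMetrics.builder().segmentSize(1024L).build(),
                    2, InternalBrokerMetrics.builder().segmentSize(2048L).build());

            InternalClusterMetrics metrics = InternalClusterMetrics.builder()
                    .internalBrokerMetrics(perBroker)
                    // Cluster-wide size is simply the sum over brokers in this sketch.
                    .segmentSize(perBroker.values().stream()
                            .mapToLong(InternalBrokerMetrics::getSegmentSize).sum())
                    .build();

            System.out.println(metrics.getSegmentSize()); // 3072
        }
    }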
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java
index d2fc8b4a58..95783e5b3f 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java
@@ -2,8 +2,10 @@ package com.provectus.kafka.ui.cluster.model;
 
 import lombok.Builder;
 import lombok.Data;
+import org.apache.kafka.common.TopicPartition;
 
 import java.util.List;
+import java.util.Map;
 
 @Data
 @Builder(toBuilder = true)
@@ -20,6 +22,7 @@ public class InternalTopic {
     private final int replicationFactor;
     private final int underReplicatedPartitions;
     //TODO: find way to fill
-    private final int segmentSize;
+    private final long segmentSize;
     private final int segmentCount;
+    private final Map<TopicPartition, Long> partitionSegmentSize;
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java
index 7d3d38592a..783f17e30b 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java
@@ -16,7 +16,7 @@ public class MetricsUpdateService {
     private final KafkaService kafkaService;
 
     public Mono<KafkaCluster> updateMetrics(KafkaCluster kafkaCluster) {
-        log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster);
+        log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster.getName());
         return kafkaService.getUpdatedCluster(kafkaCluster);
     }
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java
index af10406c20..fa72f46d3e 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java
@@ -173,4 +173,9 @@ public class ClusterUtil {
             throw new IllegalArgumentException("Unknown timestampType: " + timestampType);
         }
     }
+
+    public static <T, S> Map<T, S> toSingleMap (Stream<Map<T, S>> streamOfMaps) {
+        return streamOfMaps.reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
+    }
 }
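ClusterUtil.toSingleMap collapses a Stream<Map<T, S>> into one map by pairwise concatenation of entry streams. Two caveats are worth knowing: Collectors.toMap throws IllegalStateException on duplicate keys, and the trailing orElseThrow() fails on an empty stream, so callers must supply a non-empty stream of maps with disjoint keys. A self-contained sketch of the same shape:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ToSingleMapDemo {

        // Same shape as ClusterUtil.toSingleMap: merge a stream of maps into one.
        static <T, S> Map<T, S> toSingleMap(Stream<Map<T, S>> streamOfMaps) {
            return streamOfMaps.reduce((map1, map2) ->
                    Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
                    .orElseThrow();
        }

        public static void main(String[] args) {
            Map<String, Long> a = new HashMap<>();
            a.put("users-0", 1024L);
            Map<String, Long> b = new HashMap<>();
            b.put("users-1", 2048L);
            System.out.println(toSingleMap(Stream.of(a, b))); // {users-0=1024, users-1=2048}
        }
    }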
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java
index c98ec42a0f..0c49ba4f31 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java
@@ -1,9 +1,6 @@
 package com.provectus.kafka.ui.kafka;
 
-import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
-import com.provectus.kafka.ui.cluster.model.InternalTopic;
-import com.provectus.kafka.ui.cluster.model.InternalTopicConfig;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.model.ConsumerGroup;
 import com.provectus.kafka.ui.model.ServerStatus;
@@ -17,6 +14,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -25,12 +23,10 @@ import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 @Service
 @RequiredArgsConstructor
@@ -41,15 +37,19 @@ public class KafkaService {
 
     private final ZookeeperService zookeeperService;
     private final Map<String, AdminClient> adminClientCache = new ConcurrentHashMap<>();
+    private final Map<AdminClient, Map<TopicPartition, Integer>> leadersCache = new ConcurrentHashMap<>();
 
     @SneakyThrows
     public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
         return getOrCreateAdminClient(cluster).flatMap(
-                ac -> getClusterMetrics(ac).flatMap( clusterMetrics ->
+                ac -> getClusterMetrics(ac)
+
+                        .flatMap( clusterMetrics ->
                             getTopicsData(ac).flatMap( topics ->
                                 loadTopicsConfig(ac, topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
-                                    .map( configs -> mergeWithConfigs(topics, configs) )
-                            ).map( topics -> buildFromData(cluster, clusterMetrics, topics))
+                                    .map( configs -> mergeWithConfigs(topics, configs))
+                                    .flatMap(it -> updateSegmentMetrics(ac, clusterMetrics, it))
+                            ).map( segmentSizeDto -> buildFromData(cluster, segmentSizeDto))
                         )
         ).onErrorResume(
                 e -> Mono.just(cluster.toBuilder()
@@ -59,7 +59,10 @@ public class KafkaService {
         );
     }
 
-    private KafkaCluster buildFromData(KafkaCluster currentCluster, InternalClusterMetrics brokersMetrics, Map<String, InternalTopic> topics) {
+    private KafkaCluster buildFromData(KafkaCluster currentCluster, InternalSegmentSizeDto segmentSizeDto) {
+
+        var topics = segmentSizeDto.getInternalTopicWithSegmentSize();
+        var brokersMetrics = segmentSizeDto.getClusterMetricsWithSegmentSize();
 
         InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = brokersMetrics.toBuilder();
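getUpdatedCluster now inserts updateSegmentMetrics between the config merge and the final build, so buildFromData receives a single InternalSegmentSizeDto instead of separate metrics and topic maps. The operator shape, reduced to strings so it runs standalone (a sketch of the nesting only, not the real types):

    import reactor.core.publisher.Mono;

    public class PipelineShapeDemo {
        public static void main(String[] args) {
            Mono<String> cluster = Mono.just("adminClient").flatMap(ac ->
                    Mono.just("clusterMetrics").flatMap(metrics ->
                            Mono.just("topics")
                                    .map(topics -> topics + "+configs")            // mergeWithConfigs
                                    .flatMap(t -> Mono.just(t + "+segmentSizes"))  // updateSegmentMetrics
                                    .map(dto -> "built from " + dto + " / " + metrics) // buildFromData
                    ));
            cluster.subscribe(System.out::println);
        }
    }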
@@ -134,6 +137,17 @@ public class KafkaService {
     private Mono<List<InternalTopic>> getTopicsData(AdminClient adminClient) {
         return ClusterUtil.toMono(adminClient.listTopics(LIST_TOPICS_OPTIONS).names())
                 .flatMap(topics -> ClusterUtil.toMono(adminClient.describeTopics(topics).all()))
+                .map(topic -> {
+                    var leadersMap = topic.values().stream()
+                            .flatMap(t -> t.partitions().stream()
+                                    .flatMap(t1 -> {
+                                        Map<TopicPartition, Integer> result = new HashMap<>();
+                                        result.put(new TopicPartition(t.name(), t1.partition()), t1.leader().id());
+                                        return Stream.of(result);
+                                    }));
+                    leadersCache.put(adminClient, ClusterUtil.toSingleMap(leadersMap));
+                    return topic;
+                })
                 .map( m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()));
     }
 
@@ -142,11 +156,13 @@ public class KafkaService {
                 .flatMap(brokers ->
                     ClusterUtil.toMono(client.describeCluster().controller()).map(
                         c -> {
-                            InternalClusterMetrics.InternalClusterMetricsBuilder builder = InternalClusterMetrics.builder();
-                            builder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
+                            InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = InternalClusterMetrics.builder();
+                            metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
                             // TODO: fill bytes in/out metrics
-                            List<Integer> brokerIds = brokers.stream().map(Node::id).collect(Collectors.toList());
-                            return builder.build();
+                            metricsBuilder
+                                    .internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))));
+
+                            return metricsBuilder.build();
                         }
                     )
                 );
@@ -236,7 +252,6 @@ public class KafkaService {
 
     public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) {
         var adminClient = this.createAdminClient(cluster);
-
         return ClusterUtil.toMono(adminClient.listConsumerGroups().all())
                 .flatMap(s -> ClusterUtil.toMono(adminClient
                         .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
@@ -263,4 +278,45 @@ public class KafkaService {
                 .iterator()
                 .next());
     }
+
+    private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac, InternalClusterMetrics clusterMetrics, Map<String, InternalTopic> internalTopic) {
+        return ClusterUtil.toMono(ac.describeTopics(internalTopic.keySet()).all()).flatMap(topic ->
+                ClusterUtil.toMono(ac.describeLogDirs(clusterMetrics.getInternalBrokerMetrics().keySet()).all())
+                        .map(log -> {
+                            var partitionSegmentSizeStream = leadersCache.get(ac).entrySet().stream()
+                                    .flatMap(l -> {
+                                        Map<TopicPartition, Long> result = new HashMap<>();
+                                        result.put(l.getKey(), log.get(l.getValue()).values().stream().mapToLong(e -> e.replicaInfos.get(l.getKey()).size).sum());
+                                        return Stream.of(result);
+                                    });
+                            var partitionSegmentSize = ClusterUtil.toSingleMap(partitionSegmentSizeStream);
+
+                            var resultTopicMetricsStream = internalTopic.keySet().stream().flatMap(k -> {
+                                Map<String, InternalTopic> result = new HashMap<>();
+                                result.put(k, internalTopic.get(k).toBuilder()
+                                        .segmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).mapToLong(Map.Entry::getValue).sum())
+                                        .partitionSegmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).build());
+                                return Stream.of(result);
+                            });
+
+                            var resultBrokerMetricsStream = clusterMetrics.getInternalBrokerMetrics().entrySet().stream().map(
+                                    e -> {
+                                        var brokerSegmentSize = log.get(e.getKey()).values().stream()
+                                                .mapToLong(v -> v.replicaInfos.values().stream()
+                                                        .mapToLong(r -> r.size).sum()).sum();
+                                        InternalBrokerMetrics tempBrokerMetrics = InternalBrokerMetrics.builder().segmentSize(brokerSegmentSize).build();
+                                        return Collections.singletonMap(e.getKey(), tempBrokerMetrics);
+                                    });
+
+                            var resultClusterMetrics = clusterMetrics.toBuilder()
+                                    .internalBrokerMetrics(ClusterUtil.toSingleMap(resultBrokerMetricsStream))
+                                    .segmentSize(partitionSegmentSize.values().stream().reduce(Long::sum).orElseThrow())
+                                    .build();
+
+                            return InternalSegmentSizeDto.builder()
+                                    .clusterMetricsWithSegmentSize(resultClusterMetrics)
+                                    .internalTopicWithSegmentSize(ClusterUtil.toSingleMap(resultTopicMetricsStream)).build();
+                        })
+        );
+    }
 }
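updateSegmentMetrics derives all sizes from a single DescribeLogDirs round-trip: per-partition size is read from the partition leader's log dirs (looked up through leadersCache), per-topic and cluster-wide sizes are sums over those partitions, and per-broker size sums every replica the broker hosts. A runnable sketch of the broker-level sum against the compose stack above (broker ids 1 and 2 are assumed from the compose file; it uses the same public replicaInfos/size fields the service relies on):

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.requests.DescribeLogDirsResponse;

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    public class BrokerSegmentSizeDemo {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29091");
            try (AdminClient ac = AdminClient.create(props)) {
                // broker id -> (log dir path -> info); same call the service makes.
                Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> logDirs =
                        ac.describeLogDirs(List.of(1, 2)).all().get();
                logDirs.forEach((broker, dirs) -> {
                    // Sum every replica's on-disk size across all log dirs of this broker.
                    long brokerSegmentSize = dirs.values().stream()
                            .flatMap(info -> info.replicaInfos.values().stream())
                            .mapToLong(replica -> replica.size)
                            .sum();
                    System.out.printf("broker %d: %d bytes%n", broker, brokerSegmentSize);
                });
            }
        }
    }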
diff --git a/kafka-ui-api/src/main/resources/application-local.yml b/kafka-ui-api/src/main/resources/application-local.yml
index 0b59a9cdbc..e61a941d56 100644
--- a/kafka-ui-api/src/main/resources/application-local.yml
+++ b/kafka-ui-api/src/main/resources/application-local.yml
@@ -3,8 +3,12 @@ kafka:
     -
       name: local
      bootstrapServers: localhost:29091
-      zookeeper: localhost:2181
+      zookeeper: localhost:2183
     -
       name: secondLocal
       bootstrapServers: localhost:29092
-      zookeeper: localhost:2182
\ No newline at end of file
+      zookeeper: localhost:2182
+    -
+      name: localReplica
+      bootstrapServers: localhost:29093
+      zookeeper: localhost:2183
\ No newline at end of file
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index a46ccfbe8e..bd467af68e 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -281,6 +281,8 @@ components:
           type: integer
         outOfSyncReplicasCount:
           type: integer
+        segmentSize:
+          type: integer
       Topic:
         type: object
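Closing note: application-local.yml now registers three clusters — local (kafka0/kafka01 via localhost:29091), secondLocal, and localReplica, which reaches the first cluster again through its second broker (localhost:29093, zookeeper0) — so the UI can be exercised against a replicated setup, and the API contract gains the new segmentSize field on the metrics schema. A hypothetical sketch of how such a cluster list binds in Spring Boot (the real project's properties class may differ in name and location):

    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical binding for the "kafka.clusters" list shown above.
    @Configuration
    @ConfigurationProperties("kafka")
    @Data
    public class ClustersProperties {

        private List<Cluster> clusters = new ArrayList<>();

        @Data
        public static class Cluster {
            private String name;
            private String bootstrapServers;
            private String zookeeper;
        }
    }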