Added segmentSize for topic level (#48)

* Added segmentSize for topic level

* Added segmentSize to clusterMetrics object

* Added internalClusterState for storing clusterMetrics linked to the cluster's broker ids

* Added an internalBrokerMetrics param to InternalClusterMetrics

* Changed the topic segment counting logic to read sizes from each partition's leader, and added per-partition collection at the topic level

* Added segmentSize info at the cluster, broker, topic and topic-partition levels (see the sketch after this list)

* Moved the reduce-to-map helper into ClusterUtil and moved partition leaders into a cache

* Cleaned up some code
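
Conceptually, the roll-up this commit introduces is: read each partition's log size from that partition's leader broker, sum per topic to get the topic-level segmentSize, and sum everything for the cluster-level value. A minimal, self-contained sketch of that arithmetic (a hypothetical example class with made-up values, not code from this commit):

import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.stream.Collectors;

public class SegmentSizeRollup {

    // Per-topic totals: group the per-partition byte counts by topic name.
    static Map<String, Long> topicSegmentSizes(Map<TopicPartition, Long> partitionSegmentSize) {
        return partitionSegmentSize.entrySet().stream()
                .collect(Collectors.groupingBy(e -> e.getKey().topic(),
                        Collectors.summingLong(Map.Entry::getValue)));
    }

    // Cluster total: sum over all partitions.
    static long clusterSegmentSize(Map<TopicPartition, Long> partitionSegmentSize) {
        return partitionSegmentSize.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        var sizes = Map.of(
                new TopicPartition("users", 0), 1_048_576L,
                new TopicPartition("users", 1), 2_097_152L,
                new TopicPartition("messages", 0), 524_288L);
        System.out.println(topicSegmentSizes(sizes));   // {users=3145728, messages=524288} (order may vary)
        System.out.println(clusterSegmentSize(sizes));  // 3670016
    }
}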

Co-authored-by: Roman Nedzvetskiy <roman@Romans-MacBook-Pro.local>
Roman Nedzvetskiy committed 2020-05-28 13:11:50 +03:00 (via GitHub)
parent 803f0be7d7
commit 9a5ffc9eb5
12 changed files with 157 additions and 69 deletions


@@ -5,10 +5,10 @@ services:
   zookeeper0:
     image: confluentinc/cp-zookeeper:5.1.0
     environment:
-      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_CLIENT_PORT: 2183
       ZOOKEEPER_TICK_TIME: 2000
     ports:
-      - 2181:2181
+      - 2183:2183

   kafka0:
     image: confluentinc/cp-kafka:5.1.0
@@ -19,11 +19,28 @@ services:
       - 9997:9997
     environment:
       KAFKA_BROKER_ID: 1
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29091,PLAINTEXT_HOST://localhost:9091,PLAIN://kafka0:29090
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:9092,PLAINTEXT_HOST://localhost:29091 #,PLAIN://kafka0:29090
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT #,PLAIN:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
       JMX_PORT: 9997
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997

+  kafka01:
+    image: confluentinc/cp-kafka:5.1.0
+    depends_on:
+      - zookeeper0
+    ports:
+      - 29093:29093
+      - 9999:9999
+    environment:
+      KAFKA_BROKER_ID: 2
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
+      JMX_PORT: 9997
+      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
@@ -32,9 +49,9 @@ services:
     depends_on:
       - kafka0
     command: "bash -c 'echo Waiting for Kafka to be ready... && \
-               cub kafka-ready -b kafka0:29090 1 20 && \
-               kafka-topics --create --topic users --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181 && \
-               kafka-topics --create --topic messages --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181'"
+               cub kafka-ready -b kafka0:9092 1 20 && \
+               kafka-topics --create --topic users --partitions 2 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183 && \
+               kafka-topics --create --topic messages --partitions 3 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183'"
     environment:
       KAFKA_BROKER_ID: ignored
       KAFKA_ZOOKEEPER_CONNECT: ignored
@@ -44,10 +61,10 @@ services:
   zookeeper1:
     image: confluentinc/cp-zookeeper:5.1.0
     environment:
-      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_CLIENT_PORT: 2182
       ZOOKEEPER_TICK_TIME: 2000
     ports:
-      - 2182:2181
+      - 2182:2182

   kafka1:
     image: confluentinc/cp-kafka:5.1.0
@@ -58,11 +75,11 @@ services:
       - 9998:9998
     environment:
       KAFKA_BROKER_ID: 1
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:9092,PLAIN://kafka1:29090
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2182
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092,PLAINTEXT_HOST://localhost:29092,PLAIN://localhost:29090
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
       JMX_PORT: 9998
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9998
@@ -71,9 +88,9 @@ services:
     depends_on:
       - kafka1
     command: "bash -c 'echo Waiting for Kafka to be ready... && \
-               cub kafka-ready -b kafka1:29090 1 20 && \
-               kafka-topics --create --topic users --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
-               kafka-topics --create --topic messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181'"
+               cub kafka-ready -b kafka1:9092 1 20 && \
+               kafka-topics --create --topic secondUsers --partitions 3 --replication-factor 2 --if-not-exists --zookeeper zookeeper1:2182 && \
+               kafka-topics --create --topic secondMessages --partitions 2 --replication-factor 2 --if-not-exists --zookeeper zookeeper1:2182'"
     environment:
       KAFKA_BROKER_ID: ignored
       KAFKA_ZOOKEEPER_CONNECT: ignored


@@ -1,13 +0,0 @@
-package com.provectus.kafka.ui.cluster.mapper;
-
-import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
-import com.provectus.kafka.ui.model.BrokersMetrics;
-import org.mapstruct.Mapper;
-
-@Mapper(componentModel = "spring")
-public interface BrokersMetricsMapper {
-
-    InternalClusterMetrics toBrokersMetricsDto (BrokersMetrics brokersMetrics);
-
-    BrokersMetrics toBrokersMetrics (InternalClusterMetrics brokersMetrics);
-}


@@ -1,12 +0,0 @@
-package com.provectus.kafka.ui.cluster.mapper;
-
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
-import com.provectus.kafka.ui.model.Cluster;
-import org.mapstruct.Mapper;
-
-@Mapper(componentModel = "spring")
-public interface ClusterDtoMapper {
-
-    KafkaCluster toInternalCluster(Cluster cluster);
-    Cluster toClusterDto(KafkaCluster cluster);
-}


@@ -0,0 +1,10 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder(toBuilder = true)
+public class InternalBrokerMetrics {
+    private final Long segmentSize;
+}


@@ -1,9 +1,10 @@
 package com.provectus.kafka.ui.cluster.model;

 import com.provectus.kafka.ui.model.ServerStatus;
 import lombok.Builder;
 import lombok.Data;

+import java.util.Map;

 @Data
 @Builder(toBuilder = true)
@@ -20,8 +21,9 @@ public class InternalClusterMetrics {
     //TODO: find way to fill
     private final int bytesInPerSec;
     private final int bytesOutPerSec;
-    //TODO: find way to fill
-    private final int segmentSize;
     private final int segmentCount;
+    //TODO: find way to fill
+    private final long segmentSize;
+    private final Map<Integer, InternalBrokerMetrics> internalBrokerMetrics;
     private final int zooKeeperStatus;
 }
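
Since the model is immutable, @Builder(toBuilder = true) is what lets KafkaService enrich an already-built metrics object with segment sizes later in the pipeline. A sketch of that pattern (hypothetical helper, made-up value):

// toBuilder() copies the immutable instance so one field can be overridden
// without mutating the original; updateSegmentMetrics relies on the same idea.
static InternalClusterMetrics withSegmentSize(InternalClusterMetrics base, long totalBytes) {
    return base.toBuilder()
            .segmentSize(totalBytes)
            .build();
}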


@@ -0,0 +1,14 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.util.Map;
+
+@Data
+@Builder(toBuilder = true)
+public class InternalSegmentSizeDto {
+
+    private final Map<String, InternalTopic> internalTopicWithSegmentSize;
+    private final InternalClusterMetrics clusterMetricsWithSegmentSize;
+}


@@ -2,8 +2,10 @@ package com.provectus.kafka.ui.cluster.model;

 import lombok.Builder;
 import lombok.Data;
+import org.apache.kafka.common.TopicPartition;

 import java.util.List;
+import java.util.Map;

 @Data
 @Builder(toBuilder = true)
@@ -20,6 +22,7 @@ public class InternalTopic {
     private final int replicationFactor;
     private final int underReplicatedPartitions;
     //TODO: find way to fill
-    private final int segmentSize;
+    private final long segmentSize;
     private final int segmentCount;
+    private final Map<TopicPartition, Long> partitionSegmentSize;
 }


@@ -16,7 +16,7 @@ public class MetricsUpdateService {
     private final KafkaService kafkaService;

     public Mono<KafkaCluster> updateMetrics(KafkaCluster kafkaCluster) {
-        log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster);
+        log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster.getName());
         return kafkaService.getUpdatedCluster(kafkaCluster);
     }
 }


@@ -173,4 +173,9 @@ public class ClusterUtil {
             throw new IllegalArgumentException("Unknown timestampType: " + timestampType);
         }
     }
+
+    public static <T, R> Map<T, R> toSingleMap (Stream<Map<T, R>> streamOfMaps) {
+        return streamOfMaps.reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();
+    }
 }
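
For reference, toSingleMap folds a Stream<Map<T, R>> into one map; duplicate keys make the inner Collectors.toMap throw, and an empty stream trips orElseThrow. A usage sketch (hypothetical example class and values):

import com.provectus.kafka.ui.cluster.util.ClusterUtil;

import java.util.Map;
import java.util.stream.Stream;

public class ToSingleMapExample {
    public static void main(String[] args) {
        // Merge a stream of maps into a single map; keys are assumed unique.
        Map<String, Integer> merged = ClusterUtil.toSingleMap(Stream.of(
                Map.of("a", 1),
                Map.of("b", 2, "c", 3)));
        System.out.println(merged); // {a=1, b=2, c=3} (iteration order unspecified)
    }
}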


@@ -1,9 +1,6 @@
 package com.provectus.kafka.ui.kafka;

-import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
-import com.provectus.kafka.ui.cluster.model.InternalTopic;
-import com.provectus.kafka.ui.cluster.model.InternalTopicConfig;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.cluster.util.ClusterUtil;
 import com.provectus.kafka.ui.model.ConsumerGroup;
 import com.provectus.kafka.ui.model.ServerStatus;
@@ -17,6 +14,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -25,12 +23,10 @@ import reactor.core.publisher.Mono;
 import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;

-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

 @Service
 @RequiredArgsConstructor
@@ -41,15 +37,19 @@ public class KafkaService {
     private final ZookeeperService zookeeperService;
     private final Map<String, AdminClient> adminClientCache = new ConcurrentHashMap<>();
+    private final Map<AdminClient, Map<TopicPartition, Integer>> leadersCache = new ConcurrentHashMap<>();

     @SneakyThrows
     public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
         return getOrCreateAdminClient(cluster).flatMap(
-            ac -> getClusterMetrics(ac).flatMap( clusterMetrics ->
+            ac -> getClusterMetrics(ac)
+                    .flatMap( clusterMetrics ->
                         getTopicsData(ac).flatMap( topics ->
                             loadTopicsConfig(ac, topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
-                                .map( configs -> mergeWithConfigs(topics, configs) )
-                        ).map( topics -> buildFromData(cluster, clusterMetrics, topics))
+                                .map( configs -> mergeWithConfigs(topics, configs))
+                                .flatMap(it -> updateSegmentMetrics(ac, clusterMetrics, it))
+                        ).map( segmentSizeDto -> buildFromData(cluster, segmentSizeDto))
                     )
         ).onErrorResume(
             e -> Mono.just(cluster.toBuilder()
@@ -59,7 +59,10 @@ public class KafkaService {
         );
     }

-    private KafkaCluster buildFromData(KafkaCluster currentCluster, InternalClusterMetrics brokersMetrics, Map<String, InternalTopic> topics) {
+    private KafkaCluster buildFromData(KafkaCluster currentCluster, InternalSegmentSizeDto segmentSizeDto) {
+
+        var topics = segmentSizeDto.getInternalTopicWithSegmentSize();
+        var brokersMetrics = segmentSizeDto.getClusterMetricsWithSegmentSize();

         InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = brokersMetrics.toBuilder();
@@ -134,6 +137,17 @@ public class KafkaService {
     private Mono<List<InternalTopic>> getTopicsData(AdminClient adminClient) {
         return ClusterUtil.toMono(adminClient.listTopics(LIST_TOPICS_OPTIONS).names())
                 .flatMap(topics -> ClusterUtil.toMono(adminClient.describeTopics(topics).all()))
+                .map(topic -> {
+                    var leadersMap = topic.values().stream()
+                            .flatMap(t -> t.partitions().stream()
+                                    .flatMap(t1 -> {
+                                        Map<TopicPartition, Integer> result = new HashMap<>();
+                                        result.put(new TopicPartition(t.name(), t1.partition()), t1.leader().id());
+                                        return Stream.of(result);
+                                    }));
+                    leadersCache.put(adminClient, ClusterUtil.toSingleMap(leadersMap));
+                    return topic;
+                })
                 .map( m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()));
     }
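
The added .map step builds a TopicPartition -> leader-broker-id map by flat-mapping singleton maps and re-merging them with toSingleMap. The same map can be built more directly; an equivalent sketch for comparison (reusing the topic variable in scope above, not what the commit ships):

// One entry per partition: (topic, partition) -> id of the leader broker.
Map<TopicPartition, Integer> leaders = topic.values().stream()
        .flatMap(t -> t.partitions().stream()
                .map(p -> Map.entry(new TopicPartition(t.name(), p.partition()), p.leader().id())))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));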
@@ -142,11 +156,13 @@ public class KafkaService {
                 .flatMap(brokers ->
                         ClusterUtil.toMono(client.describeCluster().controller()).map(
                                 c -> {
-                                    InternalClusterMetrics.InternalClusterMetricsBuilder builder = InternalClusterMetrics.builder();
-                                    builder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
+                                    InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = InternalClusterMetrics.builder();
+                                    metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
                                     // TODO: fill bytes in/out metrics
-                                    List<Integer> brokerIds = brokers.stream().map(Node::id).collect(Collectors.toList());
-                                    return builder.build();
+                                    metricsBuilder
+                                            .internalBrokerMetrics((brokers.stream().map(Node::id).collect(Collectors.toMap(k -> k, v -> InternalBrokerMetrics.builder().build()))));
+                                    return metricsBuilder.build();
                                 }
                         )
                 );
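
Note that this only pre-seeds the map with an empty InternalBrokerMetrics per broker id; the actual sizes arrive later in updateSegmentMetrics, which uses these ids as the key set for describeLogDirs. The seeding step boils down to (sketch, reusing the brokers collection in scope above):

// One empty metrics object per broker id; segmentSize is filled in later.
Map<Integer, InternalBrokerMetrics> seed = brokers.stream()
        .collect(Collectors.toMap(Node::id, n -> InternalBrokerMetrics.builder().build()));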
@@ -236,7 +252,6 @@ public class KafkaService {
     public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) {
         var adminClient = this.createAdminClient(cluster);
-
         return ClusterUtil.toMono(adminClient.listConsumerGroups().all())
                 .flatMap(s -> ClusterUtil.toMono(adminClient
                         .describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
@@ -263,4 +278,45 @@ public class KafkaService {
                         .iterator()
                         .next());
     }
+
+    private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac, InternalClusterMetrics clusterMetrics, Map<String, InternalTopic> internalTopic) {
+        return ClusterUtil.toMono(ac.describeTopics(internalTopic.keySet()).all()).flatMap(topic ->
+            ClusterUtil.toMono(ac.describeLogDirs(clusterMetrics.getInternalBrokerMetrics().keySet()).all())
+                .map(log -> {
+                    var partitionSegmentSizeStream = leadersCache.get(ac).entrySet().stream()
+                            .flatMap(l -> {
+                                Map<TopicPartition, Long> result = new HashMap<>();
+                                result.put(l.getKey(), log.get(l.getValue()).values().stream().mapToLong(e -> e.replicaInfos.get(l.getKey()).size).sum());
+                                return Stream.of(result);
+                            });
+                    var partitionSegmentSize = ClusterUtil.toSingleMap(partitionSegmentSizeStream);
+
+                    var resultTopicMetricsStream = internalTopic.keySet().stream().flatMap(k -> {
+                        Map<String, InternalTopic> result = new HashMap<>();
+                        result.put(k, internalTopic.get(k).toBuilder()
+                                .segmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).mapToLong(Map.Entry::getValue).sum())
+                                .partitionSegmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).build());
+                        return Stream.of(result);
+                    });
+
+                    var resultBrokerMetricsStream = clusterMetrics.getInternalBrokerMetrics().entrySet().stream().map(
+                            e -> {
+                                var brokerSegmentSize = log.get(e.getKey()).values().stream()
+                                        .mapToLong(v -> v.replicaInfos.values().stream()
+                                                .mapToLong(r -> r.size).sum()).sum();
+                                InternalBrokerMetrics tempBrokerMetrics = InternalBrokerMetrics.builder().segmentSize(brokerSegmentSize).build();
+                                return Collections.singletonMap(e.getKey(), tempBrokerMetrics);
+                            });
+
+                    var resultClusterMetrics = clusterMetrics.toBuilder()
+                            .internalBrokerMetrics(ClusterUtil.toSingleMap(resultBrokerMetricsStream))
+                            .segmentSize(partitionSegmentSize.values().stream().reduce(Long::sum).orElseThrow())
+                            .build();
+
+                    return InternalSegmentSizeDto.builder()
+                            .clusterMetricsWithSegmentSize(resultClusterMetrics)
+                            .internalTopicWithSegmentSize(ClusterUtil.toSingleMap(resultTopicMetricsStream)).build();
+                })
+        );
+    }
 }
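
The nested lookups in updateSegmentMetrics follow the shape of the describeLogDirs result: broker id -> log-directory path -> per-replica info. A sketch of extracting a single partition's on-disk size from it (hypothetical helper; in this kafka-clients version LogDirInfo and ReplicaInfo expose public fields, as the code above relies on):

// logDirs is Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>,
// as returned by adminClient.describeLogDirs(brokerIds).all().get().
static long partitionSizeOnBroker(Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> logDirs,
                                  int brokerId, TopicPartition tp) {
    return logDirs.get(brokerId).values().stream()           // each log dir on that broker
            .filter(dir -> dir.replicaInfos.containsKey(tp))  // dirs hosting this partition
            .mapToLong(dir -> dir.replicaInfos.get(tp).size)  // replica size in bytes
            .sum();
}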


@@ -3,8 +3,12 @@ kafka:
     -
       name: local
       bootstrapServers: localhost:29091
-      zookeeper: localhost:2181
+      zookeeper: localhost:2183
     -
       name: secondLocal
       bootstrapServers: localhost:29092
       zookeeper: localhost:2182
+    -
+      name: localReplica
+      bootstrapServers: localhost:29093
+      zookeeper: localhost:2183


@@ -281,6 +281,8 @@ components:
           type: integer
         outOfSyncReplicasCount:
           type: integer
+        segmentSize:
+          type: integer
     Topic:
       type: object