Update topic info after create/update ops (#114)

German Osin 2020-11-09 14:24:45 +03:00 committed by GitHub
parent 88cc301bb6
commit 198c97403f
8 changed files with 151 additions and 116 deletions

View file: ClusterMapper.java

@@ -6,6 +6,9 @@ import com.provectus.kafka.ui.model.*;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import java.util.Map;
import java.util.stream.Collectors;
@Mapper(componentModel = "spring")
public interface ClusterMapper {
@@ -19,7 +22,13 @@ public interface ClusterMapper {
ClusterMetrics toClusterMetrics(InternalClusterMetrics metrics);
BrokerMetrics toBrokerMetrics(InternalBrokerMetrics metrics);
Topic toTopic(InternalTopic topic);
Partition toPartition(InternalPartition topic);
TopicDetails toTopicDetails(InternalTopic topic);
TopicConfig toTopicConfig(InternalTopicConfig topic);
Replica toReplica(InternalReplica replica);
default java.util.List<Partition> map(Map<Integer, InternalPartition> map) {
return map.values().stream().map(this::toPartition).collect(Collectors.toList());
}
}
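
MapStruct treats default methods declared on a @Mapper interface as hand-written mapping methods, so the new map(...) above is what lets the generated toTopic implementation convert the partitions field, now a Map<Integer, InternalPartition> (see InternalTopic below), into the List<Partition> exposed by the API model. A minimal, self-contained sketch of that conversion with stand-in types (Integer keys are partition ids, Strings stand in for the partition objects; none of these names are project code):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionMapSketch {
    public static void main(String[] args) {
        // Stand-in data: partition id -> partition payload, keyed the same way InternalTopic now keys its partitions.
        Map<Integer, String> internal = Map.of(0, "partition-0", 1, "partition-1");

        // The default map(...) method does the equivalent, calling toPartition on each value.
        List<String> api = internal.values().stream().collect(Collectors.toList());
        System.out.println(api);
    }
}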

View file: InternalClusterMetrics.java

@@ -22,7 +22,7 @@ public class InternalClusterMetrics {
private final int outOfSyncReplicasCount;
private final Map<String, Number> bytesInPerSec;
private final Map<String, Number> bytesOutPerSec;
private final int segmentCount;
private final long segmentCount;
private final long segmentSize;
private final Map<Integer, InternalBrokerMetrics> internalBrokerMetrics;
private final List<Metric> metrics;

View file: InternalPartition.java

@@ -15,4 +15,6 @@ public class InternalPartition {
private final int replicasCount;
private final long offsetMin;
private final long offsetMax;
private final long segmentSize;
private final long segmentCount;
}

View file: InternalTopic.java

@@ -13,7 +13,7 @@ public class InternalTopic {
private final String name;
private final boolean internal;
private final List<InternalPartition> partitions;
private final Map<Integer,InternalPartition> partitions;
private final List<InternalTopicConfig> topicConfigs;
private final int replicas;
@@ -22,6 +22,6 @@ public class InternalTopic {
private final int replicationFactor;
private final int underReplicatedPartitions;
private final long segmentSize;
private final int segmentCount;
private final Map<TopicPartition, Long> partitionSegmentSize;
private final long segmentCount;
// private final Map<TopicPartition, Long> partitionSegmentSize;
}

View file: ClusterService.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.cluster.service;
import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
import com.provectus.kafka.ui.cluster.model.InternalTopic;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.kafka.KafkaService;
@@ -83,10 +84,12 @@ public class ClusterService {
.map(t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig).collect(Collectors.toList()));
}
public Mono<Topic> createTopic(String name, Mono<TopicFormData> topicFormData) {
return clustersStorage.getClusterByName(name).map(
cluster -> kafkaService.createTopic(cluster, topicFormData)
).orElse(Mono.empty()).map(clusterMapper::toTopic);
public Mono<Topic> createTopic(String clusterName, Mono<TopicFormData> topicFormData) {
return clustersStorage.getClusterByName(clusterName).map(cluster ->
kafkaService.createTopic(cluster, topicFormData)
.doOnNext(t -> updateCluster(t, clusterName, cluster))
.map(clusterMapper::toTopic)
).orElse(Mono.empty());
}
@SneakyThrows
@@ -151,18 +154,15 @@ public class ClusterService {
return clustersStorage.getClusterByName(clusterName).map(cl ->
topicFormData
.flatMap(t -> kafkaService.updateTopic(cl, topicName, t))
.doOnNext(t -> updateCluster(t, clusterName, cl))
.map(clusterMapper::toTopic)
.flatMap(t -> updateCluster(t, clusterName, cl))
)
.orElse(Mono.empty());
).orElse(Mono.empty());
}
private <T> Mono<T> updateCluster(T topic, String clusterName, KafkaCluster cluster) {
return kafkaService.getUpdatedCluster(cluster)
.map(c -> {
clustersStorage.setKafkaCluster(clusterName, c);
return topic;
});
private KafkaCluster updateCluster(InternalTopic topic, String clusterName, KafkaCluster cluster) {
final KafkaCluster updatedCluster = kafkaService.getUpdatedCluster(cluster, topic);
clustersStorage.setKafkaCluster(clusterName, updatedCluster);
return updatedCluster;
}
public Flux<TopicMessage> getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition, String query, Integer limit) {
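
With this change, createTopic and updateTopic refresh the in-memory cluster snapshot (updateCluster writes the rebuilt KafkaCluster into ClustersStorage) as a doOnNext side effect before the result is mapped to the API Topic, so subsequent reads immediately see the new or changed topic. A minimal, self-contained Reactor sketch of that ordering, with a plain ConcurrentHashMap standing in for ClustersStorage and made-up topic names:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Mono;

public class CacheRefreshSketch {
    // Stand-in for ClustersStorage: cluster state reduced to topic name -> description.
    static final Map<String, String> storage = new ConcurrentHashMap<>();

    public static void main(String[] args) {
        Mono<String> created = Mono.just("orders")
                .doOnNext(name -> storage.put(name, "topic " + name + " (cached)")) // refresh the cache first...
                .map(storage::get);                                                  // ...then emit from the refreshed state

        System.out.println(created.block()); // topic orders (cached)
    }
}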

View file: ConsumingService.java

@@ -1,25 +1,5 @@
package com.provectus.kafka.ui.cluster.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.cluster.deserialization.DeserializationService;
@@ -30,11 +10,26 @@ import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.kafka.KafkaService;
import com.provectus.kafka.ui.model.SeekType;
import com.provectus.kafka.ui.model.TopicMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@Service
@Log4j2
@RequiredArgsConstructor
@@ -128,7 +123,7 @@ public class ConsumingService {
return Optional.ofNullable(cluster.getTopics().get(topic))
.orElseThrow(() -> new IllegalArgumentException("Unknown topic: " + topic))
.getPartitions().stream()
.getPartitions().values().stream()
.filter(internalPartition -> partitionPositions.isEmpty() || partitionPositions.containsKey(internalPartition.getPartition()))
.map(partitionInfo -> new TopicPartition(topic, partitionInfo.getPartition()))
.collect(Collectors.toList());

View file: ClusterUtil.java

@@ -129,7 +129,10 @@ public class ClusterUtil {
.mapToInt(InternalPartition::getReplicasCount)
.sum();
topic.partitions(partitions);
topic.partitions(partitions.stream().collect(Collectors.toMap(
InternalPartition::getPartition,
t -> t
)));
topic.replicas(replicasCount);
topic.partitionCount(topicDescription.partitions().size());
topic.inSyncReplicas(inSyncReplicasCount);

View file: KafkaService.java

@@ -3,7 +3,10 @@ package com.provectus.kafka.ui.kafka;
import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.cluster.util.JmxClusterUtil;
import com.provectus.kafka.ui.model.*;
import com.provectus.kafka.ui.model.ConsumerGroup;
import com.provectus.kafka.ui.model.Metric;
import com.provectus.kafka.ui.model.ServerStatus;
import com.provectus.kafka.ui.model.TopicFormData;
import com.provectus.kafka.ui.zookeeper.ZookeeperService;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
@@ -11,7 +14,6 @@ import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
@@ -19,8 +21,10 @@ import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
import java.util.*;
@@ -40,10 +44,15 @@ public class KafkaService {
private final ZookeeperService zookeeperService;
private final Map<String, ExtendedAdminClient> adminClientCache = new ConcurrentHashMap<>();
private final Map<AdminClient, Map<TopicPartition, Integer>> leadersCache = new ConcurrentHashMap<>();
private final JmxClusterUtil jmxClusterUtil;
private final ClustersStorage clustersStorage;
public KafkaCluster getUpdatedCluster(KafkaCluster cluster, InternalTopic updatedTopic) {
final Map<String, InternalTopic> topics = new HashMap<>(cluster.getTopics());
topics.put(updatedTopic.getName(), updatedTopic);
return cluster.toBuilder().topics(topics).build();
}
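
The new getUpdatedCluster(cluster, topic) overload avoids a full broker round-trip: it copies the current topics map, swaps in the created or updated topic, and rebuilds the immutable KafkaCluster through toBuilder(). A small sketch of that copy-and-replace step, with Strings standing in for InternalTopic and made-up topic names:

import java.util.HashMap;
import java.util.Map;

public class TopicsCopySketch {
    public static void main(String[] args) {
        // Existing immutable snapshot, as returned by cluster.getTopics().
        Map<String, String> current = Map.of("orders", "v1", "payments", "v1");

        // Copy, replace the touched topic, and leave the old snapshot untouched.
        Map<String, String> updated = new HashMap<>(current);
        updated.put("orders", "v2");

        System.out.println(current.get("orders"));  // v1
        System.out.println(updated.get("orders"));  // v2
    }
}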
@SneakyThrows
public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
return getOrCreateAdminClient(cluster)
@@ -51,10 +60,8 @@ public class KafkaService {
ac -> getClusterMetrics(ac.getAdminClient())
.flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient()))
.flatMap( clusterMetrics ->
getTopicsData(ac.getAdminClient()).flatMap( topics ->
loadTopicsConfig(ac.getAdminClient(), topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
.map( configs -> mergeWithConfigs(topics, configs))
.flatMap(it -> updateSegmentMetrics(ac.getAdminClient(), clusterMetrics, it))
getTopicsData(ac.getAdminClient()).flatMap( it ->
updateSegmentMetrics(ac.getAdminClient(), clusterMetrics, it)
).map( segmentSizeDto -> buildFromData(cluster, segmentSizeDto))
)
).onErrorResume(
@@ -116,8 +123,8 @@ public class KafkaService {
underReplicatedPartitions += topic.getUnderReplicatedPartitions();
inSyncReplicasCount += topic.getInSyncReplicas();
outOfSyncReplicasCount += (topic.getReplicas() - topic.getInSyncReplicas());
onlinePartitionCount += topic.getPartitions().stream().mapToInt(s -> s.getLeader() == null ? 0 : 1).sum();
offlinePartitionCount += topic.getPartitions().stream().mapToInt(s -> s.getLeader() != null ? 0 : 1).sum();
onlinePartitionCount += topic.getPartitions().values().stream().mapToInt(s -> s.getLeader() == null ? 0 : 1).sum();
offlinePartitionCount += topic.getPartitions().values().stream().mapToInt(s -> s.getLeader() != null ? 0 : 1).sum();
}
return InternalClusterMetrics.builder()
@@ -142,21 +149,20 @@ public class KafkaService {
@SneakyThrows
private Mono<List<InternalTopic>> getTopicsData(AdminClient adminClient) {
return ClusterUtil.toMono(adminClient.listTopics(LIST_TOPICS_OPTIONS).names())
.flatMap(topics -> ClusterUtil.toMono(adminClient.describeTopics(topics).all()))
.map(topic -> {
var leadersMap = topic.values().stream()
.flatMap(t -> t.partitions().stream()
.flatMap(t1 -> {
Map<TopicPartition, Integer> result = new HashMap<>();
result.put(new TopicPartition(t.name(), t1.partition()), t1.leader().id());
return Stream.of(result);
}));
leadersCache.put(adminClient, ClusterUtil.toSingleMap(leadersMap));
return topic;
})
.map( m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()));
.flatMap(topics -> getTopicsData(adminClient, topics).collectList());
}
private Flux<InternalTopic> getTopicsData(AdminClient adminClient, Collection<String> topics) {
final Mono<Map<String, List<InternalTopicConfig>>> configsMono = loadTopicsConfig(adminClient, topics);
return ClusterUtil.toMono(adminClient.describeTopics(topics).all()).map(
m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList())
).flatMap( internalTopics -> configsMono.map(configs ->
mergeWithConfigs(internalTopics, configs).values()
)).flatMapMany(Flux::fromIterable);
}
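
The reworked getTopicsData describes the requested topics, merges in their configs via mergeWithConfigs, and finally flattens the resulting collection into a Flux<InternalTopic> with flatMapMany, so createTopic below can reuse it for a single topic name. A minimal sketch of that last flattening step (Mono<List<T>> to Flux<T>), using made-up topic names:

import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlattenSketch {
    public static void main(String[] args) {
        Mono<List<String>> names = Mono.just(List.of("orders", "payments"));

        // Mono<List<T>> -> Flux<T>, the same shape as the flatMapMany(Flux::fromIterable) call above.
        Flux<String> topics = names.flatMapMany(Flux::fromIterable);

        topics.subscribe(System.out::println); // orders, payments
    }
}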
private Mono<InternalClusterMetrics> getClusterMetrics(AdminClient client) {
return ClusterUtil.toMono(client.describeCluster().nodes())
.flatMap(brokers ->
@@ -182,12 +188,8 @@ public class KafkaService {
NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(), topicData.getReplicationFactor().shortValue());
newTopic.configs(topicData.getConfigs());
return createTopic(adminClient, newTopic).map( v -> topicData);
}).flatMap(topicData -> {
var tdw = adminClient.describeTopics(Collections.singletonList(topicData.getName()));
return getTopicDescription(tdw.values().get(topicData.getName()), topicData.getName());
})
}).flatMap(topicData -> getTopicsData(adminClient, Collections.singleton(topicData.getName())).next())
.switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")))
.map(ClusterUtil::mapToInternalTopic)
.flatMap( t ->
loadTopicsConfig(adminClient, Collections.singletonList(t.getName()))
.map( c -> mergeWithConfigs(Collections.singletonList(t), c))
@@ -210,18 +212,8 @@ public class KafkaService {
return ExtendedAdminClient.extendedAdminClient(adminClient);
}
private Mono<TopicDescription> getTopicDescription(KafkaFuture<TopicDescription> entry, String topicName) {
return ClusterUtil.toMono(entry)
.onErrorResume(e -> {
log.error("Can't get topic with name: " + topicName);
return Mono.empty();
});
}
@SneakyThrows
private Mono<Map<String, List<InternalTopicConfig>>> loadTopicsConfig(AdminClient adminClient, List<String> topicNames) {
private Mono<Map<String, List<InternalTopicConfig>>> loadTopicsConfig(AdminClient adminClient, Collection<String> topicNames) {
List<ConfigResource> resources = topicNames.stream()
.map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
.collect(Collectors.toList());
@@ -279,8 +271,6 @@ public class KafkaService {
});
}
private Mono<InternalTopic> getUpdatedTopic (ExtendedAdminClient ac, String topicName) {
return getTopicsData(ac.getAdminClient())
.map(s -> s.stream()
@@ -302,45 +292,78 @@ public class KafkaService {
}
private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac, InternalClusterMetrics clusterMetrics, Map<String, InternalTopic> internalTopic) {
return ClusterUtil.toMono(ac.describeTopics(internalTopic.keySet()).all()).flatMap(topic ->
private InternalTopic mergeWithStats(InternalTopic topic, Map<String, LongSummaryStatistics> topics, Map<TopicPartition, LongSummaryStatistics> partitions) {
final LongSummaryStatistics stats = topics.get(topic.getName());
return topic.toBuilder()
.segmentSize(stats.getSum())
.segmentCount(stats.getCount())
.partitions(
topic.getPartitions().entrySet().stream().map(e ->
Tuples.of(e.getKey(), mergeWithStats(topic.getName(), e.getValue(), partitions))
).collect(Collectors.toMap(
Tuple2::getT1,
Tuple2::getT2
))
).build();
}
private InternalPartition mergeWithStats(String topic, InternalPartition partition, Map<TopicPartition, LongSummaryStatistics> partitions) {
final LongSummaryStatistics stats = partitions.get(new TopicPartition(topic, partition.getPartition()));
return partition.toBuilder()
.segmentSize(stats.getSum())
.segmentCount(stats.getCount())
.build();
}
private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac, InternalClusterMetrics clusterMetrics, List<InternalTopic> internalTopics) {
List<String> names = internalTopics.stream().map(InternalTopic::getName).collect(Collectors.toList());
return ClusterUtil.toMono(ac.describeTopics(names).all()).flatMap(topic ->
ClusterUtil.toMono(ac.describeLogDirs(clusterMetrics.getInternalBrokerMetrics().keySet()).all())
.map(log -> {
var partitionSegmentSizeStream = leadersCache.get(ac).entrySet().stream()
.flatMap(l -> {
Map<TopicPartition, Long> result = new HashMap<>();
result.put(l.getKey(), log.get(l.getValue()).values().stream().mapToLong(e -> e.replicaInfos.get(l.getKey()).size).sum());
return Stream.of(result);
});
var partitionSegmentSize = ClusterUtil.toSingleMap(partitionSegmentSizeStream);
var resultTopicMetricsStream = internalTopic.keySet().stream().flatMap(k -> {
Map<String, InternalTopic> result = new HashMap<>();
result.put(k, internalTopic.get(k).toBuilder()
.segmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).mapToLong(Map.Entry::getValue).sum())
.partitionSegmentSize(partitionSegmentSize.entrySet().stream().filter(e -> e.getKey().topic().equals(k)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).build());
return Stream.of(result);
});
final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
log.entrySet().stream().flatMap(b ->
b.getValue().entrySet().stream().flatMap(topicMap ->
topicMap.getValue().replicaInfos.entrySet().stream()
.map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
)
).collect(Collectors.toList());
var resultBrokerMetricsStream = clusterMetrics.getInternalBrokerMetrics().entrySet().stream().map(
e -> {
var brokerSegmentSize = log.get(e.getKey()).values().stream()
.mapToLong(v -> v.replicaInfos.values().stream()
.mapToLong(r -> r.size).sum()).sum();
InternalBrokerMetrics tempBrokerMetrics = e.getValue().toBuilder().segmentSize(brokerSegmentSize).build();
return Collections.singletonMap(e.getKey(), tempBrokerMetrics);
});
final Map<TopicPartition, LongSummaryStatistics> partitionStats = topicPartitions.stream().collect(
Collectors.groupingBy(
Tuple2::getT2,
Collectors.summarizingLong(Tuple3::getT3)
)
);
var resultClusterMetrics = clusterMetrics.toBuilder()
.internalBrokerMetrics(ClusterUtil.toSingleMap(resultBrokerMetricsStream))
.segmentSize(partitionSegmentSize.values().stream().reduce(Long::sum).orElseThrow())
.build();
final Map<String, LongSummaryStatistics> topicStats = topicPartitions.stream().collect(
Collectors.groupingBy(
t -> t.getT2().topic(),
Collectors.summarizingLong(Tuple3::getT3)
)
);
final LongSummaryStatistics summary = topicPartitions.stream().collect(Collectors.summarizingLong(Tuple3::getT3));
final Map<String, InternalTopic> resultTopics = internalTopics.stream().map(e ->
Tuples.of(e.getName(), mergeWithStats(e, topicStats, partitionStats))
).collect(Collectors.toMap(
Tuple2::getT1,
Tuple2::getT2
));
return InternalSegmentSizeDto.builder()
.clusterMetricsWithSegmentSize(resultClusterMetrics)
.internalTopicWithSegmentSize(ClusterUtil.toSingleMap(resultTopicMetricsStream)).build();
.clusterMetricsWithSegmentSize(
clusterMetrics.toBuilder()
.segmentSize(summary.getSum())
.segmentCount(summary.getCount())
.build()
)
.internalTopicWithSegmentSize(resultTopics).build();
})
);
);
}
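
updateSegmentMetrics now flattens the describeLogDirs response into (broker, topic-partition, size) tuples and lets Collectors.groupingBy with summarizingLong produce the per-partition, per-topic, and cluster-wide totals in one pass; mergeWithStats then copies the sums and counts into the partition and topic models. The same aggregation reduced to plain collections (topic names and byte sizes below are made up):

import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.stream.Collectors;

public class SegmentStatsSketch {
    public static void main(String[] args) {
        // One entry per replica log: topic-partition -> size in bytes, as reported by describeLogDirs.
        List<Map.Entry<String, Long>> replicaSizes = List.of(
                Map.entry("orders-0", 1024L),
                Map.entry("orders-0", 1024L),   // same partition, replica on another broker
                Map.entry("orders-1", 2048L));

        Map<String, LongSummaryStatistics> perPartition = replicaSizes.stream()
                .collect(Collectors.groupingBy(
                        Map.Entry::getKey,
                        Collectors.summarizingLong(Map.Entry::getValue)));

        // getSum() feeds segmentSize and getCount() feeds segmentCount in mergeWithStats.
        System.out.println(perPartition.get("orders-0").getSum());   // 2048
        System.out.println(perPartition.get("orders-0").getCount()); // 2
    }
}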
public List<Metric> getJmxMetric(String clusterName, Node node) {
@@ -378,12 +401,12 @@ public class KafkaService {
.collect(Collectors.toList())).build();
}
public List<InternalPartition> getTopicPartitions(KafkaCluster c, InternalTopic topic ) {
var tps = topic.getPartitions().stream()
public Map<Integer, InternalPartition> getTopicPartitions(KafkaCluster c, InternalTopic topic ) {
var tps = topic.getPartitions().values().stream()
.map(t -> new TopicPartition(topic.getName(), t.getPartition()))
.collect(Collectors.toList());
Map<Integer, InternalPartition> partitions =
topic.getPartitions().stream().collect(Collectors.toMap(
topic.getPartitions().values().stream().collect(Collectors.toMap(
InternalPartition::getPartition,
tp -> tp
));
@@ -397,9 +420,12 @@ public class KafkaService {
.offsetMin(Optional.ofNullable(earliest.get(tp)).orElse(0L))
.offsetMax(Optional.ofNullable(latest.get(tp)).orElse(0L))
.build()
).collect(Collectors.toList());
).collect(Collectors.toMap(
InternalPartition::getPartition,
tp -> tp
));
} catch (Exception e) {
return Collections.emptyList();
return Collections.emptyMap();
}
}
}