@@ -1,37 +1,50 @@
 package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.model.ConsumerGroup;
+import com.provectus.kafka.ui.exception.TopicMetadataException;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.model.CleanupPolicy;
+import com.provectus.kafka.ui.model.CreateTopicMessage;
 import com.provectus.kafka.ui.model.ExtendedAdminClient;
 import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
 import com.provectus.kafka.ui.model.InternalBrokerMetrics;
 import com.provectus.kafka.ui.model.InternalClusterMetrics;
+import com.provectus.kafka.ui.model.InternalConsumerGroup;
 import com.provectus.kafka.ui.model.InternalPartition;
+import com.provectus.kafka.ui.model.InternalReplica;
 import com.provectus.kafka.ui.model.InternalSegmentSizeDto;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.InternalTopicConfig;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.Metric;
+import com.provectus.kafka.ui.model.PartitionsIncrease;
+import com.provectus.kafka.ui.model.ReplicationFactorChange;
 import com.provectus.kafka.ui.model.ServerStatus;
-import com.provectus.kafka.ui.model.TopicConsumerGroups;
 import com.provectus.kafka.ui.model.TopicCreation;
 import com.provectus.kafka.ui.model.TopicUpdate;
+import com.provectus.kafka.ui.serde.DeserializationService;
+import com.provectus.kafka.ui.serde.RecordSerDe;
 import com.provectus.kafka.ui.util.ClusterUtil;
 import com.provectus.kafka.ui.util.JmxClusterUtil;
 import com.provectus.kafka.ui.util.JmxMetricsName;
 import com.provectus.kafka.ui.util.JmxMetricsValueName;
 import java.math.BigDecimal;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.LongSummaryStatistics;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -39,17 +52,23 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
-import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.NewPartitionReassignment;
+import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.springframework.beans.factory.annotation.Value;
@@ -71,6 +90,8 @@ public class KafkaService {
   private final Map<String, ExtendedAdminClient> adminClientCache = new ConcurrentHashMap<>();
   private final JmxClusterUtil jmxClusterUtil;
   private final ClustersStorage clustersStorage;
+  private final DeserializationService deserializationService;
+  @Setter // used in tests
   @Value("${kafka.admin-client-timeout}")
   private int clientTimeout;
 
@@ -90,13 +111,16 @@ public class KafkaService {
   public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
     return getOrCreateAdminClient(cluster)
         .flatMap(
-            ac -> getClusterMetrics(ac.getAdminClient())
-                .flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient()))
-                .flatMap(clusterMetrics ->
-                    getTopicsData(ac.getAdminClient()).flatMap(it ->
-                        updateSegmentMetrics(ac.getAdminClient(), clusterMetrics, it)
-                    ).map(segmentSizeDto -> buildFromData(cluster, segmentSizeDto))
-                )
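+            // Fetch the broker version first so it can be stored on the rebuilt cluster model.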
+            ac -> ClusterUtil.getClusterVersion(ac.getAdminClient()).flatMap(
+                version ->
+                    getClusterMetrics(ac.getAdminClient())
+                        .flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient()))
+                        .flatMap(clusterMetrics ->
+                            getTopicsData(ac.getAdminClient()).flatMap(it ->
+                                updateSegmentMetrics(ac.getAdminClient(), clusterMetrics, it)
+                            ).map(segmentSizeDto -> buildFromData(cluster, version, segmentSizeDto))
+                        )
+            )
         ).onErrorResume(
             e -> Mono.just(cluster.toBuilder()
                 .status(ServerStatus.OFFLINE)
@@ -106,10 +130,12 @@ public class KafkaService {
   }
 
   private KafkaCluster buildFromData(KafkaCluster currentCluster,
+                                     String version,
                                      InternalSegmentSizeDto segmentSizeDto) {
 
     var topics = segmentSizeDto.getInternalTopicWithSegmentSize();
     var brokersMetrics = segmentSizeDto.getClusterMetricsWithSegmentSize();
+    var brokersIds = new ArrayList<>(brokersMetrics.getInternalBrokerMetrics().keySet());
 
     InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder =
         brokersMetrics.toBuilder();
@@ -135,15 +161,18 @@ public class KafkaService {
         .onlinePartitionCount(topicsMetrics.getOnlinePartitionCount())
         .offlinePartitionCount(topicsMetrics.getOfflinePartitionCount())
         .zooKeeperStatus(ClusterUtil.convertToIntServerStatus(zookeeperStatus))
+        .version(version)
         .build();
 
     return currentCluster.toBuilder()
+        .version(version)
         .status(ServerStatus.ONLINE)
         .zookeeperStatus(zookeeperStatus)
         .lastZookeeperException(zookeeperException)
         .lastKafkaException(null)
         .metrics(clusterMetrics)
         .topics(topics)
+        .brokers(brokersIds)
         .build();
   }
 
@@ -179,12 +208,18 @@ public class KafkaService {
 
   private Map<String, InternalTopic> mergeWithConfigs(
       List<InternalTopic> topics, Map<String, List<InternalTopicConfig>> configs) {
-    return topics.stream().map(
-        t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build()
-    ).collect(Collectors.toMap(
-        InternalTopic::getName,
-        e -> e
-    ));
+    return topics.stream()
+        .map(t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build())
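+        // Derive the cleanup policy from the "cleanup.policy" config, defaulting to "unknown".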
+        .map(t -> t.toBuilder().cleanUpPolicy(
+            CleanupPolicy.fromString(t.getTopicConfigs().stream()
+                .filter(config -> config.getName().equals("cleanup.policy"))
+                .findFirst()
+                .orElseGet(() -> InternalTopicConfig.builder().value("unknown").build())
+                .getValue())).build())
+        .collect(Collectors.toMap(
+            InternalTopic::getName,
+            e -> e
+        ));
   }
 
   @SneakyThrows
@@ -197,11 +232,12 @@ public class KafkaService {
     final Mono<Map<String, List<InternalTopicConfig>>> configsMono =
         loadTopicsConfig(adminClient, topics);
 
-    return ClusterUtil.toMono(adminClient.describeTopics(topics).all()).map(
-        m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList())
-    ).flatMap(internalTopics -> configsMono.map(configs ->
-        mergeWithConfigs(internalTopics, configs).values()
-    )).flatMapMany(Flux::fromIterable);
+    return ClusterUtil.toMono(adminClient.describeTopics(topics).all())
+        .map(m -> m.values().stream()
+            .map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()))
+        .flatMap(internalTopics -> configsMono
+            .map(configs -> mergeWithConfigs(internalTopics, configs).values()))
+        .flatMapMany(Flux::fromIterable);
   }
 
 
@@ -234,10 +270,12 @@ public class KafkaService {
           topicData.getReplicationFactor().shortValue());
       newTopic.configs(topicData.getConfigs());
       return createTopic(adminClient, newTopic).map(v -> topicData);
-    }).flatMap(
-        topicData ->
-            getTopicsData(adminClient, Collections.singleton(topicData.getName()))
-                .next()
+    })
+        .onErrorResume(t -> Mono.error(new TopicMetadataException(t.getMessage())))
+        .flatMap(
+            topicData ->
+                getTopicsData(adminClient, Collections.singleton(topicData.getName()))
+                    .next()
     ).switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")))
         .flatMap(t ->
             loadTopicsConfig(adminClient, Collections.singletonList(t.getName()))
@@ -299,45 +337,59 @@ public class KafkaService {
     );
   }
 
-  public Mono<Collection<ConsumerGroupDescription>> getConsumerGroupsInternal(
+  public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(
       KafkaCluster cluster) {
     return getOrCreateAdminClient(cluster).flatMap(ac ->
         ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all())
            .flatMap(s ->
-                ClusterUtil.toMono(
-                    ac.getAdminClient().describeConsumerGroups(
-                        s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())
-                    ).all()
-                ).map(Map::values)
+                getConsumerGroupsInternal(
+                    cluster,
+                    s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList()))
+            )
+    );
+  }
+
+  public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(
+      KafkaCluster cluster, List<String> groupIds) {
+
+    return getOrCreateAdminClient(cluster).flatMap(ac ->
+        ClusterUtil.toMono(
+            ac.getAdminClient().describeConsumerGroups(groupIds).all()
+        ).map(Map::values)
+    ).flatMap(descriptions ->
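+        // Fetch committed offsets for each group in parallel, then merge into a single list.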
+        Flux.fromIterable(descriptions)
+            .parallel()
+            .flatMap(d ->
+                groupMetadata(cluster, d.groupId())
+                    .map(offsets -> ClusterUtil.convertToInternalConsumerGroup(d, offsets))
             )
+            .sequential()
+            .collectList()
     );
   }
 
-  public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) {
-    return getConsumerGroupsInternal(cluster)
-        .map(c -> c.stream().map(ClusterUtil::convertToConsumerGroup).collect(Collectors.toList()));
-  }
+  public Mono<List<InternalConsumerGroup>> getConsumerGroups(
+      KafkaCluster cluster, Optional<String> topic, List<String> groupIds) {
+    final Mono<List<InternalConsumerGroup>> consumerGroups;
 
-  public Mono<TopicConsumerGroups> getTopicConsumerGroups(KafkaCluster cluster, String topic) {
-    final Map<TopicPartition, Long> endOffsets = topicEndOffsets(cluster, topic);
+    if (groupIds.isEmpty()) {
+      consumerGroups = getConsumerGroupsInternal(cluster);
+    } else {
+      consumerGroups = getConsumerGroupsInternal(cluster, groupIds);
+    }
 
-    return getConsumerGroupsInternal(cluster)
-        .flatMapIterable(c ->
+    return consumerGroups.map(c ->
         c.stream()
             .map(d -> ClusterUtil.filterConsumerGroupTopic(d, topic))
             .filter(Optional::isPresent)
            .map(Optional::get)
-            .map(d ->
-                groupMetadata(cluster, d.groupId())
-                    .flatMapIterable(meta ->
-                        d.members().stream().flatMap(m ->
-                            ClusterUtil.convertToConsumerTopicPartitionDetails(
-                                m, meta, endOffsets, d.groupId()
-                            ).stream()
-                        ).collect(Collectors.toList())
-                    )
-            ).collect(Collectors.toList())
-    ).flatMap(f -> f).collectList().map(l -> new TopicConsumerGroups().consumers(l));
+            .map(g ->
+                g.toBuilder().endOffsets(
+                    topicPartitionsEndOffsets(cluster, g.getOffsets().keySet())
+                ).build()
+            )
+            .collect(Collectors.toList())
+    );
   }
 
   public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster cluster,
@@ -349,16 +401,6 @@ public class KafkaService {
     ).flatMap(ClusterUtil::toMono);
   }
 
-  public Map<TopicPartition, Long> topicEndOffsets(
-      KafkaCluster cluster, String topic) {
-    try (KafkaConsumer<Bytes, Bytes> consumer = createConsumer(cluster)) {
-      final List<TopicPartition> topicPartitions = consumer.partitionsFor(topic).stream()
-          .map(i -> new TopicPartition(i.topic(), i.partition()))
-          .collect(Collectors.toList());
-      return consumer.endOffsets(topicPartitions);
-    }
-  }
-
   public Map<TopicPartition, Long> topicPartitionsEndOffsets(
       KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
     try (KafkaConsumer<Bytes, Bytes> consumer = createConsumer(cluster)) {
@@ -367,13 +409,19 @@ public class KafkaService {
   }
 
   public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
+    return createConsumer(cluster, Map.of());
+  }
+
+  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster,
+                                                    Map<String, Object> properties) {
     Properties props = new Properties();
     props.putAll(cluster.getProperties());
-    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui");
+    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID());
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
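+    // Caller-supplied properties override the defaults configured above.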
+    props.putAll(properties);
 
     return new KafkaConsumer<>(props);
   }
@@ -487,7 +535,7 @@ public class KafkaService {
     final Map<Integer, LongSummaryStatistics> brokerStats =
         topicPartitions.stream().collect(
            Collectors.groupingBy(
-                t -> t.getT1(),
+                Tuple2::getT1,
                Collectors.summarizingLong(Tuple3::getT3)
            )
        );
@@ -631,5 +679,212 @@ public class KafkaService {
         .map(ac -> ac.deleteRecords(records)).then();
   }
 
+  public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic,
+                                          CreateTopicMessage msg) {
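+    // Serialize the key and content with the serde configured for this cluster.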
+    RecordSerDe serde =
+        deserializationService.getRecordDeserializerForCluster(cluster);
+
+    Properties properties = new Properties();
+    properties.putAll(cluster.getProperties());
+    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
+    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
+      final ProducerRecord<byte[], byte[]> producerRecord = serde.serialize(
+          topic,
+          msg.getKey(),
+          msg.getContent(),
+          msg.getPartition()
+      );
+
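+      // Bridge the async send callback into a CompletableFuture, exposed below as a Mono.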
+      CompletableFuture<RecordMetadata> cf = new CompletableFuture<>();
+      producer.send(producerRecord, (metadata, exception) -> {
+        if (exception != null) {
+          cf.completeExceptionally(exception);
+        } else {
+          cf.complete(metadata);
+        }
+      });
+      return Mono.fromFuture(cf);
+    }
+  }
+
+  private Mono<InternalTopic> increaseTopicPartitions(AdminClient adminClient,
+                                                      String topicName,
+                                                      Map<String, NewPartitions> newPartitionsMap
+  ) {
+    return ClusterUtil.toMono(adminClient.createPartitions(newPartitionsMap).all(), topicName)
+        .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next());
+  }
+
+  public Mono<InternalTopic> increaseTopicPartitions(
+      KafkaCluster cluster,
+      String topicName,
+      PartitionsIncrease partitionsIncrease) {
+    return getOrCreateAdminClient(cluster)
+        .flatMap(ac -> {
+          Integer actualCount = cluster.getTopics().get(topicName).getPartitionCount();
+          Integer requestedCount = partitionsIncrease.getTotalPartitionsCount();
+
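+          // Kafka can only add partitions, so reject a lower or unchanged requested count.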
+          if (requestedCount < actualCount) {
+            return Mono.error(
+                new ValidationException(String.format(
+                    "Topic currently has %s partitions, which is higher than the requested %s.",
+                    actualCount, requestedCount)));
+          }
+          if (requestedCount.equals(actualCount)) {
+            return Mono.error(
+                new ValidationException(
+                    String.format("Topic already has %s partitions.", actualCount)));
+          }
+
+          Map<String, NewPartitions> newPartitionsMap = Collections.singletonMap(
+              topicName,
+              NewPartitions.increaseTo(partitionsIncrease.getTotalPartitionsCount())
+          );
+          return increaseTopicPartitions(ac.getAdminClient(), topicName, newPartitionsMap);
+        });
+  }
+
+  private Mono<InternalTopic> changeReplicationFactor(
+      AdminClient adminClient,
+      String topicName,
+      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments
+  ) {
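+    // Submit the partition reassignment and then re-read the topic's metadata.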
+    return ClusterUtil.toMono(adminClient
+        .alterPartitionReassignments(reassignments).all(), topicName)
+        .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next());
+  }
+
+  /**
+   * Changes the topic replication factor; works on broker versions 5.4.x and higher.
+   */
+  public Mono<InternalTopic> changeReplicationFactor(
+      KafkaCluster cluster,
+      String topicName,
+      ReplicationFactorChange replicationFactorChange) {
+    return getOrCreateAdminClient(cluster)
+        .flatMap(ac -> {
+          Integer actual = cluster.getTopics().get(topicName).getReplicationFactor();
+          Integer requested = replicationFactorChange.getTotalReplicationFactor();
+          Integer brokersCount = cluster.getMetrics().getBrokerCount();
+
+          if (requested.equals(actual)) {
+            return Mono.error(
+                new ValidationException(
+                    String.format("Topic already has replicationFactor %s.", actual)));
+          }
+          if (requested > brokersCount) {
+            return Mono.error(
+                new ValidationException(
+                    String.format("Requested replication factor %s is greater than the broker count %s.",
+                        requested, brokersCount)));
+          }
+          return changeReplicationFactor(ac.getAdminClient(), topicName,
+              getPartitionsReassignments(cluster, topicName,
+                  replicationFactorChange));
+        });
+  }
+
+  private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsReassignments(
+      KafkaCluster cluster,
+      String topicName,
+      ReplicationFactorChange replicationFactorChange) {
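+    // Greedy strategy: add replicas to the least-used brokers, or remove them from the most-used
+    // ones, never removing the partition leader.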
+    // Current assignment map (partition number -> list of broker ids)
+    Map<Integer, List<Integer>> currentAssignment = getCurrentAssignment(cluster, topicName);
+    // Brokers map (broker id -> current replica count)
+    Map<Integer, Integer> brokersUsage = getBrokersMap(cluster, currentAssignment);
+    int currentReplicationFactor = cluster.getTopics().get(topicName).getReplicationFactor();
+
+    // If the replication factor needs to be increased
+    if (replicationFactorChange.getTotalReplicationFactor() > currentReplicationFactor) {
+      // For each partition
+      for (var assignmentList : currentAssignment.values()) {
+        // Get the list of brokers sorted by usage
+        var brokers = brokersUsage.entrySet().stream()
+            .sorted(Map.Entry.comparingByValue())
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toList());
+
+        // Iterate over the brokers and add them to the assignment
+        // until the partition's replica count matches the requested replication factor
+        for (Integer broker : brokers) {
+          if (!assignmentList.contains(broker)) {
+            assignmentList.add(broker);
+            brokersUsage.merge(broker, 1, Integer::sum);
+          }
+          if (assignmentList.size() == replicationFactorChange.getTotalReplicationFactor()) {
+            break;
+          }
+        }
+        if (assignmentList.size() != replicationFactorChange.getTotalReplicationFactor()) {
+          throw new ValidationException("Something went wrong during adding replicas");
+        }
+      }
+
+    // If the replication factor needs to be decreased
+    } else if (replicationFactorChange.getTotalReplicationFactor() < currentReplicationFactor) {
+      for (Map.Entry<Integer, List<Integer>> assignmentEntry : currentAssignment.entrySet()) {
+        var partition = assignmentEntry.getKey();
+        var brokers = assignmentEntry.getValue();
+
+        // Get the list of brokers sorted by usage in reverse order
+        var brokersUsageList = brokersUsage.entrySet().stream()
+            .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toList());
+
+        // Iterate over the brokers and remove them from the assignment
+        // until the partition's replica count matches the requested replication factor
+        for (Integer broker : brokersUsageList) {
+          // Never remove the broker that is the leader of the partition
+          if (!cluster.getTopics().get(topicName).getPartitions().get(partition).getLeader()
+              .equals(broker)) {
+            brokers.remove(broker);
+            brokersUsage.merge(broker, -1, Integer::sum);
+          }
+          if (brokers.size() == replicationFactorChange.getTotalReplicationFactor()) {
+            break;
+          }
+        }
+        if (brokers.size() != replicationFactorChange.getTotalReplicationFactor()) {
+          throw new ValidationException("Something went wrong during removing replicas");
+        }
+      }
+    } else {
+      throw new ValidationException("Replication factor already equals requested");
+    }
+
+    // Map the new assignment into the structure expected by alterPartitionReassignments
+    return currentAssignment.entrySet().stream().collect(Collectors.toMap(
+        e -> new TopicPartition(topicName, e.getKey()),
+        e -> Optional.of(new NewPartitionReassignment(e.getValue()))
+    ));
+  }
+
+  private Map<Integer, List<Integer>> getCurrentAssignment(KafkaCluster cluster, String topicName) {
+    return cluster.getTopics().get(topicName).getPartitions().values().stream()
+        .collect(Collectors.toMap(
+            InternalPartition::getPartition,
+            p -> p.getReplicas().stream()
+                .map(InternalReplica::getBroker)
+                .collect(Collectors.toList())
+        ));
+  }
+
+  private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
+                                              Map<Integer, List<Integer>> currentAssignment) {
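+    // Count how many replicas each broker currently hosts; brokers with none map to 0.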
+    Map<Integer, Integer> result = cluster.getBrokers().stream()
+        .collect(Collectors.toMap(
+            c -> c,
+            c -> 0
+        ));
+    currentAssignment.values().forEach(brokers -> brokers
+        .forEach(broker -> result.put(broker, result.get(broker) + 1)));
+
+    return result;
+  }
+
+
 
 }