From 0ec40dd7e46c9a3a0645070e861bc24e63c7eef8 Mon Sep 17 00:00:00 2001 From: Marsel <49659004+MarselAhmetov@users.noreply.github.com> Date: Wed, 7 Jul 2021 17:59:52 +0300 Subject: [PATCH] Issue#314 replication factor change (#599) * adding replication factor change feature * adding comments for algorithm and some fixes * small fix * checkstyle fixes * fix * adding updateCluster endpoint and algorithm fixes * hot fixes * update algorithm and add brokers list to KafkaCluster * pull request fix Co-authored-by: marselakhmetov --- .../ui/controller/ClustersController.java | 6 + .../kafka/ui/controller/TopicsController.java | 11 ++ .../kafka/ui/model/KafkaCluster.java | 1 + .../kafka/ui/service/ClusterService.java | 25 +++ .../kafka/ui/service/KafkaService.java | 149 +++++++++++++++++- .../main/resources/swagger/kafka-ui-api.yaml | 76 +++++++++ 6 files changed, 267 insertions(+), 1 deletion(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java index fea8f24c88..cb3b3f2674 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java @@ -39,4 +39,10 @@ public class ClustersController implements ClustersApi { public Mono>> getClusters(ServerWebExchange exchange) { return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getClusters()))); } + + @Override + public Mono> updateClusterInfo(String clusterName, + ServerWebExchange exchange) { + return clusterService.updateCluster(clusterName).map(ResponseEntity::ok); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java index d456e092d9..707e2fac41 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java @@ -3,6 +3,8 @@ package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.api.TopicsApi; import com.provectus.kafka.ui.model.PartitionsIncrease; import com.provectus.kafka.ui.model.PartitionsIncreaseResponse; +import com.provectus.kafka.ui.model.ReplicationFactorChange; +import com.provectus.kafka.ui.model.ReplicationFactorChangeResponse; import com.provectus.kafka.ui.model.Topic; import com.provectus.kafka.ui.model.TopicColumnsToSort; import com.provectus.kafka.ui.model.TopicConfig; @@ -98,4 +100,13 @@ public class TopicsController implements TopicsApi { partitions -> clusterService.increaseTopicPartitions(clusterName, topicName, partitions)) .map(ResponseEntity::ok); } + + @Override + public Mono> changeReplicationFactor( + String clusterName, String topicName, Mono replicationFactorChange, + ServerWebExchange exchange) { + return replicationFactorChange + .flatMap(rfc -> clusterService.changeReplicationFactor(clusterName, topicName, rfc)) + .map(ResponseEntity::ok); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java index cc0477f6bb..805f0937e9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java @@ -23,6 +23,7 @@ public class KafkaCluster { private final ServerStatus zookeeperStatus; private final InternalClusterMetrics metrics; private final Map topics; + private final List brokers; private final Throwable lastKafkaException; private final Throwable lastZookeeperException; private final Path protobufFile; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java index 6168462acc..c68c5900ca 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java @@ -19,6 +19,8 @@ import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.PartitionsIncrease; import com.provectus.kafka.ui.model.PartitionsIncreaseResponse; +import com.provectus.kafka.ui.model.ReplicationFactorChange; +import com.provectus.kafka.ui.model.ReplicationFactorChangeResponse; import com.provectus.kafka.ui.model.Topic; import com.provectus.kafka.ui.model.TopicColumnsToSort; import com.provectus.kafka.ui.model.TopicConfig; @@ -268,6 +270,15 @@ public class ClusterService { return updatedCluster; } + public Mono updateCluster(String clusterName) { + return clustersStorage.getClusterByName(clusterName) + .map(cluster -> kafkaService.getUpdatedCluster(cluster) + .doOnNext(updatedCluster -> clustersStorage + .setKafkaCluster(updatedCluster.getName(), updatedCluster)) + .map(clusterMapper::toCluster)) + .orElse(Mono.error(new ClusterNotFoundException())); + } + public Flux getMessages(String clusterName, String topicName, ConsumerPosition consumerPosition, String query, Integer limit) { @@ -345,4 +356,18 @@ public class ClusterService { return Mono.error(e); } } + + public Mono changeReplicationFactor( + String clusterName, + String topicName, + ReplicationFactorChange replicationFactorChange) { + return clustersStorage.getClusterByName(clusterName).map(cluster -> + kafkaService.changeReplicationFactor(cluster, topicName, replicationFactorChange) + .doOnNext(topic -> updateCluster(topic, cluster.getName(), cluster)) + .map(t -> new ReplicationFactorChangeResponse() + .topicName(t.getName()) + .totalReplicationFactor(t.getReplicationFactor()))) + .orElse(Mono.error(new ClusterNotFoundException( + String.format("No cluster for name '%s'", clusterName)))); + } } \ No newline at end of file diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java index e772af94ad..9a42ab9a37 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java @@ -8,13 +8,14 @@ import com.provectus.kafka.ui.model.InternalBrokerDiskUsage; import com.provectus.kafka.ui.model.InternalBrokerMetrics; import com.provectus.kafka.ui.model.InternalClusterMetrics; import com.provectus.kafka.ui.model.InternalPartition; +import com.provectus.kafka.ui.model.InternalReplica; import com.provectus.kafka.ui.model.InternalSegmentSizeDto; import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.InternalTopicConfig; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.Metric; import com.provectus.kafka.ui.model.PartitionsIncrease; -import com.provectus.kafka.ui.model.PartitionsIncreaseResponse; +import com.provectus.kafka.ui.model.ReplicationFactorChange; import com.provectus.kafka.ui.model.ServerStatus; import com.provectus.kafka.ui.model.TopicConsumerGroups; import com.provectus.kafka.ui.model.TopicCreation; @@ -26,8 +27,10 @@ import com.provectus.kafka.ui.util.JmxClusterUtil; import com.provectus.kafka.ui.util.JmxMetricsName; import com.provectus.kafka.ui.util.JmxMetricsValueName; import java.math.BigDecimal; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.LongSummaryStatistics; @@ -51,6 +54,7 @@ import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.NewPartitionReassignment; import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.RecordsToDelete; @@ -130,6 +134,7 @@ public class KafkaService { var topics = segmentSizeDto.getInternalTopicWithSegmentSize(); var brokersMetrics = segmentSizeDto.getClusterMetricsWithSegmentSize(); + var brokersIds = new ArrayList<>(brokersMetrics.getInternalBrokerMetrics().keySet()); InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = brokersMetrics.toBuilder(); @@ -165,6 +170,7 @@ public class KafkaService { .lastKafkaException(null) .metrics(clusterMetrics) .topics(topics) + .brokers(brokersIds) .build(); } @@ -722,4 +728,145 @@ public class KafkaService { }); } + private Mono changeReplicationFactor( + AdminClient adminClient, + String topicName, + Map> reassignments + ) { + return ClusterUtil.toMono(adminClient + .alterPartitionReassignments(reassignments).all(), topicName) + .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next()); + } + + /** + * Change topic replication factor, works on brokers versions 5.4.x and higher + */ + public Mono changeReplicationFactor( + KafkaCluster cluster, + String topicName, + ReplicationFactorChange replicationFactorChange) { + return getOrCreateAdminClient(cluster) + .flatMap(ac -> { + Integer actual = cluster.getTopics().get(topicName).getReplicationFactor(); + Integer requested = replicationFactorChange.getTotalReplicationFactor(); + Integer brokersCount = cluster.getMetrics().getBrokerCount(); + + if (requested.equals(actual)) { + return Mono.error( + new ValidationException( + String.format("Topic already has replicationFactor %s.", actual))); + } + if (requested > brokersCount) { + return Mono.error( + new ValidationException( + String.format("Requested replication factor %s more than brokers count %s.", + requested, brokersCount))); + } + return changeReplicationFactor(ac.getAdminClient(), topicName, + getPartitionsReassignments(cluster, topicName, + replicationFactorChange)); + }); + } + + private Map> getPartitionsReassignments( + KafkaCluster cluster, + String topicName, + ReplicationFactorChange replicationFactorChange) { + // Current assignment map (Partition number -> List of brokers) + Map> currentAssignment = getCurrentAssignment(cluster, topicName); + // Brokers map (Broker id -> count) + Map brokersUsage = getBrokersMap(cluster, currentAssignment); + int currentReplicationFactor = cluster.getTopics().get(topicName).getReplicationFactor(); + + // If we should to increase Replication factor + if (replicationFactorChange.getTotalReplicationFactor() > currentReplicationFactor) { + // For each partition + for (var assignmentList : currentAssignment.values()) { + // Get brokers list sorted by usage + var brokers = brokersUsage.entrySet().stream() + .sorted(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + + // Iterate brokers and try to add them in assignment + // while (partition replicas count != requested replication factor) + for (Integer broker : brokers) { + if (!assignmentList.contains(broker)) { + assignmentList.add(broker); + brokersUsage.merge(broker, 1, Integer::sum); + } + if (assignmentList.size() == replicationFactorChange.getTotalReplicationFactor()) { + break; + } + } + if (assignmentList.size() != replicationFactorChange.getTotalReplicationFactor()) { + throw new ValidationException("Something went wrong during adding replicas"); + } + } + + // If we should to decrease Replication factor + } else if (replicationFactorChange.getTotalReplicationFactor() < currentReplicationFactor) { + for (Map.Entry> assignmentEntry : currentAssignment.entrySet()) { + var partition = assignmentEntry.getKey(); + var brokers = assignmentEntry.getValue(); + + // Get brokers list sorted by usage in reverse order + var brokersUsageList = brokersUsage.entrySet().stream() + .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder())) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + + // Iterate brokers and try to remove them from assignment + // while (partition replicas count != requested replication factor) + for (Integer broker : brokersUsageList) { + // Check is the broker the leader of partition + if (!cluster.getTopics().get(topicName).getPartitions().get(partition).getLeader() + .equals(broker)) { + brokers.remove(broker); + brokersUsage.merge(broker, -1, Integer::sum); + } + if (brokers.size() == replicationFactorChange.getTotalReplicationFactor()) { + break; + } + } + if (brokers.size() != replicationFactorChange.getTotalReplicationFactor()) { + throw new ValidationException("Something went wrong during removing replicas"); + } + } + } else { + throw new ValidationException("Replication factor already equals requested"); + } + + // Return result map + return currentAssignment.entrySet().stream().collect(Collectors.toMap( + e -> new TopicPartition(topicName, e.getKey()), + e -> Optional.of(new NewPartitionReassignment(e.getValue())) + )); + } + + private Map> getCurrentAssignment(KafkaCluster cluster, String topicName) { + return cluster.getTopics().get(topicName).getPartitions().values().stream() + .collect(Collectors.toMap( + InternalPartition::getPartition, + p -> p.getReplicas().stream() + .map(InternalReplica::getBroker) + .collect(Collectors.toList()) + )); + } + + private Map getBrokersMap(KafkaCluster cluster, + Map> currentAssignment) { + Map result = cluster.getBrokers().stream() + .collect(Collectors.toMap( + c -> c, + c -> 0 + )); + currentAssignment.values().forEach(brokers -> brokers + .forEach(broker -> result.put(broker, result.get(broker) + 1))); + + return result; + } + + + } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index e48609fed2..9c75c110da 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -31,6 +31,30 @@ paths: items: $ref: '#/components/schemas/Cluster' + + /api/clusters/{clusterName}/cache: + post: + tags: + - Clusters + summary: updateClusterInfo + operationId: updateClusterInfo + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/Cluster' + 404: + description: Not found + + /api/clusters/{clusterName}/brokers: get: tags: @@ -287,6 +311,38 @@ paths: items: $ref: '#/components/schemas/TopicConfig' + /api/clusters/{clusterName}/topics/{topicName}/replications: + patch: + tags: + - Topics + summary: changeReplicationFactor + operationId: changeReplicationFactor + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: topicName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ReplicationFactorChange' + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ReplicationFactorChangeResponse' + 404: + description: Not found + /api/clusters/{clusterName}/topics/{topicName}/messages: get: tags: @@ -2098,3 +2154,23 @@ components: required: - totalPartitionsCount - topicName + + ReplicationFactorChange: + type: object + properties: + totalReplicationFactor: + type: integer + minimum: 1 + required: + - totalReplicationFactor + + ReplicationFactorChangeResponse: + type: object + properties: + totalReplicationFactor: + type: integer + topicName: + type: string + required: + - totalReplicationFactor + - topicName \ No newline at end of file