
Issue#314 replication factor change (#599)

* adding replication factor change feature

* adding comments for algorithm and some fixes

* small fix

* checkstyle fixes

* fix

* adding updateCluster endpoint and algorithm fixes

* hot fixes

* update algorithm and add brokers list to KafkaCluster

* pull request fix

Co-authored-by: marselakhmetov <makhmetov@provectus.com>
Marsel, 4 years ago
Commit 0ec40dd7e4

+ 6 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java

@@ -39,4 +39,10 @@ public class ClustersController implements ClustersApi {
   public Mono<ResponseEntity<Flux<Cluster>>> getClusters(ServerWebExchange exchange) {
     return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getClusters())));
   }
+
+  @Override
+  public Mono<ResponseEntity<Cluster>> updateClusterInfo(String clusterName,
+                                                         ServerWebExchange exchange) {
+    return clusterService.updateCluster(clusterName).map(ResponseEntity::ok);
+  }
 }
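
For context, a minimal client-side sketch (not part of the commit) of how the new endpoint could be called; per the contract change further down it is exposed as POST /api/clusters/{clusterName}/cache and returns the refreshed Cluster. The base URL and the cluster name "local" are illustrative assumptions.

import org.springframework.web.reactive.function.client.WebClient;

public class UpdateClusterInfoExample {
  public static void main(String[] args) {
    WebClient client = WebClient.create("http://localhost:8080"); // assumed host and port
    // POST /api/clusters/{clusterName}/cache re-reads cluster state and returns the refreshed Cluster JSON
    String refreshed = client.post()
        .uri("/api/clusters/{clusterName}/cache", "local") // "local" is a hypothetical cluster name
        .retrieve()
        .bodyToMono(String.class) // raw JSON here to keep the sketch free of generated model classes
        .block();
    System.out.println(refreshed);
  }
}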

+ 11 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

@@ -3,6 +3,8 @@ package com.provectus.kafka.ui.controller;
 import com.provectus.kafka.ui.api.TopicsApi;
 import com.provectus.kafka.ui.model.PartitionsIncrease;
 import com.provectus.kafka.ui.model.PartitionsIncreaseResponse;
+import com.provectus.kafka.ui.model.ReplicationFactorChange;
+import com.provectus.kafka.ui.model.ReplicationFactorChangeResponse;
 import com.provectus.kafka.ui.model.Topic;
 import com.provectus.kafka.ui.model.TopicColumnsToSort;
 import com.provectus.kafka.ui.model.TopicConfig;
@@ -98,4 +100,13 @@ public class TopicsController implements TopicsApi {
         partitions -> clusterService.increaseTopicPartitions(clusterName, topicName, partitions))
         .map(ResponseEntity::ok);
   }
+
+  @Override
+  public Mono<ResponseEntity<ReplicationFactorChangeResponse>> changeReplicationFactor(
+      String clusterName, String topicName, Mono<ReplicationFactorChange> replicationFactorChange,
+      ServerWebExchange exchange) {
+    return replicationFactorChange
+        .flatMap(rfc -> clusterService.changeReplicationFactor(clusterName, topicName, rfc))
+        .map(ResponseEntity::ok);
+  }
 }
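
A hedged usage sketch for the main new endpoint, which the contract change below maps to PATCH /api/clusters/{clusterName}/topics/{topicName}/replications with a ReplicationFactorChange body. Host, cluster name, topic name, and the target factor of 2 are assumptions for illustration.

import java.util.Map;
import org.springframework.web.reactive.function.client.WebClient;

public class ChangeReplicationFactorExample {
  public static void main(String[] args) {
    WebClient client = WebClient.create("http://localhost:8080"); // assumed host and port
    String response = client.patch()
        .uri("/api/clusters/{cluster}/topics/{topic}/replications", "local", "orders")
        .bodyValue(Map.of("totalReplicationFactor", 2)) // matches the ReplicationFactorChange schema
        .retrieve()
        .bodyToMono(String.class)
        .block();
    // Expected shape per ReplicationFactorChangeResponse, e.g. {"topicName":"orders","totalReplicationFactor":2}
    System.out.println(response);
  }
}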

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -23,6 +23,7 @@ public class KafkaCluster {
   private final ServerStatus zookeeperStatus;
   private final InternalClusterMetrics metrics;
   private final Map<String, InternalTopic> topics;
+  private final List<Integer> brokers;
   private final Throwable lastKafkaException;
   private final Throwable lastZookeeperException;
   private final Path protobufFile;

+ 25 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java

@@ -19,6 +19,8 @@ import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.PartitionsIncrease;
 import com.provectus.kafka.ui.model.PartitionsIncreaseResponse;
+import com.provectus.kafka.ui.model.ReplicationFactorChange;
+import com.provectus.kafka.ui.model.ReplicationFactorChangeResponse;
 import com.provectus.kafka.ui.model.Topic;
 import com.provectus.kafka.ui.model.TopicColumnsToSort;
 import com.provectus.kafka.ui.model.TopicConfig;
@@ -268,6 +270,15 @@ public class ClusterService {
     return updatedCluster;
   }
 
+  public Mono<Cluster> updateCluster(String clusterName) {
+    return clustersStorage.getClusterByName(clusterName)
+        .map(cluster -> kafkaService.getUpdatedCluster(cluster)
+            .doOnNext(updatedCluster -> clustersStorage
+                .setKafkaCluster(updatedCluster.getName(), updatedCluster))
+            .map(clusterMapper::toCluster))
+        .orElse(Mono.error(new ClusterNotFoundException()));
+  }
+
   public Flux<TopicMessage> getMessages(String clusterName, String topicName,
                                         ConsumerPosition consumerPosition, String query,
                                         Integer limit) {
@@ -345,4 +356,18 @@ public class ClusterService {
       return Mono.error(e);
     }
   }
+
+  public Mono<ReplicationFactorChangeResponse> changeReplicationFactor(
+      String clusterName,
+      String topicName,
+      ReplicationFactorChange replicationFactorChange) {
+    return clustersStorage.getClusterByName(clusterName).map(cluster ->
+        kafkaService.changeReplicationFactor(cluster, topicName, replicationFactorChange)
+            .doOnNext(topic -> updateCluster(topic, cluster.getName(), cluster))
+            .map(t -> new ReplicationFactorChangeResponse()
+                .topicName(t.getName())
+                .totalReplicationFactor(t.getReplicationFactor())))
+        .orElse(Mono.error(new ClusterNotFoundException(
+            String.format("No cluster for name '%s'", clusterName))));
+  }
 }
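
Both new service methods reuse the lookup idiom already present in ClusterService: resolve the cluster from ClustersStorage as an Optional, map a found cluster into the reactive pipeline, and fall back to Mono.error(...) when it is absent. A self-contained sketch of that idiom with plain Reactor types; the map contents and exception type are illustrative assumptions.

import java.util.Map;
import java.util.Optional;
import reactor.core.publisher.Mono;

public class LookupOrErrorSketch {
  private static final Map<String, String> CLUSTERS = Map.of("local", "localhost:9092");

  static Mono<String> bootstrapServers(String clusterName) {
    // Optional.map builds the happy-path Mono; orElse supplies the error Mono when the name is unknown
    return Optional.ofNullable(CLUSTERS.get(clusterName))
        .map(Mono::just)
        .orElse(Mono.error(new IllegalArgumentException(
            String.format("No cluster for name '%s'", clusterName))));
  }

  public static void main(String[] args) {
    System.out.println(bootstrapServers("local").block());            // prints localhost:9092
    bootstrapServers("missing").subscribe(v -> { }, Throwable::printStackTrace); // error path
  }
}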

+ 148 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java

@@ -8,13 +8,14 @@ import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
 import com.provectus.kafka.ui.model.InternalBrokerMetrics;
 import com.provectus.kafka.ui.model.InternalClusterMetrics;
 import com.provectus.kafka.ui.model.InternalPartition;
+import com.provectus.kafka.ui.model.InternalReplica;
 import com.provectus.kafka.ui.model.InternalSegmentSizeDto;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.InternalTopicConfig;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.Metric;
 import com.provectus.kafka.ui.model.PartitionsIncrease;
-import com.provectus.kafka.ui.model.PartitionsIncreaseResponse;
+import com.provectus.kafka.ui.model.ReplicationFactorChange;
 import com.provectus.kafka.ui.model.ServerStatus;
 import com.provectus.kafka.ui.model.TopicConsumerGroups;
 import com.provectus.kafka.ui.model.TopicCreation;
@@ -26,8 +27,10 @@ import com.provectus.kafka.ui.util.JmxClusterUtil;
 import com.provectus.kafka.ui.util.JmxMetricsName;
 import com.provectus.kafka.ui.util.JmxMetricsValueName;
 import java.math.BigDecimal;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.LongSummaryStatistics;
@@ -51,6 +54,7 @@ import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
 import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.RecordsToDelete;
@@ -130,6 +134,7 @@ public class KafkaService {
 
     var topics = segmentSizeDto.getInternalTopicWithSegmentSize();
     var brokersMetrics = segmentSizeDto.getClusterMetricsWithSegmentSize();
+    var brokersIds = new ArrayList<>(brokersMetrics.getInternalBrokerMetrics().keySet());
 
     InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder =
         brokersMetrics.toBuilder();
@@ -165,6 +170,7 @@ public class KafkaService {
         .lastKafkaException(null)
         .metrics(clusterMetrics)
         .topics(topics)
+        .brokers(brokersIds)
         .build();
   }
 
@@ -722,4 +728,145 @@ public class KafkaService {
         });
   }
 
+  private Mono<InternalTopic> changeReplicationFactor(
+      AdminClient adminClient,
+      String topicName,
+      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments
+  ) {
+    return ClusterUtil.toMono(adminClient
+        .alterPartitionReassignments(reassignments).all(), topicName)
+        .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next());
+  }
+
+  /**
+   * Change topic replication factor. Works on broker versions 5.4.x and higher.
+   */
+  public Mono<InternalTopic> changeReplicationFactor(
+      KafkaCluster cluster,
+      String topicName,
+      ReplicationFactorChange replicationFactorChange) {
+    return getOrCreateAdminClient(cluster)
+        .flatMap(ac -> {
+          Integer actual = cluster.getTopics().get(topicName).getReplicationFactor();
+          Integer requested = replicationFactorChange.getTotalReplicationFactor();
+          Integer brokersCount = cluster.getMetrics().getBrokerCount();
+
+          if (requested.equals(actual)) {
+            return Mono.error(
+                new ValidationException(
+                    String.format("Topic already has replicationFactor %s.", actual)));
+          }
+          if (requested > brokersCount) {
+            return Mono.error(
+                new ValidationException(
+                    String.format("Requested replication factor %s more than brokers count %s.",
+                        requested, brokersCount)));
+          }
+          return changeReplicationFactor(ac.getAdminClient(), topicName,
+              getPartitionsReassignments(cluster, topicName,
+                  replicationFactorChange));
+        });
+  }
+
+  private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsReassignments(
+      KafkaCluster cluster,
+      String topicName,
+      ReplicationFactorChange replicationFactorChange) {
+    // Current assignment map (Partition number -> List of brokers)
+    Map<Integer, List<Integer>> currentAssignment = getCurrentAssignment(cluster, topicName);
+    // Brokers usage map (broker id -> number of replicas of this topic it currently hosts)
+    Map<Integer, Integer> brokersUsage = getBrokersMap(cluster, currentAssignment);
+    int currentReplicationFactor = cluster.getTopics().get(topicName).getReplicationFactor();
+
+    // If we need to increase the replication factor
+    if (replicationFactorChange.getTotalReplicationFactor() > currentReplicationFactor) {
+      // For each partition
+      for (var assignmentList : currentAssignment.values()) {
+        // Get brokers list sorted by usage
+        var brokers = brokersUsage.entrySet().stream()
+            .sorted(Map.Entry.comparingByValue())
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toList());
+
+        // Iterate over the brokers and add them to the assignment
+        // until the partition's replica count reaches the requested replication factor
+        for (Integer broker : brokers) {
+          if (!assignmentList.contains(broker)) {
+            assignmentList.add(broker);
+            brokersUsage.merge(broker, 1, Integer::sum);
+          }
+          if (assignmentList.size() == replicationFactorChange.getTotalReplicationFactor()) {
+            break;
+          }
+        }
+        if (assignmentList.size() != replicationFactorChange.getTotalReplicationFactor()) {
+          throw new ValidationException("Something went wrong during adding replicas");
+        }
+      }
+
+      // If we need to decrease the replication factor
+    } else if (replicationFactorChange.getTotalReplicationFactor() < currentReplicationFactor) {
+      for (Map.Entry<Integer, List<Integer>> assignmentEntry : currentAssignment.entrySet()) {
+        var partition = assignmentEntry.getKey();
+        var brokers = assignmentEntry.getValue();
+
+        // Get the brokers sorted by usage, most used first
+        var brokersUsageList = brokersUsage.entrySet().stream()
+            .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toList());
+
+        // Iterate over the brokers and remove them from the assignment
+        // until the partition's replica count reaches the requested replication factor
+        for (Integer broker : brokersUsageList) {
+          // Only remove the broker if it currently holds a replica and is not the partition leader
+          if (brokers.contains(broker)
+              && !cluster.getTopics().get(topicName).getPartitions().get(partition).getLeader()
+              .equals(broker)) {
+            brokers.remove(broker);
+            brokersUsage.merge(broker, -1, Integer::sum);
+          }
+          if (brokers.size() == replicationFactorChange.getTotalReplicationFactor()) {
+            break;
+          }
+        }
+        if (brokers.size() != replicationFactorChange.getTotalReplicationFactor()) {
+          throw new ValidationException("Something went wrong during removing replicas");
+        }
+      }
+    } else {
+      throw new ValidationException("Replication factor already equals requested");
+    }
+
+    // Build the resulting reassignment map (TopicPartition -> target replica list)
+    return currentAssignment.entrySet().stream().collect(Collectors.toMap(
+        e -> new TopicPartition(topicName, e.getKey()),
+        e -> Optional.of(new NewPartitionReassignment(e.getValue()))
+    ));
+  }
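
To make the increase branch of getPartitionsReassignments concrete, here is a self-contained walkthrough (not part of the commit) of the same greedy idea on toy data: three brokers 1, 2, 3 and a topic with two partitions raised from replication factor 1 to 2. Broker ids and the starting assignment are assumptions for illustration.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class ReassignmentWalkthrough {
  public static void main(String[] args) {
    int requestedReplicationFactor = 2;
    // Partition number -> current replica brokers (TreeMap only for deterministic iteration here)
    Map<Integer, List<Integer>> assignment = new TreeMap<>();
    assignment.put(0, new ArrayList<>(List.of(1)));
    assignment.put(1, new ArrayList<>(List.of(2)));
    // Broker id -> number of replicas of this topic it currently hosts
    Map<Integer, Integer> usage = new HashMap<>(Map.of(1, 1, 2, 1, 3, 0));

    for (List<Integer> replicas : assignment.values()) {
      // Least-used brokers first, same ordering as the increase branch above
      List<Integer> candidates = usage.entrySet().stream()
          .sorted(Map.Entry.comparingByValue())
          .map(Map.Entry::getKey)
          .collect(Collectors.toList());
      for (Integer broker : candidates) {
        if (!replicas.contains(broker)) {
          replicas.add(broker);
          usage.merge(broker, 1, Integer::sum);
        }
        if (replicas.size() == requestedReplicationFactor) {
          break;
        }
      }
    }
    // Partition 0 picks the idle broker 3 first; partition 1 then picks a least-used remaining broker,
    // so the new replicas get spread across the cluster. Typically prints {0=[1, 3], 1=[2, 1]};
    // ties among equally loaded brokers depend on map iteration order.
    System.out.println(assignment);
  }
}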
+
+  private Map<Integer, List<Integer>> getCurrentAssignment(KafkaCluster cluster, String topicName) {
+    return cluster.getTopics().get(topicName).getPartitions().values().stream()
+        .collect(Collectors.toMap(
+            InternalPartition::getPartition,
+            p -> p.getReplicas().stream()
+                .map(InternalReplica::getBroker)
+                .collect(Collectors.toList())
+        ));
+  }
+
+  private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
+                                              Map<Integer, List<Integer>> currentAssignment) {
+    Map<Integer, Integer> result = cluster.getBrokers().stream()
+        .collect(Collectors.toMap(
+            c -> c,
+            c -> 0
+        ));
+    currentAssignment.values().forEach(brokers -> brokers
+        .forEach(broker -> result.put(broker, result.get(broker) + 1)));
+
+    return result;
+  }
+
+
+
 }
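
For reference, the private changeReplicationFactor helper above ultimately delegates to the Kafka AdminClient's AlterPartitionReassignments API (available on Apache Kafka 2.4+ brokers, in line with the broker-version note in the Javadoc). A minimal standalone sketch of that raw call; the bootstrap address, topic name "orders", and target broker ids are illustrative assumptions.

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

public class AlterReassignmentsExample {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
    try (AdminClient admin = AdminClient.create(props)) {
      // Target replica list per partition; the first broker in the list is the preferred leader
      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = Map.of(
          new TopicPartition("orders", 0),
          Optional.of(new NewPartitionReassignment(List.of(1, 2))));
      // The same call the helper above wraps in a Mono via ClusterUtil.toMono
      admin.alterPartitionReassignments(reassignments).all().get();
    }
  }
}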

+ 76 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -31,6 +31,30 @@ paths:
                 items:
                   $ref: '#/components/schemas/Cluster'
 
+
+  /api/clusters/{clusterName}/cache:
+    post:
+      tags:
+        - Clusters
+      summary: updateClusterInfo
+      operationId: updateClusterInfo
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/Cluster'
+        404:
+          description: Not found
+
+
   /api/clusters/{clusterName}/brokers:
     get:
       tags:
@@ -287,6 +311,38 @@ paths:
                 items:
                   $ref: '#/components/schemas/TopicConfig'
 
+  /api/clusters/{clusterName}/topics/{topicName}/replications:
+    patch:
+      tags:
+        - Topics
+      summary: changeReplicationFactor
+      operationId: changeReplicationFactor
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: topicName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/ReplicationFactorChange'
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ReplicationFactorChangeResponse'
+        404:
+          description: Not found
+
   /api/clusters/{clusterName}/topics/{topicName}/messages:
     get:
       tags:
@@ -2098,3 +2154,23 @@ components:
       required:
         - totalPartitionsCount
         - topicName
+
+    ReplicationFactorChange:
+      type: object
+      properties:
+        totalReplicationFactor:
+          type: integer
+          minimum: 1
+      required:
+        - totalReplicationFactor
+
+    ReplicationFactorChangeResponse:
+      type: object
+      properties:
+        totalReplicationFactor:
+          type: integer
+        topicName:
+          type: string
+      required:
+        - totalReplicationFactor
+        - topicName