diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java index ccfe898fdc..0bea32a875 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java @@ -4,10 +4,13 @@ import static java.util.stream.Collectors.toList; import com.provectus.kafka.ui.api.TopicsApi; import com.provectus.kafka.ui.mapper.ClusterMapper; +import com.provectus.kafka.ui.model.GeneratePartitionsReassignmentCommandDTO; +import com.provectus.kafka.ui.model.InProgressReassignmentDTO; import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.InternalTopicConfig; import com.provectus.kafka.ui.model.PartitionsIncreaseDTO; import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO; +import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO; import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO; import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO; import com.provectus.kafka.ui.model.SortOrderDTO; @@ -21,8 +24,10 @@ import com.provectus.kafka.ui.model.TopicUpdateDTO; import com.provectus.kafka.ui.model.TopicsResponseDTO; import com.provectus.kafka.ui.service.TopicsService; import com.provectus.kafka.ui.service.analyze.TopicAnalysisService; +import com.provectus.kafka.ui.service.reassign.ReassignmentService; import java.util.Comparator; import java.util.List; +import java.util.stream.Collectors; import javax.validation.Valid; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -43,6 +48,7 @@ public class TopicsController extends AbstractController implements TopicsApi { private final TopicsService topicsService; private final TopicAnalysisService topicAnalysisService; + private final ReassignmentService reassignmentService; private final ClusterMapper clusterMapper; @Override @@ -209,4 +215,44 @@ public class TopicsController extends AbstractController implements TopicsApi { .orElseGet(() -> ResponseEntity.notFound().build()) ); } + + + @Override + public Mono> generatePartitionAssignment(String clusterName, + Mono generatePartitionsReassignmentCommandDTO, + ServerWebExchange exchange) { + return generatePartitionsReassignmentCommandDTO + .flatMap(generateDto -> + reassignmentService.generate( + getCluster(clusterName), + generateDto.getTopics().stream().map(t -> t.getTopic()).collect(Collectors.toSet()), + generateDto.getBrokerIds())) + .map(ResponseEntity::ok); + } + + @Override + public Mono> getCurrentPartitionAssignment(String clusterName, + Mono generatePartitionsReassignmentCommandDTO, + ServerWebExchange exchange) { + return generatePartitionsReassignmentCommandDTO + .flatMap(generateDto -> + reassignmentService.getCurrentAssignment( + getCluster(clusterName), + generateDto.getTopics().stream().map(t -> t.getTopic()).collect(Collectors.toSet()))) + .map(ResponseEntity::ok); + } + + @Override + public Mono> executePartitionAssignment(String clusterName, + Mono reassignPartitionsCommandDTO, + ServerWebExchange exchange) { + return null; + } + + @Override + public Mono> getInProgressAssignments(String clusterName, + ServerWebExchange exchange) { + return reassignmentService.getInProgressAssignments(getCluster(clusterName)) + .map(ResponseEntity::ok); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index 2504473b17..b01e936728 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -48,6 +48,7 @@ import org.apache.kafka.clients.admin.NewPartitionReassignment; import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.PartitionReassignment; import org.apache.kafka.clients.admin.RecordsToDelete; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -357,6 +358,10 @@ public class ReactiveAdminClient implements Closeable { return toMono(client.alterPartitionReassignments(reassignments).all()); } + public Mono> listPartitionReassignments() { + return toMono(client.listPartitionReassignments().reassignments()); + } + public Mono createPartitions(Map newPartitionsMap) { return toMono(client.createPartitions(newPartitionsMap).all()); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignementsStore.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignementsStore.java new file mode 100644 index 0000000000..84b9769f66 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignementsStore.java @@ -0,0 +1,4 @@ +package com.provectus.kafka.ui.service.reassign; + +public class ReassignementsStore { +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlan.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlan.java new file mode 100644 index 0000000000..031a891ee0 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlan.java @@ -0,0 +1,11 @@ +package com.provectus.kafka.ui.service.reassign; + +import com.provectus.kafka.ui.service.ReactiveAdminClient; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.clients.admin.NewPartitionReassignment; +import org.apache.kafka.common.TopicPartition; + +public record ReassignmentPlan(Map> reassignments) { + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlanner.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlanner.java new file mode 100644 index 0000000000..8674ea58e3 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentPlanner.java @@ -0,0 +1,151 @@ +package com.provectus.kafka.ui.service.reassign; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.model.PartitionReassignmentDTO; +import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO; +import com.provectus.kafka.ui.service.ReactiveAdminClient; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.random.RandomGenerator; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +@Slf4j +@RequiredArgsConstructor +public class ReassignmentPlanner { + + record BrokerMetadata(int id, Optional rack) { + } + + private final ReactiveAdminClient adminClient; + + public Mono generatePartitionReassignment(Set topics, + List brokerIds, + boolean rackAware) { + return Mono.zip(currentAssignments(adminClient, topics), brokerMetadata(brokerIds)).map(t -> + createSuggestedReassignment( + calculateAssignment(t.getT1(), t.getT2(), rackAware))); + } + + private static ReassignPartitionsCommandDTO createSuggestedReassignment( + Map> assignment) { + var dto = new ReassignPartitionsCommandDTO().version(1); + assignment.forEach((tp, replicas) -> + dto.addPartitionsItem( + new PartitionReassignmentDTO() + .topic(tp.topic()) + .partition(tp.partition()) + .replicas(replicas) + .logDirs(replicas.stream().map(r -> "any").toList()))); + return dto; + } + + // [ topic -> [tp -> list of replicas] ] + public static Mono>>> currentAssignments(ReactiveAdminClient ac, Set topics) { + return ac.describeTopics(topics) + .map(topicToDescriptionMap -> + topicToDescriptionMap.entrySet().stream() + .map(e -> + Tuples.of( + e.getKey(), + e.getValue().partitions().stream() + .map(p -> + Tuples.of( + new TopicPartition(e.getKey(), p.partition()), + p.replicas().stream().map(Node::id).toList() + )).collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)) + )) + .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)) + ); + } + + private Mono> brokerMetadata(List brokerIds) { + return adminClient.describeCluster() + .map(description -> description.getNodes().stream() + .filter(n -> brokerIds.contains(n.id())) + .map(n -> new BrokerMetadata(n.id(), Optional.ofNullable(n.rack()))) + .toList()); + } + + @VisibleForTesting + static Map> calculateAssignment( + Map>> currentAssignments, + List brokerMetadata, + boolean rackAware) { + if (rackAware && brokerMetadata.stream().anyMatch(m -> m.rack().isEmpty())) { + throw new ValidationException("Not all brokers have rack information for replica rack aware assignment"); + } + return rackAware + ? calculateAssignmentRackAware(currentAssignments, brokerMetadata) + : calculateAssignmentRackUnaware(currentAssignments, brokerMetadata); + } + + private static Map> calculateAssignmentRackAware( + Map>> currentAssignments, + List brokerMetadata) { + log.warn("Rack-aware assignment calculation is not implemented yet, falling back to usual calculation"); + return calculateAssignmentRackUnaware(currentAssignments, brokerMetadata); + } + + private static Map> calculateAssignmentRackUnaware( + Map>> currentAssignments, + List brokerMetadata) { + Map> result = new LinkedHashMap<>(); + currentAssignments.forEach((topic, currentAssignment) -> { + result.putAll( + assignReplicasToBrokersRackUnaware( + topic, + currentAssignment.size(), + currentAssignment.entrySet().iterator().next().getValue().size(), + brokerMetadata.stream().map(BrokerMetadata::id).collect(Collectors.toList()), + ThreadLocalRandom.current() + ) + ); + }); + return result; + } + + static Map> assignReplicasToBrokersRackUnaware( + String topic, + int nPartitions, + int replicationFactor, + List brokerList, + RandomGenerator rand) { + var result = new LinkedHashMap>(); + int startIndex = rand.nextInt(brokerList.size()); + int currentPartitionId = 0; + int nextReplicaShift = rand.nextInt(brokerList.size()); + for (int i = 0; i < nPartitions; i++) { + if (currentPartitionId > 0 && (currentPartitionId % brokerList.size() == 0)) { + nextReplicaShift += 1; + } + int firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size(); + var replicaBuffer = Lists.newArrayList(brokerList.get(firstReplicaIndex)); + for (int j = 0; j < replicationFactor - 1; j++) { + replicaBuffer.add(brokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size()))); + } + result.put(new TopicPartition(topic, currentPartitionId), replicaBuffer); + currentPartitionId += 1; + } + return result; + } + + private static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int nBrokers) { + var shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1); + return (firstReplicaIndex + shift) % nBrokers; + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentService.java new file mode 100644 index 0000000000..445caa97e8 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/reassign/ReassignmentService.java @@ -0,0 +1,78 @@ +package com.provectus.kafka.ui.service.reassign; + +import com.provectus.kafka.ui.model.InProgressPartitionReassignmentDTO; +import com.provectus.kafka.ui.model.InProgressReassignmentDTO; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.model.PartitionReassignmentDTO; +import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO; +import com.provectus.kafka.ui.service.AdminClientService; +import com.provectus.kafka.ui.service.ReactiveAdminClient; +import java.util.List; +import java.util.Map; +import java.util.Set; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.admin.PartitionReassignment; +import org.apache.kafka.common.TopicPartition; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +@Service +@RequiredArgsConstructor +public class ReassignmentService { + + private final AdminClientService adminClientService; + + public Mono generate(KafkaCluster cluster, + Set topics, + List brokerIds) { + return adminClientService.get(cluster) + .map(ReassignmentPlanner::new) + .flatMap(planner -> planner.generatePartitionReassignment(topics, brokerIds, false)); + } + + + public Mono getCurrentAssignment(KafkaCluster cluster, + Set topics) { + return adminClientService.get(cluster) + .flatMap(ac -> ReassignmentPlanner.currentAssignments(ac, topics)) + .map(this::map); + } + + public Mono getInProgressAssignments(KafkaCluster cluster) { + return adminClientService.get(cluster) + .flatMap(ReactiveAdminClient::listPartitionReassignments) + .map(this::mapInProgressReassignments); + } + + private InProgressReassignmentDTO mapInProgressReassignments(Map reassignments) { + return new InProgressReassignmentDTO() + .partitions( + reassignments.entrySet().stream() + .map(e -> new InProgressPartitionReassignmentDTO() + .topic(e.getKey().topic()) + .partition(e.getKey().partition()) + .currentReplicas(e.getValue().replicas()) + .addingReplicas(e.getValue().addingReplicas()) + .removingReplicas(e.getValue().removingReplicas()) + ) + .toList() + ); + } + + private ReassignPartitionsCommandDTO map(Map>> assignment) { + return new ReassignPartitionsCommandDTO() + .version(1) + .partitions( + assignment.values().stream() + .flatMap(m -> m.entrySet().stream()) + .map(p -> new PartitionReassignmentDTO() + .topic(p.getKey().topic()) + .partition(p.getKey().partition()) + .replicas(p.getValue()) + .logDirs(p.getValue().stream().map(r -> "any").toList()) + ) + .toList() + ); + } + +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentJsonDtoTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentJsonDtoTest.java new file mode 100644 index 0000000000..9a49041647 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/reassign/ReassignmentJsonDtoTest.java @@ -0,0 +1,55 @@ +package com.provectus.kafka.ui.service.reassign; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO; +import java.util.List; +import org.junit.jupiter.api.Test; + +class ReassignmentJsonDtoTest { + + @Test + void canBeCreatedFromJsonString() { + var parsed = ReassignmentJsonDto.fromJson( + "{" + + " \"version\": 1, " + + " \"partitions\":" + + " [" + + " {" + + " \"topic\": \"my-topic\"," + + " \"partition\": 0, " + + " \"replicas\":" + + " [ " + + " 0, " + + " 1, " + + " 2 " + + " ], " + + " \"log_dirs\": " + + " [ " + + " \"any\", " + + " \"/user/share/kafka/p0\"," + + " \"any\"" + + " ]" + + " }" + + " ]" + + "}" + ); + assertThat(parsed).isEqualTo( + ReassignPartitionsCommandDTO.builder() + .version(1) + .partitions( + List.of( + ReassignmentJsonDto.PartitionAssignmentDto.builder() + .topic("my-topic") + .partition(0) + .replicas(List.of(0, 1, 2)) + .logDirs(List.of("any", "/user/share/kafka/p0", "any")) + .build() + ) + ) + .build() + ); + } + + +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 1034fc3202..5a5e969d33 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1729,6 +1729,94 @@ paths: $ref: '#/components/schemas/PartitionsIncreaseResponse' 404: description: Not found + + /api/clusters/{clusterName}/partitionsreaassignments/generate: + post: + tags: + - Topics + operationId: generatePartitionAssignment + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/GeneratePartitionsReassignmentCommand' + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ReassignPartitionsCommand' + + /api/clusters/{clusterName}/partitionsreaassignments/current: + post: + tags: + - Topics + operationId: getCurrentPartitionAssignment + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/GeneratePartitionsReassignmentCommand' + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ReassignPartitionsCommand' + + /api/clusters/{clusterName}/partitionsreaassignments/execute: + post: + tags: + - Topics + operationId: executePartitionAssignment + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ReassignPartitionsCommand' + responses: + 200: + description: OK + + /api/clusters/{clusterName}/partitionsreaassignments/inprogress: + get: + tags: + - Topics + operationId: getInProgressAssignments + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/InProgressReassignment' + /api/info/timestampformat: get: tags: @@ -3149,3 +3237,79 @@ components: - COMPACT - COMPACT_DELETE - UNKNOWN + + ReassignPartitionsCommand: + type: object + properties: + version: + type: integer + format: int32 + partitions: + type: array + items: + $ref: "#/components/schemas/PartitionReassignment" + + PartitionReassignment: + type: object + properties: + topic: + type: string + partition: + type: integer + replicas: + type: array + items: + type: integer + log_dirs: + type: array + items: + type: string + + GeneratePartitionsReassignmentCommand: + type: object + properties: + version: + type: integer + format: int32 + broker_ids: + type: array + items: + type: integer + format: int32 + topics: + type: array + items: + type: object + properties: + topic: + type: string + + InProgressReassignment: + type: object + properties: + partitions: + type: array + items: + $ref: "#/components/schemas/InProgressPartitionReassignment" + + InProgressPartitionReassignment: + type: object + properties: + topic: + type: string + partition: + type: integer + format: int32 + currentReplicas: + type: array + items: + type: integer + format: int32 + addingReplicas: + items: + type: integer + format: int32 + removingReplicas: + items: + type: integer + format: int32