Compare commits
5 commits
master
...
partitions
Author | SHA1 | Date | |
---|---|---|---|
![]() |
c1e13ef0f8 | ||
![]() |
5255e05357 | ||
![]() |
4cf8f7a7db | ||
![]() |
14bd3086e9 | ||
![]() |
432c027c0b |
7 changed files with 798 additions and 1 deletions
|
@ -4,10 +4,14 @@ import static java.util.stream.Collectors.toList;
|
|||
|
||||
import com.provectus.kafka.ui.api.TopicsApi;
|
||||
import com.provectus.kafka.ui.mapper.ClusterMapper;
|
||||
import com.provectus.kafka.ui.model.GeneratePartitionsReassignmentCommandDTO;
|
||||
import com.provectus.kafka.ui.model.InProgressReassignmentDTO;
|
||||
import com.provectus.kafka.ui.model.InternalTopic;
|
||||
import com.provectus.kafka.ui.model.InternalTopicConfig;
|
||||
import com.provectus.kafka.ui.model.PartitionReassignmentCancellationDTO;
|
||||
import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
|
||||
import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
|
||||
import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO;
|
||||
import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
|
||||
import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
|
||||
import com.provectus.kafka.ui.model.SortOrderDTO;
|
||||
|
@ -21,12 +25,16 @@ import com.provectus.kafka.ui.model.TopicUpdateDTO;
|
|||
import com.provectus.kafka.ui.model.TopicsResponseDTO;
|
||||
import com.provectus.kafka.ui.service.TopicsService;
|
||||
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
|
||||
import com.provectus.kafka.ui.service.reassign.ReassignmentService;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.validation.Valid;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
@ -43,6 +51,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
|
||||
private final TopicsService topicsService;
|
||||
private final TopicAnalysisService topicAnalysisService;
|
||||
private final ReassignmentService reassignmentService;
|
||||
private final ClusterMapper clusterMapper;
|
||||
|
||||
@Override
|
||||
|
@ -209,4 +218,57 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
.orElseGet(() -> ResponseEntity.notFound().build())
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Mono<ResponseEntity<ReassignPartitionsCommandDTO>>
|
||||
generatePartitionAssignment(String clusterName,
|
||||
Mono<GeneratePartitionsReassignmentCommandDTO> reassignCmdDto,
|
||||
ServerWebExchange exchange) {
|
||||
return reassignCmdDto
|
||||
.flatMap(generateDto ->
|
||||
reassignmentService.generate(
|
||||
getCluster(clusterName),
|
||||
generateDto.getTopics().stream().map(t -> t.getTopic()).collect(Collectors.toSet()),
|
||||
generateDto.getBrokerIds()))
|
||||
.map(ResponseEntity::ok);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ResponseEntity<ReassignPartitionsCommandDTO>> getCurrentPartitionAssignment(String clusterName,
|
||||
List<String> topics,
|
||||
ServerWebExchange exchange) {
|
||||
return reassignmentService.getCurrentAssignment(getCluster(clusterName), new HashSet<>(topics))
|
||||
.map(ResponseEntity::ok);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ResponseEntity<Void>> executePartitionAssignment(String clusterName,
|
||||
Mono<ReassignPartitionsCommandDTO> cmdDto,
|
||||
ServerWebExchange exchange) {
|
||||
return cmdDto
|
||||
.flatMap(cmd -> reassignmentService.executeReassignment(getCluster(clusterName), cmd))
|
||||
.thenReturn(ResponseEntity.ok().build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ResponseEntity<InProgressReassignmentDTO>> getInProgressAssignments(String clusterName,
|
||||
ServerWebExchange exchange) {
|
||||
return reassignmentService.getInProgressAssignments(getCluster(clusterName))
|
||||
.map(ResponseEntity::ok);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ResponseEntity<Void>> cancelRunningPartitionAssignment(String clusterName,
|
||||
Mono<PartitionReassignmentCancellationDTO> cancelDto,
|
||||
ServerWebExchange exchange) {
|
||||
return cancelDto
|
||||
.flatMap(dto ->
|
||||
reassignmentService.cancelReassignment(
|
||||
getCluster(clusterName),
|
||||
dto.getPartitions().stream()
|
||||
.map(p -> new TopicPartition(p.getTopic(), p.getPartition()))
|
||||
.collect(Collectors.toSet())))
|
||||
.thenReturn(ResponseEntity.ok().build());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.kafka.clients.admin.NewPartitionReassignment;
|
|||
import org.apache.kafka.clients.admin.NewPartitions;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.apache.kafka.clients.admin.OffsetSpec;
|
||||
import org.apache.kafka.clients.admin.PartitionReassignment;
|
||||
import org.apache.kafka.clients.admin.RecordsToDelete;
|
||||
import org.apache.kafka.clients.admin.TopicDescription;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
|
@ -357,6 +358,10 @@ public class ReactiveAdminClient implements Closeable {
|
|||
return toMono(client.alterPartitionReassignments(reassignments).all());
|
||||
}
|
||||
|
||||
public Mono<Map<TopicPartition, PartitionReassignment>> listPartitionReassignments() {
|
||||
return toMono(client.listPartitionReassignments().reassignments());
|
||||
}
|
||||
|
||||
public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap) {
|
||||
return toMono(client.createPartitions(newPartitionsMap).all());
|
||||
}
|
||||
|
|
|
@ -0,0 +1,188 @@
|
|||
package com.provectus.kafka.ui.service.reassign;
|
||||
|
||||
import static java.util.stream.Collectors.groupingBy;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static java.util.stream.Collectors.toMap;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.provectus.kafka.ui.exception.ValidationException;
|
||||
import com.provectus.kafka.ui.service.ReactiveAdminClient;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.random.RandomGenerator;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.admin.NewPartitionReassignment;
|
||||
import org.apache.kafka.clients.admin.PartitionReassignment;
|
||||
import org.apache.kafka.clients.admin.TopicDescription;
|
||||
import org.apache.kafka.common.Node;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuple2;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
class ReassignmentOperations {
|
||||
|
||||
public static final int SUPPORTED_CMD_VERSION = 1;
|
||||
public static final String LOG_DIR_NOT_SET_STRING = "any";
|
||||
|
||||
private record BrokerMetadata(int id, Optional<String> rack) {}
|
||||
|
||||
private final ReactiveAdminClient adminClient;
|
||||
|
||||
Mono<Map<TopicPartition, List<Integer>>> generatePartitionReassignment(Set<String> topics,
|
||||
List<Integer> brokerIds,
|
||||
boolean rackAware) {
|
||||
return Mono.zip(getCurrentAssignment(topics), getBrokersMetadata(brokerIds))
|
||||
.map(t -> calculateAssignment(t.getT1(), t.getT2(), rackAware));
|
||||
}
|
||||
|
||||
// [ topic -> [partition -> list of replica ids] ]
|
||||
Mono<Map<TopicPartition, List<Integer>>> getCurrentAssignment(Set<String> topics) {
|
||||
return adminClient.describeTopics(topics)
|
||||
.map(topicToDescriptionMap -> topicToDescriptionMap.entrySet().stream()
|
||||
.flatMap((Map.Entry<String, TopicDescription> e) ->
|
||||
e.getValue().partitions().stream()
|
||||
.map(p ->
|
||||
Tuples.of(
|
||||
new TopicPartition(e.getKey(), p.partition()),
|
||||
p.replicas().stream().map(Node::id).toList())))
|
||||
.collect(toMap(Tuple2::getT1, Tuple2::getT2)));
|
||||
}
|
||||
|
||||
Mono<Void> validateAndExecute(List<Tuple2<TopicPartition, List<Integer>>> reassignment, Runnable preExecute) {
|
||||
return validateCmd(reassignment)
|
||||
.doOnNext(r -> preExecute.run())
|
||||
.flatMap(adminClient::alterPartitionReassignments);
|
||||
}
|
||||
|
||||
private Mono<Map<TopicPartition, Optional<NewPartitionReassignment>>> validateCmd(
|
||||
List<Tuple2<TopicPartition, List<Integer>>> reassignment) {
|
||||
if (reassignment.isEmpty()) {
|
||||
throw new ValidationException("Partition reassignment list cannot be empty");
|
||||
}
|
||||
if (reassignment.stream().map(Tuple2::getT2).anyMatch(List::isEmpty)) {
|
||||
throw new ValidationException("Partition replica list cannot be empty");
|
||||
}
|
||||
if (reassignment.stream().map(Tuple2::getT1).distinct().count() < reassignment.size()) {
|
||||
throw new ValidationException("Partition reassignment contains duplicate topic partitions");
|
||||
}
|
||||
return adminClient.describeCluster()
|
||||
.doOnNext(description -> {
|
||||
var knownIds = description.getNodes().stream().map(Node::id).toList();
|
||||
var unknownIds = reassignment.stream()
|
||||
.flatMap(t -> t.getT2().stream())
|
||||
.filter(id -> !knownIds.contains(id))
|
||||
.toList();
|
||||
if (!unknownIds.isEmpty()) {
|
||||
throw new ValidationException("Unknown broker ids: " + unknownIds);
|
||||
}
|
||||
})
|
||||
.thenReturn(reassignment.stream()
|
||||
.collect(toMap(Tuple2::getT1, t -> Optional.of(new NewPartitionReassignment(t.getT2())))));
|
||||
}
|
||||
|
||||
private Mono<List<BrokerMetadata>> getBrokersMetadata(List<Integer> brokerIds) {
|
||||
return adminClient.describeCluster()
|
||||
.map(description -> description.getNodes().stream()
|
||||
.filter(n -> brokerIds.contains(n.id()))
|
||||
.map(n -> new BrokerMetadata(n.id(), Optional.ofNullable(n.rack())))
|
||||
.toList());
|
||||
}
|
||||
|
||||
private static Map<TopicPartition, List<Integer>> calculateAssignment(
|
||||
Map<TopicPartition, List<Integer>> currentAssignments,
|
||||
List<BrokerMetadata> brokerMetadata,
|
||||
boolean rackAware) {
|
||||
Map<String, Map<TopicPartition, List<Integer>>> perTopic = currentAssignments.entrySet().stream()
|
||||
.collect(groupingBy(e -> e.getKey().topic(), toMap(Map.Entry::getKey, Map.Entry::getValue)));
|
||||
return rackAware
|
||||
? calculateAssignmentRackAware(perTopic, brokerMetadata)
|
||||
: calculateAssignmentRackUnaware(perTopic, brokerMetadata);
|
||||
}
|
||||
|
||||
private static Map<TopicPartition, List<Integer>> calculateAssignmentRackAware(
|
||||
Map<String, Map<TopicPartition, List<Integer>>> currentAssignments,
|
||||
List<BrokerMetadata> brokerMetadata) {
|
||||
if (brokerMetadata.stream().anyMatch(m -> m.rack().isEmpty())) {
|
||||
throw new ValidationException("Not all brokers have rack information for replica rack aware assignment");
|
||||
}
|
||||
log.warn("Rack-aware assignment calculation is not implemented yet, falling back to usual calculation");
|
||||
return calculateAssignmentRackUnaware(currentAssignments, brokerMetadata);
|
||||
}
|
||||
|
||||
private static Map<TopicPartition, List<Integer>> calculateAssignmentRackUnaware(
|
||||
Map<String, Map<TopicPartition, List<Integer>>> currentAssignments,
|
||||
List<BrokerMetadata> brokerMetadata) {
|
||||
Map<TopicPartition, List<Integer>> result = new LinkedHashMap<>();
|
||||
currentAssignments.forEach((topic, currentAssignment) -> {
|
||||
result.putAll(
|
||||
assignReplicasToBrokersRackUnaware(
|
||||
topic,
|
||||
currentAssignment.size(),
|
||||
currentAssignment.entrySet().iterator().next().getValue().size(),
|
||||
brokerMetadata.stream().map(BrokerMetadata::id).collect(toList()),
|
||||
ThreadLocalRandom.current()
|
||||
)
|
||||
);
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
// implementation copied from https://github.com/apache/kafka/blob/1874f2388cffa7a1e866cbe4aff8b92c9d953b41/core/src/main/scala/kafka/admin/AdminUtils.scala#L125
|
||||
@VisibleForTesting
|
||||
static Map<TopicPartition, List<Integer>> assignReplicasToBrokersRackUnaware(
|
||||
String topic,
|
||||
int numPartitions,
|
||||
int replicationFactor,
|
||||
List<Integer> brokerList,
|
||||
RandomGenerator rand) {
|
||||
var result = new LinkedHashMap<TopicPartition, List<Integer>>();
|
||||
int startIndex = rand.nextInt(brokerList.size());
|
||||
int currentPartitionId = 0;
|
||||
int nextReplicaShift = startIndex;
|
||||
for (int i = 0; i < numPartitions; i++) {
|
||||
if (currentPartitionId > 0 && (currentPartitionId % brokerList.size() == 0)) {
|
||||
nextReplicaShift += 1;
|
||||
}
|
||||
int firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size();
|
||||
var replicaBuffer = Lists.newArrayList(brokerList.get(firstReplicaIndex));
|
||||
for (int j = 0; j < replicationFactor - 1; j++) {
|
||||
replicaBuffer.add(brokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size())));
|
||||
}
|
||||
result.put(new TopicPartition(topic, currentPartitionId), replicaBuffer);
|
||||
currentPartitionId += 1;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int numBrokers) {
|
||||
var shift = 1 + (secondReplicaShift + replicaIndex) % (numBrokers - 1);
|
||||
return (firstReplicaIndex + shift) % numBrokers;
|
||||
}
|
||||
|
||||
Mono<Void> cancelReassignment(Set<TopicPartition> partitions) {
|
||||
return adminClient.listPartitionReassignments()
|
||||
.map(reassignments -> reassignments.entrySet().stream()
|
||||
.filter(e -> partitions.contains(e.getKey()))
|
||||
.filter(e -> {
|
||||
PartitionReassignment reassignment = e.getValue();
|
||||
return !reassignment.addingReplicas().isEmpty()
|
||||
|| !reassignment.removingReplicas().isEmpty();
|
||||
})
|
||||
.map(Map.Entry::getKey)
|
||||
.collect(Collectors.toSet()))
|
||||
.flatMap(tps -> tps.isEmpty()
|
||||
? Mono.empty()
|
||||
: adminClient.alterPartitionReassignments(tps.stream().collect(toMap(p -> p, p -> Optional.empty()))));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,118 @@
|
|||
package com.provectus.kafka.ui.service.reassign;
|
||||
|
||||
import static com.provectus.kafka.ui.service.reassign.ReassignmentOperations.LOG_DIR_NOT_SET_STRING;
|
||||
import static com.provectus.kafka.ui.service.reassign.ReassignmentOperations.SUPPORTED_CMD_VERSION;
|
||||
|
||||
import com.fasterxml.jackson.databind.json.JsonMapper;
|
||||
import com.provectus.kafka.ui.exception.ValidationException;
|
||||
import com.provectus.kafka.ui.model.InProgressPartitionReassignmentDTO;
|
||||
import com.provectus.kafka.ui.model.InProgressReassignmentDTO;
|
||||
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||
import com.provectus.kafka.ui.model.PartitionReassignmentDTO;
|
||||
import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO;
|
||||
import com.provectus.kafka.ui.service.AdminClientService;
|
||||
import com.provectus.kafka.ui.service.ReactiveAdminClient;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.admin.PartitionReassignment;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuple2;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class ReassignmentService {
|
||||
|
||||
private final AdminClientService adminClientService;
|
||||
|
||||
public Mono<ReassignPartitionsCommandDTO> generate(KafkaCluster cluster,
|
||||
Set<String> topics,
|
||||
List<Integer> brokerIds) {
|
||||
return adminClientService.get(cluster)
|
||||
.map(ReassignmentOperations::new)
|
||||
.flatMap(planner -> planner.generatePartitionReassignment(topics, brokerIds, false))
|
||||
.map(ReassignmentService::mapToReassignmentDto);
|
||||
}
|
||||
|
||||
public Mono<Void> executeReassignment(KafkaCluster cluster, ReassignPartitionsCommandDTO cmd) {
|
||||
return adminClientService.get(cluster)
|
||||
.map(ReassignmentOperations::new)
|
||||
.flatMap(ops -> {
|
||||
if (!cmd.getPartitions().stream()
|
||||
.flatMap(p -> p.getLogDirs().stream())
|
||||
.allMatch(logDir -> logDir.equalsIgnoreCase(LOG_DIR_NOT_SET_STRING))) {
|
||||
return Mono.error(new ValidationException("Log dir altering is not supported"));
|
||||
}
|
||||
List<Tuple2<TopicPartition, List<Integer>>> reassignment = cmd.getPartitions().stream()
|
||||
.map(p -> Tuples.of(new TopicPartition(p.getTopic(), p.getPartition()), p.getReplicas()))
|
||||
.toList();
|
||||
return ops.validateAndExecute(reassignment, () -> logRequestedAssignment(cmd));
|
||||
});
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private void logRequestedAssignment(ReassignPartitionsCommandDTO cmd) {
|
||||
log.info("Executing partitions reassignment: \n{}",
|
||||
new JsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(cmd));
|
||||
}
|
||||
|
||||
public Mono<Void> cancelReassignment(KafkaCluster cluster, Set<TopicPartition> partitions) {
|
||||
return adminClientService.get(cluster)
|
||||
.map(ReassignmentOperations::new)
|
||||
.flatMap(ops -> ops.cancelReassignment(partitions));
|
||||
}
|
||||
|
||||
public Mono<ReassignPartitionsCommandDTO> getCurrentAssignment(KafkaCluster cluster,
|
||||
Set<String> topics) {
|
||||
return adminClientService.get(cluster)
|
||||
.map(ReassignmentOperations::new)
|
||||
.flatMap(ops -> ops.getCurrentAssignment(topics))
|
||||
.map(ReassignmentService::mapToReassignmentDto);
|
||||
}
|
||||
|
||||
public Mono<InProgressReassignmentDTO> getInProgressAssignments(KafkaCluster cluster) {
|
||||
return adminClientService.get(cluster)
|
||||
.flatMap(ReactiveAdminClient::listPartitionReassignments)
|
||||
.map(ReassignmentService::mapToInProgressReassignmentsDto);
|
||||
}
|
||||
|
||||
private static InProgressReassignmentDTO mapToInProgressReassignmentsDto(
|
||||
Map<TopicPartition, PartitionReassignment> reassignments) {
|
||||
return new InProgressReassignmentDTO()
|
||||
.partitions(
|
||||
reassignments.entrySet().stream()
|
||||
.map(e -> new InProgressPartitionReassignmentDTO()
|
||||
.topic(e.getKey().topic())
|
||||
.partition(e.getKey().partition())
|
||||
.currentReplicas(e.getValue().replicas())
|
||||
.addingReplicas(e.getValue().addingReplicas())
|
||||
.removingReplicas(e.getValue().removingReplicas())
|
||||
)
|
||||
.toList()
|
||||
);
|
||||
}
|
||||
|
||||
private static ReassignPartitionsCommandDTO mapToReassignmentDto(
|
||||
Map<TopicPartition, List<Integer>> assignment) {
|
||||
return new ReassignPartitionsCommandDTO()
|
||||
.version(SUPPORTED_CMD_VERSION)
|
||||
.partitions(assignment.entrySet().stream().map(ReassignmentService::mapPartitionAssignmentDto).toList());
|
||||
}
|
||||
|
||||
private static PartitionReassignmentDTO mapPartitionAssignmentDto(Map.Entry<TopicPartition, List<Integer>>
|
||||
partitionsAssignment) {
|
||||
return new PartitionReassignmentDTO()
|
||||
.topic(partitionsAssignment.getKey().topic())
|
||||
.partition(partitionsAssignment.getKey().partition())
|
||||
.replicas(partitionsAssignment.getValue())
|
||||
.logDirs(partitionsAssignment.getValue().stream().map(r -> LOG_DIR_NOT_SET_STRING).toList());
|
||||
}
|
||||
|
||||
}
|
|
@ -19,6 +19,7 @@ import com.provectus.kafka.ui.model.SortOrderDTO;
|
|||
import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
|
||||
import com.provectus.kafka.ui.model.TopicDTO;
|
||||
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
|
||||
import com.provectus.kafka.ui.service.reassign.ReassignmentService;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
@ -43,7 +44,7 @@ class TopicsServicePaginationTest {
|
|||
private final ClusterMapper clusterMapper = new ClusterMapperImpl();
|
||||
|
||||
private final TopicsController topicsController = new TopicsController(
|
||||
topicsService, mock(TopicAnalysisService.class), clusterMapper);
|
||||
topicsService, mock(TopicAnalysisService.class), mock(ReassignmentService.class), clusterMapper);
|
||||
|
||||
private void init(Map<String, InternalTopic> topicsInCache) {
|
||||
|
||||
|
|
|
@ -0,0 +1,224 @@
|
|||
package com.provectus.kafka.ui.service.reassign;
|
||||
|
||||
import static java.util.stream.Collectors.groupingBy;
|
||||
import static java.util.stream.Collectors.toMap;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.anyInt;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.provectus.kafka.ui.service.ReactiveAdminClient;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.random.RandomGenerator;
|
||||
import java.util.stream.IntStream;
|
||||
import org.apache.kafka.clients.admin.AdminClient;
|
||||
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.CsvSource;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.KafkaContainer;
|
||||
import org.testcontainers.containers.Network;
|
||||
import org.testcontainers.shaded.org.awaitility.Awaitility;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
import reactor.util.function.Tuple2;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
class ReassignmentOperationsTest {
|
||||
|
||||
private static final String CONFLUENTINC_VERSION = "7.3.0";
|
||||
|
||||
private static final int KAFKA_1_ID = 1;
|
||||
private static final int KAFKA_2_ID = 2;
|
||||
|
||||
private static final Network NETWORK = Network.newNetwork();
|
||||
private static GenericContainer<?> ZK;
|
||||
private static KafkaContainer KAFKA_1;
|
||||
private static KafkaContainer KAFKA_2;
|
||||
|
||||
private static AdminClient ADMIN_CLIENT;
|
||||
|
||||
ReassignmentOperations ops = new ReassignmentOperations(ReactiveAdminClient.create(ADMIN_CLIENT).block());
|
||||
|
||||
@BeforeAll
|
||||
static void init() {
|
||||
ZK = new GenericContainer(
|
||||
DockerImageName.parse("confluentinc/cp-zookeeper").withTag(CONFLUENTINC_VERSION))
|
||||
.withEnv("ZOOKEEPER_CLIENT_PORT", "2181")
|
||||
.withNetworkAliases("zookeeper")
|
||||
.withNetwork(NETWORK);
|
||||
KAFKA_1 = new KafkaContainer(
|
||||
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENTINC_VERSION))
|
||||
.withNetwork(NETWORK)
|
||||
.withEnv("KAFKA_BROKER_ID", KAFKA_1_ID + "")
|
||||
.withExternalZookeeper("zookeeper:2181");
|
||||
KAFKA_2 = new KafkaContainer(
|
||||
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENTINC_VERSION))
|
||||
.withNetwork(NETWORK)
|
||||
.withEnv("KAFKA_BROKER_ID", KAFKA_2_ID + "")
|
||||
.withExternalZookeeper("zookeeper:2181");
|
||||
|
||||
ZK.start();
|
||||
KAFKA_1.start();
|
||||
KAFKA_2.start();
|
||||
|
||||
Properties p = new Properties();
|
||||
p.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_1.getBootstrapServers());
|
||||
ADMIN_CLIENT = AdminClient.create(p);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void tearDown() {
|
||||
ADMIN_CLIENT.close();
|
||||
KAFKA_1.stop();
|
||||
KAFKA_2.stop();
|
||||
ZK.stop();
|
||||
NETWORK.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGeneratePartitionReassignment() throws Exception {
|
||||
String testTopic1 = "test_" + UUID.randomUUID();
|
||||
String testTopic2 = "test_" + UUID.randomUUID();
|
||||
ADMIN_CLIENT.createTopics(
|
||||
List.of(
|
||||
new NewTopic(testTopic1, 5, (short) 1),
|
||||
new NewTopic(testTopic2, 3, (short) 2))
|
||||
).all().get();
|
||||
|
||||
Map<TopicPartition, List<Integer>> assignment =
|
||||
ops.generatePartitionReassignment(
|
||||
Set.of(testTopic1, testTopic2), List.of(KAFKA_1_ID, KAFKA_2_ID), false).block();
|
||||
|
||||
var perTopicAssignments = assignment.entrySet().stream()
|
||||
.collect(groupingBy(e -> e.getKey().topic(), toMap(Map.Entry::getKey, Map.Entry::getValue)));
|
||||
|
||||
verifyAssignment(testTopic1, 5, 1, perTopicAssignments.get(testTopic1));
|
||||
verifyAssignment(testTopic2, 3, 2, perTopicAssignments.get(testTopic2));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGetCurrentAssignment() throws Exception {
|
||||
String testTopic1 = "test_" + UUID.randomUUID();
|
||||
String testTopic2 = "test_" + UUID.randomUUID();
|
||||
ADMIN_CLIENT.createTopics(
|
||||
List.of(
|
||||
new NewTopic(testTopic1, 2, (short) 2),
|
||||
new NewTopic(testTopic2, 2, (short) 2))
|
||||
).all().get();
|
||||
|
||||
Map<TopicPartition, List<Integer>> currentAssignment =
|
||||
ops.getCurrentAssignment(Set.of(testTopic1, testTopic2)).block();
|
||||
|
||||
assertThat(currentAssignment.entrySet().stream())
|
||||
.hasSize(4)
|
||||
.allSatisfy(e -> {
|
||||
TopicPartition partition = e.getKey();
|
||||
List<Integer> replicas = e.getValue();
|
||||
assertThat(partition.topic()).isIn(List.of(testTopic1, testTopic2));
|
||||
assertThat(replicas).hasSize(2).containsExactlyInAnyOrder(KAFKA_1_ID, KAFKA_2_ID);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void testValidateAndExecute() throws Exception {
|
||||
String testTopic1 = "test_" + UUID.randomUUID();
|
||||
String testTopic2 = "test_" + UUID.randomUUID();
|
||||
ADMIN_CLIENT.createTopics(
|
||||
List.of(
|
||||
new NewTopic(testTopic1, 2, (short) 2),
|
||||
new NewTopic(testTopic2, 2, (short) 2))
|
||||
).all().get();
|
||||
|
||||
Map<TopicPartition, List<Integer>> currentAssignment =
|
||||
ops.getCurrentAssignment(Set.of(testTopic1, testTopic2)).block();
|
||||
|
||||
Map<TopicPartition, List<Integer>> desiredAssignment = currentAssignment.entrySet().stream()
|
||||
.map(e -> Tuples.of(e.getKey(), Lists.reverse(e.getValue()))) //reversing replicas list
|
||||
.collect(toMap(Tuple2::getT1, Tuple2::getT2));
|
||||
|
||||
ops.validateAndExecute(
|
||||
desiredAssignment.entrySet().stream().map(e -> Tuples.of(e.getKey(), e.getValue())).toList(), () -> {}).block();
|
||||
|
||||
Awaitility.await()
|
||||
.pollInSameThread()
|
||||
.atMost(Duration.ofSeconds(10))
|
||||
.untilAsserted(() -> {
|
||||
Map<TopicPartition, List<Integer>> actualAssignment =
|
||||
ops.getCurrentAssignment(Set.of(testTopic1, testTopic2)).block();
|
||||
|
||||
assertThat(actualAssignment).containsExactlyInAnyOrderEntriesOf(desiredAssignment);
|
||||
});
|
||||
}
|
||||
|
||||
//test case copied from https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala#L198
|
||||
@Test
|
||||
void testAssignReplicasToBrokersRackUnaware() {
|
||||
RandomGenerator rand = mock(RandomGenerator.class);
|
||||
when(rand.nextInt(anyInt())).thenReturn(0);
|
||||
|
||||
var assignment = ReassignmentOperations.assignReplicasToBrokersRackUnaware(
|
||||
"test",
|
||||
10,
|
||||
3,
|
||||
List.of(0, 1, 2, 3, 4),
|
||||
rand
|
||||
);
|
||||
assertThat(assignment)
|
||||
.containsExactlyInAnyOrderEntriesOf(
|
||||
Map.of(
|
||||
new TopicPartition("test", 0), List.of(0, 1, 2),
|
||||
new TopicPartition("test", 1), List.of(1, 2, 3),
|
||||
new TopicPartition("test", 2), List.of(2, 3, 4),
|
||||
new TopicPartition("test", 3), List.of(3, 4, 0),
|
||||
new TopicPartition("test", 4), List.of(4, 0, 1),
|
||||
new TopicPartition("test", 5), List.of(0, 2, 3),
|
||||
new TopicPartition("test", 6), List.of(1, 3, 4),
|
||||
new TopicPartition("test", 7), List.of(2, 4, 0),
|
||||
new TopicPartition("test", 8), List.of(3, 0, 1),
|
||||
new TopicPartition("test", 9), List.of(4, 1, 2)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@CsvSource({
|
||||
"10, 3, 1",
|
||||
"10, 3, 3",
|
||||
"1, 10, 1",
|
||||
"1, 10, 10",
|
||||
})
|
||||
void testAssignReplicasToBrokersRackUnawareWithRealRandom(int partitions, int brokersCnt, int replicationF) {
|
||||
var assignment = ReassignmentOperations.assignReplicasToBrokersRackUnaware(
|
||||
"test",
|
||||
partitions,
|
||||
replicationF,
|
||||
IntStream.range(0, brokersCnt).boxed().toList(),
|
||||
ThreadLocalRandom.current()
|
||||
);
|
||||
verifyAssignment("test", partitions, replicationF, assignment);
|
||||
}
|
||||
|
||||
private void verifyAssignment(String topic, int numParts, int replicationFactor,
|
||||
Map<TopicPartition, List<Integer>> assignmentToCheck) {
|
||||
assertThat(assignmentToCheck.keySet())
|
||||
.containsExactlyInAnyOrderElementsOf(
|
||||
IntStream.range(0, numParts).mapToObj(i -> new TopicPartition(topic, i)).toList());
|
||||
|
||||
assertThat(assignmentToCheck.values().stream())
|
||||
.allMatch(replicas ->
|
||||
replicas.stream().distinct().count() == replicas.size() && replicas.size() == replicationFactor);
|
||||
}
|
||||
|
||||
}
|
|
@ -1729,6 +1729,113 @@ paths:
|
|||
$ref: '#/components/schemas/PartitionsIncreaseResponse'
|
||||
404:
|
||||
description: Not found
|
||||
|
||||
/api/clusters/{clusterName}/partitionsreaassignments/suggestions:
|
||||
post:
|
||||
tags:
|
||||
- Topics
|
||||
operationId: generatePartitionAssignment
|
||||
parameters:
|
||||
- name: clusterName
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/GeneratePartitionsReassignmentCommand'
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/ReassignPartitionsCommand'
|
||||
|
||||
/api/clusters/{clusterName}/partitionsreaassignments/current:
|
||||
get:
|
||||
tags:
|
||||
- Topics
|
||||
operationId: getCurrentPartitionAssignment
|
||||
parameters:
|
||||
- name: clusterName
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
- name: topics
|
||||
required: true
|
||||
in: query
|
||||
description: topic names for which assignments should be returned
|
||||
schema:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/ReassignPartitionsCommand'
|
||||
|
||||
/api/clusters/{clusterName}/partitionsreaassignments/running:
|
||||
post:
|
||||
tags:
|
||||
- Topics
|
||||
operationId: executePartitionAssignment
|
||||
parameters:
|
||||
- name: clusterName
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/ReassignPartitionsCommand'
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
delete:
|
||||
tags:
|
||||
- Topics
|
||||
operationId: cancelRunningPartitionAssignment
|
||||
parameters:
|
||||
- name: clusterName
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/PartitionReassignmentCancellation'
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
get:
|
||||
tags:
|
||||
- Topics
|
||||
operationId: getInProgressAssignments
|
||||
parameters:
|
||||
- name: clusterName
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/InProgressReassignment'
|
||||
|
||||
/api/info/timestampformat:
|
||||
get:
|
||||
tags:
|
||||
|
@ -3149,3 +3256,95 @@ components:
|
|||
- COMPACT
|
||||
- COMPACT_DELETE
|
||||
- UNKNOWN
|
||||
|
||||
ReassignPartitionsCommand:
|
||||
type: object
|
||||
description: "NOTE! This format used by kafka-reassign-partitions.sh command-line utility and should not be changed"
|
||||
properties:
|
||||
version:
|
||||
type: integer
|
||||
format: int32
|
||||
partitions:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/PartitionReassignment"
|
||||
|
||||
PartitionReassignment:
|
||||
type: object
|
||||
description: "NOTE! This format used by kafka-reassign-partitions.sh command-line utility and should not be changed"
|
||||
properties:
|
||||
topic:
|
||||
type: string
|
||||
partition:
|
||||
type: integer
|
||||
replicas:
|
||||
type: array
|
||||
items:
|
||||
type: integer
|
||||
log_dirs:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
|
||||
GeneratePartitionsReassignmentCommand:
|
||||
type: object
|
||||
properties:
|
||||
version:
|
||||
type: integer
|
||||
format: int32
|
||||
broker_ids:
|
||||
type: array
|
||||
items:
|
||||
type: integer
|
||||
format: int32
|
||||
topics:
|
||||
type: array
|
||||
items:
|
||||
type: object
|
||||
properties:
|
||||
topic:
|
||||
type: string
|
||||
|
||||
InProgressReassignment:
|
||||
type: object
|
||||
properties:
|
||||
partitions:
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/InProgressPartitionReassignment"
|
||||
|
||||
InProgressPartitionReassignment:
|
||||
type: object
|
||||
properties:
|
||||
topic:
|
||||
type: string
|
||||
partition:
|
||||
type: integer
|
||||
format: int32
|
||||
currentReplicas:
|
||||
type: array
|
||||
items:
|
||||
type: integer
|
||||
format: int32
|
||||
addingReplicas:
|
||||
items:
|
||||
type: integer
|
||||
format: int32
|
||||
removingReplicas:
|
||||
items:
|
||||
type: integer
|
||||
format: int32
|
||||
|
||||
PartitionReassignmentCancellation:
|
||||
type: object
|
||||
properties:
|
||||
partitions:
|
||||
type: array
|
||||
items:
|
||||
type: object
|
||||
properties:
|
||||
topic:
|
||||
type: string
|
||||
partition:
|
||||
type: integer
|
||||
format: int32
|
||||
|
|
Loading…
Add table
Reference in a new issue