Partitions-reassignment endpoints implemented

This commit is contained in:
iliax 2022-11-23 23:33:26 +04:00
parent 432c027c0b
commit 14bd3086e9
10 changed files with 544 additions and 254 deletions

View file

@ -8,6 +8,7 @@ import com.provectus.kafka.ui.model.GeneratePartitionsReassignmentCommandDTO;
import com.provectus.kafka.ui.model.InProgressReassignmentDTO;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.InternalTopicConfig;
import com.provectus.kafka.ui.model.PartitionReassignmentCancellationDTO;
import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO;
@ -32,6 +33,7 @@ import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
@ -218,10 +220,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
@Override
public Mono<ResponseEntity<ReassignPartitionsCommandDTO>> generatePartitionAssignment(String clusterName,
Mono<GeneratePartitionsReassignmentCommandDTO> generatePartitionsReassignmentCommandDTO,
ServerWebExchange exchange) {
return generatePartitionsReassignmentCommandDTO
public Mono<ResponseEntity<ReassignPartitionsCommandDTO>>
generatePartitionAssignment(String clusterName,
Mono<GeneratePartitionsReassignmentCommandDTO> reassignCmdDto,
ServerWebExchange exchange) {
return reassignCmdDto
.flatMap(generateDto ->
reassignmentService.generate(
getCluster(clusterName),
@ -232,21 +235,24 @@ public class TopicsController extends AbstractController implements TopicsApi {
@Override
public Mono<ResponseEntity<ReassignPartitionsCommandDTO>> getCurrentPartitionAssignment(String clusterName,
Mono<GeneratePartitionsReassignmentCommandDTO> generatePartitionsReassignmentCommandDTO,
Flux<String> topicsList,
ServerWebExchange exchange) {
return generatePartitionsReassignmentCommandDTO
.flatMap(generateDto ->
return topicsList
.collect(Collectors.toSet())
.flatMap(topics ->
reassignmentService.getCurrentAssignment(
getCluster(clusterName),
generateDto.getTopics().stream().map(t -> t.getTopic()).collect(Collectors.toSet())))
topics))
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<Void>> executePartitionAssignment(String clusterName,
Mono<ReassignPartitionsCommandDTO> reassignPartitionsCommandDTO,
Mono<ReassignPartitionsCommandDTO> cmdDto,
ServerWebExchange exchange) {
return null;
return cmdDto
.flatMap(cmd -> reassignmentService.executeReassignment(getCluster(clusterName), cmd))
.thenReturn(ResponseEntity.ok().build());
}
@Override
@ -255,4 +261,18 @@ public class TopicsController extends AbstractController implements TopicsApi {
return reassignmentService.getInProgressAssignments(getCluster(clusterName))
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<Void>> cancelPartitionAssignment(String clusterName,
Mono<PartitionReassignmentCancellationDTO> cancelDto,
ServerWebExchange exchange) {
return cancelDto
.flatMap(dto ->
reassignmentService.cancelReassignment(
getCluster(clusterName),
dto.getPartitions().stream()
.map(p -> new TopicPartition(p.getTopic(), p.getPartition()))
.collect(Collectors.toSet())))
.thenReturn(ResponseEntity.ok().build());
}
}

View file

@ -1,4 +0,0 @@
package com.provectus.kafka.ui.service.reassign;
public class ReassignementsStore {
}

View file

@ -0,0 +1,188 @@
package com.provectus.kafka.ui.service.reassign;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.random.RandomGenerator;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@Slf4j
@RequiredArgsConstructor
class ReassignmentOperations {
public static final int SUPPORTED_CMD_VERSION = 1;
public static final String LOG_DIR_NOT_SET_STRING = "any";
private record BrokerMetadata(int id, Optional<String> rack) {}
private final ReactiveAdminClient adminClient;
Mono<Map<TopicPartition, List<Integer>>> generatePartitionReassignment(Set<String> topics,
List<Integer> brokerIds,
boolean rackAware) {
return Mono.zip(getCurrentAssignment(topics), getBrokersMetadata(brokerIds))
.map(t -> calculateAssignment(t.getT1(), t.getT2(), rackAware));
}
// [ topic -> [partition -> list of replica ids] ]
Mono<Map<TopicPartition, List<Integer>>> getCurrentAssignment(Set<String> topics) {
return adminClient.describeTopics(topics)
.map(topicToDescriptionMap -> topicToDescriptionMap.entrySet().stream()
.flatMap((Map.Entry<String, TopicDescription> e) ->
e.getValue().partitions().stream()
.map(p ->
Tuples.of(
new TopicPartition(e.getKey(), p.partition()),
p.replicas().stream().map(Node::id).toList())))
.collect(toMap(Tuple2::getT1, Tuple2::getT2)));
}
Mono<Void> validateAndExecute(List<Tuple2<TopicPartition, List<Integer>>> reassignment, Runnable preExecute) {
return validateCmd(reassignment)
.doOnNext(r -> preExecute.run())
.flatMap(adminClient::alterPartitionReassignments);
}
private Mono<Map<TopicPartition, Optional<NewPartitionReassignment>>> validateCmd(
List<Tuple2<TopicPartition, List<Integer>>> reassignment) {
if (reassignment.isEmpty()) {
throw new ValidationException("Partition reassignment list cannot be empty");
}
if (reassignment.stream().map(Tuple2::getT2).anyMatch(List::isEmpty)) {
throw new ValidationException("Partition replica list cannot be empty");
}
if (reassignment.stream().map(Tuple2::getT1).distinct().count() < reassignment.size()) {
throw new ValidationException("Partition reassignment contains duplicate topic partitions");
}
return adminClient.describeCluster()
.doOnNext(description -> {
var knownIds = description.getNodes().stream().map(Node::id).toList();
var unknownIds = reassignment.stream()
.flatMap(t -> t.getT2().stream())
.filter(id -> !knownIds.contains(id))
.toList();
if (!unknownIds.isEmpty()) {
throw new ValidationException("Unknown broker ids: " + unknownIds);
}
})
.thenReturn(reassignment.stream()
.collect(toMap(Tuple2::getT1, t -> Optional.of(new NewPartitionReassignment(t.getT2())))));
}
private Mono<List<BrokerMetadata>> getBrokersMetadata(List<Integer> brokerIds) {
return adminClient.describeCluster()
.map(description -> description.getNodes().stream()
.filter(n -> brokerIds.contains(n.id()))
.map(n -> new BrokerMetadata(n.id(), Optional.ofNullable(n.rack())))
.toList());
}
private static Map<TopicPartition, List<Integer>> calculateAssignment(
Map<TopicPartition, List<Integer>> currentAssignments,
List<BrokerMetadata> brokerMetadata,
boolean rackAware) {
Map<String, Map<TopicPartition, List<Integer>>> perTopic = currentAssignments.entrySet().stream()
.collect(groupingBy(e -> e.getKey().topic(), toMap(Map.Entry::getKey, Map.Entry::getValue)));
return rackAware
? calculateAssignmentRackAware(perTopic, brokerMetadata)
: calculateAssignmentRackUnaware(perTopic, brokerMetadata);
}
private static Map<TopicPartition, List<Integer>> calculateAssignmentRackAware(
Map<String, Map<TopicPartition, List<Integer>>> currentAssignments,
List<BrokerMetadata> brokerMetadata) {
if (brokerMetadata.stream().anyMatch(m -> m.rack().isEmpty())) {
throw new ValidationException("Not all brokers have rack information for replica rack aware assignment");
}
log.warn("Rack-aware assignment calculation is not implemented yet, falling back to usual calculation");
return calculateAssignmentRackUnaware(currentAssignments, brokerMetadata);
}
private static Map<TopicPartition, List<Integer>> calculateAssignmentRackUnaware(
Map<String, Map<TopicPartition, List<Integer>>> currentAssignments,
List<BrokerMetadata> brokerMetadata) {
Map<TopicPartition, List<Integer>> result = new LinkedHashMap<>();
currentAssignments.forEach((topic, currentAssignment) -> {
result.putAll(
assignReplicasToBrokersRackUnaware(
topic,
currentAssignment.size(),
currentAssignment.entrySet().iterator().next().getValue().size(),
brokerMetadata.stream().map(BrokerMetadata::id).collect(toList()),
ThreadLocalRandom.current()
)
);
});
return result;
}
// implementation copied from https://github.com/apache/kafka/blob/1874f2388cffa7a1e866cbe4aff8b92c9d953b41/core/src/main/scala/kafka/admin/AdminUtils.scala#L125
@VisibleForTesting
static Map<TopicPartition, List<Integer>> assignReplicasToBrokersRackUnaware(
String topic,
int numPartitions,
int replicationFactor,
List<Integer> brokerList,
RandomGenerator rand) {
var result = new LinkedHashMap<TopicPartition, List<Integer>>();
int startIndex = rand.nextInt(brokerList.size());
int currentPartitionId = 0;
int nextReplicaShift = startIndex;
for (int i = 0; i < numPartitions; i++) {
if (currentPartitionId > 0 && (currentPartitionId % brokerList.size() == 0)) {
nextReplicaShift += 1;
}
int firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size();
var replicaBuffer = Lists.newArrayList(brokerList.get(firstReplicaIndex));
for (int j = 0; j < replicationFactor - 1; j++) {
replicaBuffer.add(brokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size())));
}
result.put(new TopicPartition(topic, currentPartitionId), replicaBuffer);
currentPartitionId += 1;
}
return result;
}
private static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int numBrokers) {
var shift = 1 + (secondReplicaShift + replicaIndex) % (numBrokers - 1);
return (firstReplicaIndex + shift) % numBrokers;
}
Mono<Void> cancelReassignment(Set<TopicPartition> partitions) {
return adminClient.listPartitionReassignments()
.map(reassignments -> reassignments.entrySet().stream()
.filter(e -> partitions.contains(e.getKey()))
.filter(e -> {
PartitionReassignment reassignment = e.getValue();
return !reassignment.addingReplicas().isEmpty()
|| !reassignment.removingReplicas().isEmpty();
})
.map(Map.Entry::getKey)
.collect(Collectors.toSet()))
.flatMap(tps -> tps.isEmpty()
? Mono.empty()
: adminClient.alterPartitionReassignments(tps.stream().collect(toMap(p -> p, p -> Optional.empty()))));
}
}

View file

@ -1,11 +0,0 @@
package com.provectus.kafka.ui.service.reassign;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;
public record ReassignmentPlan(Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
}

View file

@ -1,151 +0,0 @@
package com.provectus.kafka.ui.service.reassign;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.PartitionReassignmentDTO;
import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.random.RandomGenerator;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@Slf4j
@RequiredArgsConstructor
public class ReassignmentPlanner {
record BrokerMetadata(int id, Optional<String> rack) {
}
private final ReactiveAdminClient adminClient;
public Mono<ReassignPartitionsCommandDTO> generatePartitionReassignment(Set<String> topics,
List<Integer> brokerIds,
boolean rackAware) {
return Mono.zip(currentAssignments(adminClient, topics), brokerMetadata(brokerIds)).map(t ->
createSuggestedReassignment(
calculateAssignment(t.getT1(), t.getT2(), rackAware)));
}
private static ReassignPartitionsCommandDTO createSuggestedReassignment(
Map<TopicPartition, List<Integer>> assignment) {
var dto = new ReassignPartitionsCommandDTO().version(1);
assignment.forEach((tp, replicas) ->
dto.addPartitionsItem(
new PartitionReassignmentDTO()
.topic(tp.topic())
.partition(tp.partition())
.replicas(replicas)
.logDirs(replicas.stream().map(r -> "any").toList())));
return dto;
}
// [ topic -> [tp -> list of replicas] ]
public static Mono<Map<String, Map<TopicPartition, List<Integer>>>> currentAssignments(ReactiveAdminClient ac, Set<String> topics) {
return ac.describeTopics(topics)
.map(topicToDescriptionMap ->
topicToDescriptionMap.entrySet().stream()
.map(e ->
Tuples.of(
e.getKey(),
e.getValue().partitions().stream()
.map(p ->
Tuples.of(
new TopicPartition(e.getKey(), p.partition()),
p.replicas().stream().map(Node::id).toList()
)).collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2))
))
.collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2))
);
}
private Mono<List<BrokerMetadata>> brokerMetadata(List<Integer> brokerIds) {
return adminClient.describeCluster()
.map(description -> description.getNodes().stream()
.filter(n -> brokerIds.contains(n.id()))
.map(n -> new BrokerMetadata(n.id(), Optional.ofNullable(n.rack())))
.toList());
}
@VisibleForTesting
static Map<TopicPartition, List<Integer>> calculateAssignment(
Map<String, Map<TopicPartition, List<Integer>>> currentAssignments,
List<BrokerMetadata> brokerMetadata,
boolean rackAware) {
if (rackAware && brokerMetadata.stream().anyMatch(m -> m.rack().isEmpty())) {
throw new ValidationException("Not all brokers have rack information for replica rack aware assignment");
}
return rackAware
? calculateAssignmentRackAware(currentAssignments, brokerMetadata)
: calculateAssignmentRackUnaware(currentAssignments, brokerMetadata);
}
private static Map<TopicPartition, List<Integer>> calculateAssignmentRackAware(
Map<String, Map<TopicPartition, List<Integer>>> currentAssignments,
List<BrokerMetadata> brokerMetadata) {
log.warn("Rack-aware assignment calculation is not implemented yet, falling back to usual calculation");
return calculateAssignmentRackUnaware(currentAssignments, brokerMetadata);
}
private static Map<TopicPartition, List<Integer>> calculateAssignmentRackUnaware(
Map<String, Map<TopicPartition, List<Integer>>> currentAssignments,
List<BrokerMetadata> brokerMetadata) {
Map<TopicPartition, List<Integer>> result = new LinkedHashMap<>();
currentAssignments.forEach((topic, currentAssignment) -> {
result.putAll(
assignReplicasToBrokersRackUnaware(
topic,
currentAssignment.size(),
currentAssignment.entrySet().iterator().next().getValue().size(),
brokerMetadata.stream().map(BrokerMetadata::id).collect(Collectors.toList()),
ThreadLocalRandom.current()
)
);
});
return result;
}
static Map<TopicPartition, List<Integer>> assignReplicasToBrokersRackUnaware(
String topic,
int nPartitions,
int replicationFactor,
List<Integer> brokerList,
RandomGenerator rand) {
var result = new LinkedHashMap<TopicPartition, List<Integer>>();
int startIndex = rand.nextInt(brokerList.size());
int currentPartitionId = 0;
int nextReplicaShift = rand.nextInt(brokerList.size());
for (int i = 0; i < nPartitions; i++) {
if (currentPartitionId > 0 && (currentPartitionId % brokerList.size() == 0)) {
nextReplicaShift += 1;
}
int firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size();
var replicaBuffer = Lists.newArrayList(brokerList.get(firstReplicaIndex));
for (int j = 0; j < replicationFactor - 1; j++) {
replicaBuffer.add(brokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size())));
}
result.put(new TopicPartition(topic, currentPartitionId), replicaBuffer);
currentPartitionId += 1;
}
return result;
}
private static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int nBrokers) {
var shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1);
return (firstReplicaIndex + shift) % nBrokers;
}
}

View file

@ -1,5 +1,10 @@
package com.provectus.kafka.ui.service.reassign;
import static com.provectus.kafka.ui.service.reassign.ReassignmentOperations.LOG_DIR_NOT_SET_STRING;
import static com.provectus.kafka.ui.service.reassign.ReassignmentOperations.SUPPORTED_CMD_VERSION;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.InProgressPartitionReassignmentDTO;
import com.provectus.kafka.ui.model.InProgressReassignmentDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
@ -11,11 +16,16 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@Slf4j
@Service
@RequiredArgsConstructor
public class ReassignmentService {
@ -26,25 +36,55 @@ public class ReassignmentService {
Set<String> topics,
List<Integer> brokerIds) {
return adminClientService.get(cluster)
.map(ReassignmentPlanner::new)
.flatMap(planner -> planner.generatePartitionReassignment(topics, brokerIds, false));
.map(ReassignmentOperations::new)
.flatMap(planner -> planner.generatePartitionReassignment(topics, brokerIds, false))
.map(ReassignmentService::mapToReassignmentDto);
}
public Mono<Void> executeReassignment(KafkaCluster cluster, ReassignPartitionsCommandDTO cmd) {
return adminClientService.get(cluster)
.map(ReassignmentOperations::new)
.flatMap(ops -> {
if (!cmd.getPartitions().stream()
.flatMap(p -> p.getLogDirs().stream())
.allMatch(logDir -> logDir.equalsIgnoreCase(LOG_DIR_NOT_SET_STRING))) {
return Mono.error(new ValidationException("Log dir altering is not supported"));
}
List<Tuple2<TopicPartition, List<Integer>>> reassignment = cmd.getPartitions().stream()
.map(p -> Tuples.of(new TopicPartition(p.getTopic(), p.getPartition()), p.getReplicas()))
.toList();
return ops.validateAndExecute(reassignment, () -> logRequestedAssignment(cmd));
});
}
@SneakyThrows
private void logRequestedAssignment(ReassignPartitionsCommandDTO cmd) {
log.info("Executing partitions reassignment: \n{}",
new JsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(cmd));
}
public Mono<Void> cancelReassignment(KafkaCluster cluster, Set<TopicPartition> partitions) {
return adminClientService.get(cluster)
.map(ReassignmentOperations::new)
.flatMap(ops -> ops.cancelReassignment(partitions));
}
public Mono<ReassignPartitionsCommandDTO> getCurrentAssignment(KafkaCluster cluster,
Set<String> topics) {
return adminClientService.get(cluster)
.flatMap(ac -> ReassignmentPlanner.currentAssignments(ac, topics))
.map(this::map);
.map(ReassignmentOperations::new)
.flatMap(ops -> ops.getCurrentAssignment(topics))
.map(ReassignmentService::mapToReassignmentDto);
}
public Mono<InProgressReassignmentDTO> getInProgressAssignments(KafkaCluster cluster) {
return adminClientService.get(cluster)
.flatMap(ReactiveAdminClient::listPartitionReassignments)
.map(this::mapInProgressReassignments);
.map(ReassignmentService::mapToInProgressReassignmentsDto);
}
private InProgressReassignmentDTO mapInProgressReassignments(Map<TopicPartition, PartitionReassignment> reassignments) {
private static InProgressReassignmentDTO mapToInProgressReassignmentsDto(
Map<TopicPartition, PartitionReassignment> reassignments) {
return new InProgressReassignmentDTO()
.partitions(
reassignments.entrySet().stream()
@ -59,20 +99,20 @@ public class ReassignmentService {
);
}
private ReassignPartitionsCommandDTO map(Map<String, Map<TopicPartition, List<Integer>>> assignment) {
return new ReassignPartitionsCommandDTO()
.version(1)
.partitions(
assignment.values().stream()
.flatMap(m -> m.entrySet().stream())
.map(p -> new PartitionReassignmentDTO()
.topic(p.getKey().topic())
.partition(p.getKey().partition())
.replicas(p.getValue())
.logDirs(p.getValue().stream().map(r -> "any").toList())
)
.toList()
);
private static ReassignPartitionsCommandDTO mapToReassignmentDto(
Map<TopicPartition, List<Integer>> assignment) {
return new ReassignPartitionsCommandDTO()
.version(SUPPORTED_CMD_VERSION)
.partitions(assignment.entrySet().stream().map(ReassignmentService::mapPartitionAssignmentDto).toList());
}
private static PartitionReassignmentDTO mapPartitionAssignmentDto(Map.Entry<TopicPartition, List<Integer>>
partitionsAssignment) {
return new PartitionReassignmentDTO()
.topic(partitionsAssignment.getKey().topic())
.partition(partitionsAssignment.getKey().partition())
.replicas(partitionsAssignment.getValue())
.logDirs(partitionsAssignment.getValue().stream().map(r -> LOG_DIR_NOT_SET_STRING).toList());
}
}

View file

@ -19,6 +19,7 @@ import com.provectus.kafka.ui.model.SortOrderDTO;
import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
import com.provectus.kafka.ui.service.reassign.ReassignmentService;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@ -43,7 +44,7 @@ class TopicsServicePaginationTest {
private final ClusterMapper clusterMapper = new ClusterMapperImpl();
private final TopicsController topicsController = new TopicsController(
topicsService, mock(TopicAnalysisService.class), clusterMapper);
topicsService, mock(TopicAnalysisService.class), mock(ReassignmentService.class), clusterMapper);
private void init(Map<String, InternalTopic> topicsInCache) {

View file

@ -1,55 +0,0 @@
package com.provectus.kafka.ui.service.reassign;
import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO;
import java.util.List;
import org.junit.jupiter.api.Test;
class ReassignmentJsonDtoTest {
@Test
void canBeCreatedFromJsonString() {
var parsed = ReassignmentJsonDto.fromJson(
"{" +
" \"version\": 1, " +
" \"partitions\":" +
" [" +
" {" +
" \"topic\": \"my-topic\"," +
" \"partition\": 0, " +
" \"replicas\":" +
" [ " +
" 0, " +
" 1, " +
" 2 " +
" ], " +
" \"log_dirs\": " +
" [ " +
" \"any\", " +
" \"/user/share/kafka/p0\"," +
" \"any\"" +
" ]" +
" }" +
" ]" +
"}"
);
assertThat(parsed).isEqualTo(
ReassignPartitionsCommandDTO.builder()
.version(1)
.partitions(
List.of(
ReassignmentJsonDto.PartitionAssignmentDto.builder()
.topic("my-topic")
.partition(0)
.replicas(List.of(0, 1, 2))
.logDirs(List.of("any", "/user/share/kafka/p0", "any"))
.build()
)
)
.build()
);
}
}

View file

@ -0,0 +1,224 @@
package com.provectus.kafka.ui.service.reassign;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.random.RandomGenerator;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
class ReassignmentOperationsTest {
private static final String CONFLUENTINC_VERSION = "7.3.0";
private static final int KAFKA_1_ID = 1;
private static final int KAFKA_2_ID = 2;
private static final Network NETWORK = Network.newNetwork();
private static GenericContainer<?> ZK;
private static KafkaContainer KAFKA_1;
private static KafkaContainer KAFKA_2;
private static AdminClient ADMIN_CLIENT;
ReassignmentOperations ops = new ReassignmentOperations(ReactiveAdminClient.create(ADMIN_CLIENT).block());
@BeforeAll
static void init() {
ZK = new GenericContainer(
DockerImageName.parse("confluentinc/cp-zookeeper").withTag(CONFLUENTINC_VERSION))
.withEnv("ZOOKEEPER_CLIENT_PORT", "2181")
.withNetworkAliases("zookeeper")
.withNetwork(NETWORK);
KAFKA_1 = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENTINC_VERSION))
.withNetwork(NETWORK)
.withEnv("KAFKA_BROKER_ID", KAFKA_1_ID + "")
.withExternalZookeeper("zookeeper:2181");
KAFKA_2 = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENTINC_VERSION))
.withNetwork(NETWORK)
.withEnv("KAFKA_BROKER_ID", KAFKA_2_ID + "")
.withExternalZookeeper("zookeeper:2181");
ZK.start();
KAFKA_1.start();
KAFKA_2.start();
Properties p = new Properties();
p.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_1.getBootstrapServers());
ADMIN_CLIENT = AdminClient.create(p);
}
@AfterAll
static void tearDown() {
ADMIN_CLIENT.close();
KAFKA_1.stop();
KAFKA_2.stop();
ZK.stop();
NETWORK.close();
}
@Test
void testGeneratePartitionReassignment() throws Exception {
String testTopic1 = "test_" + UUID.randomUUID();
String testTopic2 = "test_" + UUID.randomUUID();
ADMIN_CLIENT.createTopics(
List.of(
new NewTopic(testTopic1, 5, (short) 1),
new NewTopic(testTopic2, 3, (short) 2))
).all().get();
Map<TopicPartition, List<Integer>> assignment =
ops.generatePartitionReassignment(
Set.of(testTopic1, testTopic2), List.of(KAFKA_1_ID, KAFKA_2_ID), false).block();
var perTopicAssignments = assignment.entrySet().stream()
.collect(groupingBy(e -> e.getKey().topic(), toMap(Map.Entry::getKey, Map.Entry::getValue)));
verifyAssignment(testTopic1, 5, 1, perTopicAssignments.get(testTopic1));
verifyAssignment(testTopic2, 3, 2, perTopicAssignments.get(testTopic2));
}
@Test
void testGetCurrentAssignment() throws Exception {
String testTopic1 = "test_" + UUID.randomUUID();
String testTopic2 = "test_" + UUID.randomUUID();
ADMIN_CLIENT.createTopics(
List.of(
new NewTopic(testTopic1, 2, (short) 2),
new NewTopic(testTopic2, 2, (short) 2))
).all().get();
Map<TopicPartition, List<Integer>> currentAssignment =
ops.getCurrentAssignment(Set.of(testTopic1, testTopic2)).block();
assertThat(currentAssignment.entrySet().stream())
.hasSize(4)
.allSatisfy(e -> {
TopicPartition partition = e.getKey();
List<Integer> replicas = e.getValue();
assertThat(partition.topic()).isIn(List.of(testTopic1, testTopic2));
assertThat(replicas).hasSize(2).containsExactlyInAnyOrder(KAFKA_1_ID, KAFKA_2_ID);
});
}
@Test
void testValidateAndExecute() throws Exception {
String testTopic1 = "test_" + UUID.randomUUID();
String testTopic2 = "test_" + UUID.randomUUID();
ADMIN_CLIENT.createTopics(
List.of(
new NewTopic(testTopic1, 2, (short) 2),
new NewTopic(testTopic2, 2, (short) 2))
).all().get();
Map<TopicPartition, List<Integer>> currentAssignment =
ops.getCurrentAssignment(Set.of(testTopic1, testTopic2)).block();
Map<TopicPartition, List<Integer>> desiredAssignment = currentAssignment.entrySet().stream()
.map(e -> Tuples.of(e.getKey(), Lists.reverse(e.getValue()))) //reversing replicas list
.collect(toMap(Tuple2::getT1, Tuple2::getT2));
ops.validateAndExecute(
desiredAssignment.entrySet().stream().map(e -> Tuples.of(e.getKey(), e.getValue())).toList(), () -> {}).block();
Awaitility.await()
.pollInSameThread()
.atMost(Duration.ofSeconds(10))
.until(() -> ADMIN_CLIENT.listPartitionReassignments().reassignments().get().isEmpty());
Map<TopicPartition, List<Integer>> actualAssignment =
ops.getCurrentAssignment(Set.of(testTopic1, testTopic2)).block();
assertThat(actualAssignment).containsExactlyInAnyOrderEntriesOf(desiredAssignment);
}
//test case copied from https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala#L198
@Test
void testAssignReplicasToBrokersRackUnaware() {
RandomGenerator rand = mock(RandomGenerator.class);
when(rand.nextInt(anyInt())).thenReturn(0);
var assignment = ReassignmentOperations.assignReplicasToBrokersRackUnaware(
"test",
10,
3,
List.of(0, 1, 2, 3, 4),
rand
);
assertThat(assignment)
.containsExactlyInAnyOrderEntriesOf(
Map.of(
new TopicPartition("test", 0), List.of(0, 1, 2),
new TopicPartition("test", 1), List.of(1, 2, 3),
new TopicPartition("test", 2), List.of(2, 3, 4),
new TopicPartition("test", 3), List.of(3, 4, 0),
new TopicPartition("test", 4), List.of(4, 0, 1),
new TopicPartition("test", 5), List.of(0, 2, 3),
new TopicPartition("test", 6), List.of(1, 3, 4),
new TopicPartition("test", 7), List.of(2, 4, 0),
new TopicPartition("test", 8), List.of(3, 0, 1),
new TopicPartition("test", 9), List.of(4, 1, 2)
)
);
}
@ParameterizedTest
@CsvSource({
"10, 3, 1",
"10, 3, 3",
"1, 10, 1",
"1, 10, 10",
})
void testAssignReplicasToBrokersRackUnawareWithRealRandom(int partitions, int brokersCnt, int replicationF) {
var assignment = ReassignmentOperations.assignReplicasToBrokersRackUnaware(
"test",
partitions,
replicationF,
IntStream.range(0, brokersCnt).boxed().toList(),
ThreadLocalRandom.current()
);
verifyAssignment("test", partitions, replicationF, assignment);
}
private void verifyAssignment(String topic, int numParts, int replicationFactor,
Map<TopicPartition, List<Integer>> assignmentToCheck) {
assertThat(assignmentToCheck.keySet())
.containsExactlyInAnyOrderElementsOf(
IntStream.range(0, numParts).mapToObj(i -> new TopicPartition(topic, i)).toList());
assertThat(assignmentToCheck.values().stream())
.allMatch(replicas ->
replicas.stream().distinct().count() == replicas.size() && replicas.size() == replicationFactor);
}
}

View file

@ -1764,12 +1764,14 @@ paths:
in: path
required: true
schema:
type: string
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/GeneratePartitionsReassignmentCommand'
type: array
items:
type: string
responses:
200:
description: OK
@ -1798,6 +1800,26 @@ paths:
200:
description: OK
/api/clusters/{clusterName}/partitionsreaassignments/cancel:
delete:
tags:
- Topics
operationId: cancelPartitionAssignment
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/PartitionReassignmentCancellation'
responses:
200:
description: OK
/api/clusters/{clusterName}/partitionsreaassignments/inprogress:
get:
tags:
@ -3240,6 +3262,7 @@ components:
ReassignPartitionsCommand:
type: object
description: "NOTE! This format used by kafka-reassign-partitions.sh command-line utility and should not be changed"
properties:
version:
type: integer
@ -3251,6 +3274,7 @@ components:
PartitionReassignment:
type: object
description: "NOTE! This format used by kafka-reassign-partitions.sh command-line utility and should not be changed"
properties:
topic:
type: string
@ -3313,3 +3337,17 @@ components:
items:
type: integer
format: int32
PartitionReassignmentCancellation:
type: object
properties:
partitions:
type: array
items:
type: object
properties:
topic:
type: string
partition:
type: integer
format: int32