Compare commits


5 commits

Author SHA1 Message Date
iliax
c1e13ef0f8 endpoints improvement 2022-12-19 23:15:13 +04:00
Ilya Kuramshin
5255e05357
Merge branch 'master' into partitions_reassignment 2022-11-23 23:51:36 +04:00
iliax
4cf8f7a7db minot fix 2022-11-23 23:51:20 +04:00
iliax
14bd3086e9 Partitions-reassignment endpoints implemented 2022-11-23 23:33:26 +04:00
iliax
432c027c0b wip 2022-11-18 12:31:45 +04:00
7 changed files with 798 additions and 1 deletion

View file

@@ -4,10 +4,14 @@ import static java.util.stream.Collectors.toList;
import com.provectus.kafka.ui.api.TopicsApi;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.GeneratePartitionsReassignmentCommandDTO;
import com.provectus.kafka.ui.model.InProgressReassignmentDTO;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.InternalTopicConfig;
import com.provectus.kafka.ui.model.PartitionReassignmentCancellationDTO;
import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO;
import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
import com.provectus.kafka.ui.model.SortOrderDTO;
@@ -21,12 +25,16 @@ import com.provectus.kafka.ui.model.TopicUpdateDTO;
import com.provectus.kafka.ui.model.TopicsResponseDTO;
import com.provectus.kafka.ui.service.TopicsService;
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
import com.provectus.kafka.ui.service.reassign.ReassignmentService;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
@@ -43,6 +51,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
private final TopicsService topicsService;
private final TopicAnalysisService topicAnalysisService;
private final ReassignmentService reassignmentService;
private final ClusterMapper clusterMapper;
@Override
@@ -209,4 +218,57 @@ public class TopicsController extends AbstractController implements TopicsApi {
.orElseGet(() -> ResponseEntity.notFound().build())
);
}
@Override
public Mono<ResponseEntity<ReassignPartitionsCommandDTO>>
generatePartitionAssignment(String clusterName,
Mono<GeneratePartitionsReassignmentCommandDTO> reassignCmdDto,
ServerWebExchange exchange) {
return reassignCmdDto
.flatMap(generateDto ->
reassignmentService.generate(
getCluster(clusterName),
generateDto.getTopics().stream().map(t -> t.getTopic()).collect(Collectors.toSet()),
generateDto.getBrokerIds()))
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<ReassignPartitionsCommandDTO>> getCurrentPartitionAssignment(String clusterName,
List<String> topics,
ServerWebExchange exchange) {
return reassignmentService.getCurrentAssignment(getCluster(clusterName), new HashSet<>(topics))
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<Void>> executePartitionAssignment(String clusterName,
Mono<ReassignPartitionsCommandDTO> cmdDto,
ServerWebExchange exchange) {
return cmdDto
.flatMap(cmd -> reassignmentService.executeReassignment(getCluster(clusterName), cmd))
.thenReturn(ResponseEntity.ok().build());
}
@Override
public Mono<ResponseEntity<InProgressReassignmentDTO>> getInProgressAssignments(String clusterName,
ServerWebExchange exchange) {
return reassignmentService.getInProgressAssignments(getCluster(clusterName))
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<Void>> cancelRunningPartitionAssignment(String clusterName,
Mono<PartitionReassignmentCancellationDTO> cancelDto,
ServerWebExchange exchange) {
return cancelDto
.flatMap(dto ->
reassignmentService.cancelReassignment(
getCluster(clusterName),
dto.getPartitions().stream()
.map(p -> new TopicPartition(p.getTopic(), p.getPartition()))
.collect(Collectors.toSet())))
.thenReturn(ResponseEntity.ok().build());
}
}

View file

@@ -48,6 +48,7 @@ import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -357,6 +358,10 @@ public class ReactiveAdminClient implements Closeable {
return toMono(client.alterPartitionReassignments(reassignments).all());
}
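// Lists all partition reassignments that are currently in progress on the cluster.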
public Mono<Map<TopicPartition, PartitionReassignment>> listPartitionReassignments() {
return toMono(client.listPartitionReassignments().reassignments());
}
public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap) {
return toMono(client.createPartitions(newPartitionsMap).all());
}

View file

@@ -0,0 +1,188 @@
package com.provectus.kafka.ui.service.reassign;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.random.RandomGenerator;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@Slf4j
@RequiredArgsConstructor
class ReassignmentOperations {
public static final int SUPPORTED_CMD_VERSION = 1;
public static final String LOG_DIR_NOT_SET_STRING = "any";
private record BrokerMetadata(int id, Optional<String> rack) {}
private final ReactiveAdminClient adminClient;
Mono<Map<TopicPartition, List<Integer>>> generatePartitionReassignment(Set<String> topics,
List<Integer> brokerIds,
boolean rackAware) {
return Mono.zip(getCurrentAssignment(topics), getBrokersMetadata(brokerIds))
.map(t -> calculateAssignment(t.getT1(), t.getT2(), rackAware));
}
// [ topic partition -> list of replica broker ids ]
Mono<Map<TopicPartition, List<Integer>>> getCurrentAssignment(Set<String> topics) {
return adminClient.describeTopics(topics)
.map(topicToDescriptionMap -> topicToDescriptionMap.entrySet().stream()
.flatMap((Map.Entry<String, TopicDescription> e) ->
e.getValue().partitions().stream()
.map(p ->
Tuples.of(
new TopicPartition(e.getKey(), p.partition()),
p.replicas().stream().map(Node::id).toList())))
.collect(toMap(Tuple2::getT1, Tuple2::getT2)));
}
Mono<Void> validateAndExecute(List<Tuple2<TopicPartition, List<Integer>>> reassignment, Runnable preExecute) {
return validateCmd(reassignment)
.doOnNext(r -> preExecute.run())
.flatMap(adminClient::alterPartitionReassignments);
}
private Mono<Map<TopicPartition, Optional<NewPartitionReassignment>>> validateCmd(
List<Tuple2<TopicPartition, List<Integer>>> reassignment) {
if (reassignment.isEmpty()) {
throw new ValidationException("Partition reassignment list cannot be empty");
}
if (reassignment.stream().map(Tuple2::getT2).anyMatch(List::isEmpty)) {
throw new ValidationException("Partition replica list cannot be empty");
}
if (reassignment.stream().map(Tuple2::getT1).distinct().count() < reassignment.size()) {
throw new ValidationException("Partition reassignment contains duplicate topic partitions");
}
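// Structural checks passed; now verify every requested replica id against the brokers known to the cluster.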
return adminClient.describeCluster()
.doOnNext(description -> {
var knownIds = description.getNodes().stream().map(Node::id).toList();
var unknownIds = reassignment.stream()
.flatMap(t -> t.getT2().stream())
.filter(id -> !knownIds.contains(id))
.toList();
if (!unknownIds.isEmpty()) {
throw new ValidationException("Unknown broker ids: " + unknownIds);
}
})
.thenReturn(reassignment.stream()
.collect(toMap(Tuple2::getT1, t -> Optional.of(new NewPartitionReassignment(t.getT2())))));
}
private Mono<List<BrokerMetadata>> getBrokersMetadata(List<Integer> brokerIds) {
return adminClient.describeCluster()
.map(description -> description.getNodes().stream()
.filter(n -> brokerIds.contains(n.id()))
.map(n -> new BrokerMetadata(n.id(), Optional.ofNullable(n.rack())))
.toList());
}
private static Map<TopicPartition, List<Integer>> calculateAssignment(
Map<TopicPartition, List<Integer>> currentAssignments,
List<BrokerMetadata> brokerMetadata,
boolean rackAware) {
Map<String, Map<TopicPartition, List<Integer>>> perTopic = currentAssignments.entrySet().stream()
.collect(groupingBy(e -> e.getKey().topic(), toMap(Map.Entry::getKey, Map.Entry::getValue)));
return rackAware
? calculateAssignmentRackAware(perTopic, brokerMetadata)
: calculateAssignmentRackUnaware(perTopic, brokerMetadata);
}
private static Map<TopicPartition, List<Integer>> calculateAssignmentRackAware(
Map<String, Map<TopicPartition, List<Integer>>> currentAssignments,
List<BrokerMetadata> brokerMetadata) {
if (brokerMetadata.stream().anyMatch(m -> m.rack().isEmpty())) {
throw new ValidationException("Not all brokers have rack information, which is required for rack-aware assignment");
}
log.warn("Rack-aware assignment calculation is not implemented yet, falling back to rack-unaware calculation");
return calculateAssignmentRackUnaware(currentAssignments, brokerMetadata);
}
private static Map<TopicPartition, List<Integer>> calculateAssignmentRackUnaware(
Map<String, Map<TopicPartition, List<Integer>>> currentAssignments,
List<BrokerMetadata> brokerMetadata) {
Map<TopicPartition, List<Integer>> result = new LinkedHashMap<>();
currentAssignments.forEach((topic, currentAssignment) -> {
result.putAll(
assignReplicasToBrokersRackUnaware(
topic,
currentAssignment.size(), // number of partitions
currentAssignment.entrySet().iterator().next().getValue().size(), // replication factor, taken from the first partition
brokerMetadata.stream().map(BrokerMetadata::id).collect(toList()),
ThreadLocalRandom.current()
)
);
});
return result;
}
// implementation copied from https://github.com/apache/kafka/blob/1874f2388cffa7a1e866cbe4aff8b92c9d953b41/core/src/main/scala/kafka/admin/AdminUtils.scala#L125
@VisibleForTesting
static Map<TopicPartition, List<Integer>> assignReplicasToBrokersRackUnaware(
String topic,
int numPartitions,
int replicationFactor,
List<Integer> brokerList,
RandomGenerator rand) {
var result = new LinkedHashMap<TopicPartition, List<Integer>>();
int startIndex = rand.nextInt(brokerList.size());
int currentPartitionId = 0;
int nextReplicaShift = startIndex;
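// Round-robin placement, mirroring Kafka's AdminUtils: the first replica of partition p goes to
// broker (p + startIndex) % brokerCount; follower replicas are placed via replicaIndex(), and the
// shift grows each time the partition ids wrap around the broker list so follower sets don't repeat.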
for (int i = 0; i < numPartitions; i++) {
if (currentPartitionId > 0 && (currentPartitionId % brokerList.size() == 0)) {
nextReplicaShift += 1;
}
int firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size();
var replicaBuffer = Lists.newArrayList(brokerList.get(firstReplicaIndex));
for (int j = 0; j < replicationFactor - 1; j++) {
replicaBuffer.add(brokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size())));
}
result.put(new TopicPartition(topic, currentPartitionId), replicaBuffer);
currentPartitionId += 1;
}
return result;
}
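// Chooses the broker for a follower replica: the computed shift is always in [1, numBrokers - 1],
// so a follower can never collide with the partition's first replica.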
private static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int numBrokers) {
var shift = 1 + (secondReplicaShift + replicaIndex) % (numBrokers - 1);
return (firstReplicaIndex + shift) % numBrokers;
}
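// Cancels only reassignments that are still in progress (non-empty adding/removing replica sets);
// passing Optional.empty() to alterPartitionReassignments cancels the pending reassignment for a partition.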
Mono<Void> cancelReassignment(Set<TopicPartition> partitions) {
return adminClient.listPartitionReassignments()
.map(reassignments -> reassignments.entrySet().stream()
.filter(e -> partitions.contains(e.getKey()))
.filter(e -> {
PartitionReassignment reassignment = e.getValue();
return !reassignment.addingReplicas().isEmpty()
|| !reassignment.removingReplicas().isEmpty();
})
.map(Map.Entry::getKey)
.collect(Collectors.toSet()))
.flatMap(tps -> tps.isEmpty()
? Mono.empty()
: adminClient.alterPartitionReassignments(tps.stream().collect(toMap(p -> p, p -> Optional.empty()))));
}
}

View file

@@ -0,0 +1,118 @@
package com.provectus.kafka.ui.service.reassign;
import static com.provectus.kafka.ui.service.reassign.ReassignmentOperations.LOG_DIR_NOT_SET_STRING;
import static com.provectus.kafka.ui.service.reassign.ReassignmentOperations.SUPPORTED_CMD_VERSION;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.InProgressPartitionReassignmentDTO;
import com.provectus.kafka.ui.model.InProgressReassignmentDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.PartitionReassignmentDTO;
import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO;
import com.provectus.kafka.ui.service.AdminClientService;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@Slf4j
@Service
@RequiredArgsConstructor
public class ReassignmentService {
private final AdminClientService adminClientService;
public Mono<ReassignPartitionsCommandDTO> generate(KafkaCluster cluster,
Set<String> topics,
List<Integer> brokerIds) {
return adminClientService.get(cluster)
.map(ReassignmentOperations::new)
.flatMap(planner -> planner.generatePartitionReassignment(topics, brokerIds, false))
.map(ReassignmentService::mapToReassignmentDto);
}
public Mono<Void> executeReassignment(KafkaCluster cluster, ReassignPartitionsCommandDTO cmd) {
return adminClientService.get(cluster)
.map(ReassignmentOperations::new)
.flatMap(ops -> {
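// Only the placeholder log dir ("any") is accepted; moving replicas between log dirs is not supported.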
if (!cmd.getPartitions().stream()
.flatMap(p -> p.getLogDirs().stream())
.allMatch(logDir -> logDir.equalsIgnoreCase(LOG_DIR_NOT_SET_STRING))) {
return Mono.error(new ValidationException("Log dir altering is not supported"));
}
List<Tuple2<TopicPartition, List<Integer>>> reassignment = cmd.getPartitions().stream()
.map(p -> Tuples.of(new TopicPartition(p.getTopic(), p.getPartition()), p.getReplicas()))
.toList();
return ops.validateAndExecute(reassignment, () -> logRequestedAssignment(cmd));
});
}
@SneakyThrows
private void logRequestedAssignment(ReassignPartitionsCommandDTO cmd) {
log.info("Executing partitions reassignment: \n{}",
new JsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(cmd));
}
public Mono<Void> cancelReassignment(KafkaCluster cluster, Set<TopicPartition> partitions) {
return adminClientService.get(cluster)
.map(ReassignmentOperations::new)
.flatMap(ops -> ops.cancelReassignment(partitions));
}
public Mono<ReassignPartitionsCommandDTO> getCurrentAssignment(KafkaCluster cluster,
Set<String> topics) {
return adminClientService.get(cluster)
.map(ReassignmentOperations::new)
.flatMap(ops -> ops.getCurrentAssignment(topics))
.map(ReassignmentService::mapToReassignmentDto);
}
public Mono<InProgressReassignmentDTO> getInProgressAssignments(KafkaCluster cluster) {
return adminClientService.get(cluster)
.flatMap(ReactiveAdminClient::listPartitionReassignments)
.map(ReassignmentService::mapToInProgressReassignmentsDto);
}
private static InProgressReassignmentDTO mapToInProgressReassignmentsDto(
Map<TopicPartition, PartitionReassignment> reassignments) {
return new InProgressReassignmentDTO()
.partitions(
reassignments.entrySet().stream()
.map(e -> new InProgressPartitionReassignmentDTO()
.topic(e.getKey().topic())
.partition(e.getKey().partition())
.currentReplicas(e.getValue().replicas())
.addingReplicas(e.getValue().addingReplicas())
.removingReplicas(e.getValue().removingReplicas())
)
.toList()
);
}
private static ReassignPartitionsCommandDTO mapToReassignmentDto(
Map<TopicPartition, List<Integer>> assignment) {
return new ReassignPartitionsCommandDTO()
.version(SUPPORTED_CMD_VERSION)
.partitions(assignment.entrySet().stream().map(ReassignmentService::mapPartitionAssignmentDto).toList());
}
private static PartitionReassignmentDTO mapPartitionAssignmentDto(Map.Entry<TopicPartition, List<Integer>>
partitionsAssignment) {
return new PartitionReassignmentDTO()
.topic(partitionsAssignment.getKey().topic())
.partition(partitionsAssignment.getKey().partition())
.replicas(partitionsAssignment.getValue())
.logDirs(partitionsAssignment.getValue().stream().map(r -> LOG_DIR_NOT_SET_STRING).toList());
}
}
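For illustration, a minimal sketch (not part of this change) of how a caller could chain these service methods to move a topic onto a new broker set; the topic name "orders" and broker ids 1 and 2 are hypothetical:

// Sketch only: topic name and broker ids are illustrative values.
Mono<Void> moveTopic(ReassignmentService service, KafkaCluster cluster) {
return service
.generate(cluster, Set.of("orders"), List.of(1, 2)) // propose a new assignment for the topic
.flatMap(cmd -> service.executeReassignment(cluster, cmd)); // validate and apply it
}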

View file

@@ -19,6 +19,7 @@ import com.provectus.kafka.ui.model.SortOrderDTO;
import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
import com.provectus.kafka.ui.service.reassign.ReassignmentService;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@@ -43,7 +44,7 @@ class TopicsServicePaginationTest {
private final ClusterMapper clusterMapper = new ClusterMapperImpl();
private final TopicsController topicsController = new TopicsController(
topicsService, mock(TopicAnalysisService.class), clusterMapper);
topicsService, mock(TopicAnalysisService.class), mock(ReassignmentService.class), clusterMapper);
private void init(Map<String, InternalTopic> topicsInCache) {

View file

@@ -0,0 +1,224 @@
package com.provectus.kafka.ui.service.reassign;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.random.RandomGenerator;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
class ReassignmentOperationsTest {
private static final String CONFLUENTINC_VERSION = "7.3.0";
private static final int KAFKA_1_ID = 1;
private static final int KAFKA_2_ID = 2;
private static final Network NETWORK = Network.newNetwork();
private static GenericContainer<?> ZK;
private static KafkaContainer KAFKA_1;
private static KafkaContainer KAFKA_2;
private static AdminClient ADMIN_CLIENT;
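// Instance field initializers run when each test instance is created, i.e. after @BeforeAll,
// so ADMIN_CLIENT is already set at this point.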
ReassignmentOperations ops = new ReassignmentOperations(ReactiveAdminClient.create(ADMIN_CLIENT).block());
@BeforeAll
static void init() {
ZK = new GenericContainer(
DockerImageName.parse("confluentinc/cp-zookeeper").withTag(CONFLUENTINC_VERSION))
.withEnv("ZOOKEEPER_CLIENT_PORT", "2181")
.withNetworkAliases("zookeeper")
.withNetwork(NETWORK);
KAFKA_1 = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENTINC_VERSION))
.withNetwork(NETWORK)
.withEnv("KAFKA_BROKER_ID", KAFKA_1_ID + "")
.withExternalZookeeper("zookeeper:2181");
KAFKA_2 = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENTINC_VERSION))
.withNetwork(NETWORK)
.withEnv("KAFKA_BROKER_ID", KAFKA_2_ID + "")
.withExternalZookeeper("zookeeper:2181");
ZK.start();
KAFKA_1.start();
KAFKA_2.start();
Properties p = new Properties();
p.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_1.getBootstrapServers());
ADMIN_CLIENT = AdminClient.create(p);
}
@AfterAll
static void tearDown() {
ADMIN_CLIENT.close();
KAFKA_1.stop();
KAFKA_2.stop();
ZK.stop();
NETWORK.close();
}
@Test
void testGeneratePartitionReassignment() throws Exception {
String testTopic1 = "test_" + UUID.randomUUID();
String testTopic2 = "test_" + UUID.randomUUID();
ADMIN_CLIENT.createTopics(
List.of(
new NewTopic(testTopic1, 5, (short) 1),
new NewTopic(testTopic2, 3, (short) 2))
).all().get();
Map<TopicPartition, List<Integer>> assignment =
ops.generatePartitionReassignment(
Set.of(testTopic1, testTopic2), List.of(KAFKA_1_ID, KAFKA_2_ID), false).block();
var perTopicAssignments = assignment.entrySet().stream()
.collect(groupingBy(e -> e.getKey().topic(), toMap(Map.Entry::getKey, Map.Entry::getValue)));
verifyAssignment(testTopic1, 5, 1, perTopicAssignments.get(testTopic1));
verifyAssignment(testTopic2, 3, 2, perTopicAssignments.get(testTopic2));
}
@Test
void testGetCurrentAssignment() throws Exception {
String testTopic1 = "test_" + UUID.randomUUID();
String testTopic2 = "test_" + UUID.randomUUID();
ADMIN_CLIENT.createTopics(
List.of(
new NewTopic(testTopic1, 2, (short) 2),
new NewTopic(testTopic2, 2, (short) 2))
).all().get();
Map<TopicPartition, List<Integer>> currentAssignment =
ops.getCurrentAssignment(Set.of(testTopic1, testTopic2)).block();
assertThat(currentAssignment.entrySet().stream())
.hasSize(4)
.allSatisfy(e -> {
TopicPartition partition = e.getKey();
List<Integer> replicas = e.getValue();
assertThat(partition.topic()).isIn(List.of(testTopic1, testTopic2));
assertThat(replicas).hasSize(2).containsExactlyInAnyOrder(KAFKA_1_ID, KAFKA_2_ID);
});
}
@Test
void testValidateAndExecute() throws Exception {
String testTopic1 = "test_" + UUID.randomUUID();
String testTopic2 = "test_" + UUID.randomUUID();
ADMIN_CLIENT.createTopics(
List.of(
new NewTopic(testTopic1, 2, (short) 2),
new NewTopic(testTopic2, 2, (short) 2))
).all().get();
Map<TopicPartition, List<Integer>> currentAssignment =
ops.getCurrentAssignment(Set.of(testTopic1, testTopic2)).block();
Map<TopicPartition, List<Integer>> desiredAssignment = currentAssignment.entrySet().stream()
.map(e -> Tuples.of(e.getKey(), Lists.reverse(e.getValue()))) //reversing replicas list
.collect(toMap(Tuple2::getT1, Tuple2::getT2));
ops.validateAndExecute(
desiredAssignment.entrySet().stream().map(e -> Tuples.of(e.getKey(), e.getValue())).toList(), () -> {}).block();
Awaitility.await()
.pollInSameThread()
.atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
Map<TopicPartition, List<Integer>> actualAssignment =
ops.getCurrentAssignment(Set.of(testTopic1, testTopic2)).block();
assertThat(actualAssignment).containsExactlyInAnyOrderEntriesOf(desiredAssignment);
});
}
//test case copied from https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/core/src/test/scala/unit/kafka/admin/AdminRackAwareTest.scala#L198
@Test
void testAssignReplicasToBrokersRackUnaware() {
RandomGenerator rand = mock(RandomGenerator.class);
when(rand.nextInt(anyInt())).thenReturn(0);
var assignment = ReassignmentOperations.assignReplicasToBrokersRackUnaware(
"test",
10,
3,
List.of(0, 1, 2, 3, 4),
rand
);
assertThat(assignment)
.containsExactlyInAnyOrderEntriesOf(
Map.of(
new TopicPartition("test", 0), List.of(0, 1, 2),
new TopicPartition("test", 1), List.of(1, 2, 3),
new TopicPartition("test", 2), List.of(2, 3, 4),
new TopicPartition("test", 3), List.of(3, 4, 0),
new TopicPartition("test", 4), List.of(4, 0, 1),
new TopicPartition("test", 5), List.of(0, 2, 3),
new TopicPartition("test", 6), List.of(1, 3, 4),
new TopicPartition("test", 7), List.of(2, 4, 0),
new TopicPartition("test", 8), List.of(3, 0, 1),
new TopicPartition("test", 9), List.of(4, 1, 2)
)
);
}
@ParameterizedTest
@CsvSource({
"10, 3, 1",
"10, 3, 3",
"1, 10, 1",
"1, 10, 10",
})
void testAssignReplicasToBrokersRackUnawareWithRealRandom(int partitions, int brokersCnt, int replicationF) {
var assignment = ReassignmentOperations.assignReplicasToBrokersRackUnaware(
"test",
partitions,
replicationF,
IntStream.range(0, brokersCnt).boxed().toList(),
ThreadLocalRandom.current()
);
verifyAssignment("test", partitions, replicationF, assignment);
}
private void verifyAssignment(String topic, int numParts, int replicationFactor,
Map<TopicPartition, List<Integer>> assignmentToCheck) {
assertThat(assignmentToCheck.keySet())
.containsExactlyInAnyOrderElementsOf(
IntStream.range(0, numParts).mapToObj(i -> new TopicPartition(topic, i)).toList());
assertThat(assignmentToCheck.values().stream())
.allMatch(replicas ->
replicas.stream().distinct().count() == replicas.size() && replicas.size() == replicationFactor);
}
}

View file

@@ -1729,6 +1729,113 @@ paths:
$ref: '#/components/schemas/PartitionsIncreaseResponse'
404:
description: Not found
/api/clusters/{clusterName}/partitionsreassignments/suggestions:
post:
tags:
- Topics
operationId: generatePartitionAssignment
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/GeneratePartitionsReassignmentCommand'
responses:
200:
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/ReassignPartitionsCommand'
/api/clusters/{clusterName}/partitionsreassignments/current:
get:
tags:
- Topics
operationId: getCurrentPartitionAssignment
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: topics
required: true
in: query
description: topic names for which assignments should be returned
schema:
type: array
items:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/ReassignPartitionsCommand'
/api/clusters/{clusterName}/partitionsreassignments/running:
post:
tags:
- Topics
operationId: executePartitionAssignment
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/ReassignPartitionsCommand'
responses:
200:
description: OK
delete:
tags:
- Topics
operationId: cancelRunningPartitionAssignment
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/PartitionReassignmentCancellation'
responses:
200:
description: OK
get:
tags:
- Topics
operationId: getInProgressAssignments
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/InProgressReassignment'
/api/info/timestampformat:
get:
tags:
@@ -3149,3 +3256,95 @@ components:
- COMPACT
- COMPACT_DELETE
- UNKNOWN
ReassignPartitionsCommand:
type: object
description: "NOTE! This format is used by the kafka-reassign-partitions.sh command-line utility and should not be changed"
properties:
version:
type: integer
format: int32
partitions:
type: array
items:
$ref: "#/components/schemas/PartitionReassignment"
PartitionReassignment:
type: object
description: "NOTE! This format is used by the kafka-reassign-partitions.sh command-line utility and should not be changed"
properties:
topic:
type: string
partition:
type: integer
replicas:
type: array
items:
type: integer
log_dirs:
type: array
items:
type: string
GeneratePartitionsReassignmentCommand:
type: object
properties:
version:
type: integer
format: int32
broker_ids:
type: array
items:
type: integer
format: int32
topics:
type: array
items:
type: object
properties:
topic:
type: string
InProgressReassignment:
type: object
properties:
partitions:
type: array
items:
$ref: "#/components/schemas/InProgressPartitionReassignment"
InProgressPartitionReassignment:
type: object
properties:
topic:
type: string
partition:
type: integer
format: int32
currentReplicas:
type: array
items:
type: integer
format: int32
addingReplicas:
type: array
items:
type: integer
format: int32
removingReplicas:
type: array
items:
type: integer
format: int32
PartitionReassignmentCancellation:
type: object
properties:
partitions:
type: array
items:
type: object
properties:
topic:
type: string
partition:
type: integer
format: int32
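For reference, a request body matching the ReassignPartitionsCommand schema (the same JSON layout consumed by the kafka-reassign-partitions.sh tool) could look like the following; the topic name and broker ids are illustrative:

{
  "version": 1,
  "partitions": [
    { "topic": "orders", "partition": 0, "replicas": [1, 2], "log_dirs": ["any", "any"] },
    { "topic": "orders", "partition": 1, "replicas": [2, 1], "log_dirs": ["any", "any"] }
  ]
}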