iliax 2022-11-18 12:31:45 +04:00
parent 7fcbf7507b
commit 432c027c0b
8 changed files with 514 additions and 0 deletions

View file

@@ -4,10 +4,13 @@ import static java.util.stream.Collectors.toList;
import com.provectus.kafka.ui.api.TopicsApi;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.GeneratePartitionsReassignmentCommandDTO;
import com.provectus.kafka.ui.model.InProgressReassignmentDTO;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.InternalTopicConfig;
import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO;
import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
import com.provectus.kafka.ui.model.SortOrderDTO;
@@ -21,8 +24,10 @@ import com.provectus.kafka.ui.model.TopicUpdateDTO;
import com.provectus.kafka.ui.model.TopicsResponseDTO;
import com.provectus.kafka.ui.service.TopicsService;
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
import com.provectus.kafka.ui.service.reassign.ReassignmentService;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -43,6 +48,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
  private final TopicsService topicsService;
  private final TopicAnalysisService topicAnalysisService;
  private final ReassignmentService reassignmentService;
  private final ClusterMapper clusterMapper;

  @Override
@@ -209,4 +215,44 @@ public class TopicsController extends AbstractController implements TopicsApi {
        .orElseGet(() -> ResponseEntity.notFound().build())
    );
  }

  @Override
  public Mono<ResponseEntity<ReassignPartitionsCommandDTO>> generatePartitionAssignment(String clusterName,
      Mono<GeneratePartitionsReassignmentCommandDTO> generatePartitionsReassignmentCommandDTO,
      ServerWebExchange exchange) {
    return generatePartitionsReassignmentCommandDTO
        .flatMap(generateDto ->
            reassignmentService.generate(
                getCluster(clusterName),
                generateDto.getTopics().stream().map(t -> t.getTopic()).collect(Collectors.toSet()),
                generateDto.getBrokerIds()))
        .map(ResponseEntity::ok);
  }

  @Override
  public Mono<ResponseEntity<ReassignPartitionsCommandDTO>> getCurrentPartitionAssignment(String clusterName,
      Mono<GeneratePartitionsReassignmentCommandDTO> generatePartitionsReassignmentCommandDTO,
      ServerWebExchange exchange) {
    return generatePartitionsReassignmentCommandDTO
        .flatMap(generateDto ->
            reassignmentService.getCurrentAssignment(
                getCluster(clusterName),
                generateDto.getTopics().stream().map(t -> t.getTopic()).collect(Collectors.toSet())))
        .map(ResponseEntity::ok);
  }
  @Override
  public Mono<ResponseEntity<Void>> executePartitionAssignment(String clusterName,
      Mono<ReassignPartitionsCommandDTO> reassignPartitionsCommandDTO,
      ServerWebExchange exchange) {
    // Execution is not implemented in this commit; fail fast instead of returning null,
    // which would violate the reactive contract.
    return Mono.error(new UnsupportedOperationException("Partition reassignment execution is not implemented yet"));
  }
  @Override
  public Mono<ResponseEntity<InProgressReassignmentDTO>> getInProgressAssignments(String clusterName,
      ServerWebExchange exchange) {
    return reassignmentService.getInProgressAssignments(getCluster(clusterName))
        .map(ResponseEntity::ok);
  }
}
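For context, a hedged sketch of exercising the new generate endpoint with Spring's WebTestClient. The test scaffolding (a webTestClient bound to the running application) and the cluster name "local" are assumptions, not part of this commit:

// Illustrative only: post a GeneratePartitionsReassignmentCommand body and expect
// the suggested reassignment back. `webTestClient` is an assumed WebTestClient.
webTestClient.post()
    .uri("/api/clusters/{clusterName}/partitionsreassignments/generate", "local")
    .header("Content-Type", "application/json")
    .bodyValue("{\"version\":1,\"broker_ids\":[1,2,3],\"topics\":[{\"topic\":\"my-topic\"}]}")
    .exchange()
    .expectStatus().isOk()
    .expectBody(ReassignPartitionsCommandDTO.class);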

View file

@@ -48,6 +48,7 @@ import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -357,6 +358,10 @@ public class ReactiveAdminClient implements Closeable {
    return toMono(client.alterPartitionReassignments(reassignments).all());
  }

  public Mono<Map<TopicPartition, PartitionReassignment>> listPartitionReassignments() {
    return toMono(client.listPartitionReassignments().reassignments());
  }

  public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap) {
    return toMono(client.createPartitions(newPartitionsMap).all());
  }
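The new wrapper simply lifts AdminClient#listPartitionReassignments into a Mono. A minimal illustrative caller, assuming an existing ReactiveAdminClient instance named adminClient:

// Illustrative only: print every partition that currently has an active reassignment.
adminClient.listPartitionReassignments()
    .doOnNext(reassignments -> reassignments.forEach((tp, r) ->
        System.out.printf("%s adding=%s removing=%s%n",
            tp, r.addingReplicas(), r.removingReplicas())))
    .subscribe();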

View file

@@ -0,0 +1,4 @@
package com.provectus.kafka.ui.service.reassign;

public class ReassignmentsStore {
}

View file

@@ -0,0 +1,11 @@
package com.provectus.kafka.ui.service.reassign;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;
public record ReassignmentPlan(Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
}

View file

@@ -0,0 +1,151 @@
package com.provectus.kafka.ui.service.reassign;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.PartitionReassignmentDTO;
import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.random.RandomGenerator;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@Slf4j
@RequiredArgsConstructor
public class ReassignmentPlanner {
  record BrokerMetadata(int id, Optional<String> rack) {
  }

  private final ReactiveAdminClient adminClient;

  public Mono<ReassignPartitionsCommandDTO> generatePartitionReassignment(Set<String> topics,
                                                                          List<Integer> brokerIds,
                                                                          boolean rackAware) {
    return Mono.zip(currentAssignments(adminClient, topics), brokerMetadata(brokerIds)).map(t ->
        createSuggestedReassignment(
            calculateAssignment(t.getT1(), t.getT2(), rackAware)));
  }

  private static ReassignPartitionsCommandDTO createSuggestedReassignment(
      Map<TopicPartition, List<Integer>> assignment) {
    var dto = new ReassignPartitionsCommandDTO().version(1);
    assignment.forEach((tp, replicas) ->
        dto.addPartitionsItem(
            new PartitionReassignmentDTO()
                .topic(tp.topic())
                .partition(tp.partition())
                .replicas(replicas)
                .logDirs(replicas.stream().map(r -> "any").toList())));
    return dto;
  }

  // [ topic -> [tp -> list of replicas] ]
  public static Mono<Map<String, Map<TopicPartition, List<Integer>>>> currentAssignments(
      ReactiveAdminClient ac, Set<String> topics) {
    return ac.describeTopics(topics)
        .map(topicToDescriptionMap ->
            topicToDescriptionMap.entrySet().stream()
                .map(e ->
                    Tuples.of(
                        e.getKey(),
                        e.getValue().partitions().stream()
                            .map(p ->
                                Tuples.of(
                                    new TopicPartition(e.getKey(), p.partition()),
                                    p.replicas().stream().map(Node::id).toList()
                                )).collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2))
                    ))
                .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2))
        );
  }

  private Mono<List<BrokerMetadata>> brokerMetadata(List<Integer> brokerIds) {
    return adminClient.describeCluster()
        .map(description -> description.getNodes().stream()
            .filter(n -> brokerIds.contains(n.id()))
            .map(n -> new BrokerMetadata(n.id(), Optional.ofNullable(n.rack())))
            .toList());
  }

  @VisibleForTesting
  static Map<TopicPartition, List<Integer>> calculateAssignment(
      Map<String, Map<TopicPartition, List<Integer>>> currentAssignments,
      List<BrokerMetadata> brokerMetadata,
      boolean rackAware) {
    if (rackAware && brokerMetadata.stream().anyMatch(m -> m.rack().isEmpty())) {
      throw new ValidationException("Not all brokers have rack information for rack-aware replica assignment");
    }
    return rackAware
        ? calculateAssignmentRackAware(currentAssignments, brokerMetadata)
        : calculateAssignmentRackUnaware(currentAssignments, brokerMetadata);
  }
  private static Map<TopicPartition, List<Integer>> calculateAssignmentRackAware(
      Map<String, Map<TopicPartition, List<Integer>>> currentAssignments,
      List<BrokerMetadata> brokerMetadata) {
    log.warn("Rack-aware assignment calculation is not implemented yet, falling back to rack-unaware calculation");
    return calculateAssignmentRackUnaware(currentAssignments, brokerMetadata);
  }
  private static Map<TopicPartition, List<Integer>> calculateAssignmentRackUnaware(
      Map<String, Map<TopicPartition, List<Integer>>> currentAssignments,
      List<BrokerMetadata> brokerMetadata) {
    Map<TopicPartition, List<Integer>> result = new LinkedHashMap<>();
    currentAssignments.forEach((topic, currentAssignment) -> {
      result.putAll(
          assignReplicasToBrokersRackUnaware(
              topic,
              currentAssignment.size(),
              currentAssignment.entrySet().iterator().next().getValue().size(),
              brokerMetadata.stream().map(BrokerMetadata::id).collect(Collectors.toList()),
              ThreadLocalRandom.current()
          )
      );
    });
    return result;
  }

  static Map<TopicPartition, List<Integer>> assignReplicasToBrokersRackUnaware(
      String topic,
      int nPartitions,
      int replicationFactor,
      List<Integer> brokerList,
      RandomGenerator rand) {
    var result = new LinkedHashMap<TopicPartition, List<Integer>>();
    int startIndex = rand.nextInt(brokerList.size());
    int currentPartitionId = 0;
    int nextReplicaShift = rand.nextInt(brokerList.size());
    for (int i = 0; i < nPartitions; i++) {
      if (currentPartitionId > 0 && (currentPartitionId % brokerList.size() == 0)) {
        nextReplicaShift += 1;
      }
      int firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size();
      var replicaBuffer = Lists.newArrayList(brokerList.get(firstReplicaIndex));
      for (int j = 0; j < replicationFactor - 1; j++) {
        replicaBuffer.add(brokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size())));
      }
      result.put(new TopicPartition(topic, currentPartitionId), replicaBuffer);
      currentPartitionId += 1;
    }
    return result;
  }

  private static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int nBrokers) {
    var shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1);
    return (firstReplicaIndex + shift) % nBrokers;
  }
}
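The loop above mirrors Kafka's own AdminUtils round-robin placement: each partition's first replica is assigned round-robin from a random start index, and follower replicas are spread with a shifting offset so a partition never sees the same broker twice. A minimal sketch of driving the package-private helper with a fixed seed; the demo class, topic name, and broker ids are hypothetical, and java.util.Random implements RandomGenerator on JDK 17+:

package com.provectus.kafka.ui.service.reassign;

import java.util.List;
import java.util.Random;

class RackUnawareAssignmentDemo {
  public static void main(String[] args) {
    // 3 partitions, replication factor 2, brokers 10..12; the fixed seed makes the
    // random start index and replica shift reproducible across runs.
    var assignment = ReassignmentPlanner.assignReplicasToBrokersRackUnaware(
        "orders", 3, 2, List.of(10, 11, 12), new Random(42));
    // Every replica list holds distinct broker ids; first replicas rotate across brokers.
    assignment.forEach((tp, replicas) -> System.out.println(tp + " -> " + replicas));
  }
}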

View file

@@ -0,0 +1,78 @@
package com.provectus.kafka.ui.service.reassign;
import com.provectus.kafka.ui.model.InProgressPartitionReassignmentDTO;
import com.provectus.kafka.ui.model.InProgressReassignmentDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.PartitionReassignmentDTO;
import com.provectus.kafka.ui.model.ReassignPartitionsCommandDTO;
import com.provectus.kafka.ui.service.AdminClientService;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
@RequiredArgsConstructor
public class ReassignmentService {
  private final AdminClientService adminClientService;

  public Mono<ReassignPartitionsCommandDTO> generate(KafkaCluster cluster,
                                                     Set<String> topics,
                                                     List<Integer> brokerIds) {
    return adminClientService.get(cluster)
        .map(ReassignmentPlanner::new)
        .flatMap(planner -> planner.generatePartitionReassignment(topics, brokerIds, false));
  }

  public Mono<ReassignPartitionsCommandDTO> getCurrentAssignment(KafkaCluster cluster,
                                                                 Set<String> topics) {
    return adminClientService.get(cluster)
        .flatMap(ac -> ReassignmentPlanner.currentAssignments(ac, topics))
        .map(this::map);
  }

  public Mono<InProgressReassignmentDTO> getInProgressAssignments(KafkaCluster cluster) {
    return adminClientService.get(cluster)
        .flatMap(ReactiveAdminClient::listPartitionReassignments)
        .map(this::mapInProgressReassignments);
  }

  private InProgressReassignmentDTO mapInProgressReassignments(
      Map<TopicPartition, PartitionReassignment> reassignments) {
    return new InProgressReassignmentDTO()
        .partitions(
            reassignments.entrySet().stream()
                .map(e -> new InProgressPartitionReassignmentDTO()
                    .topic(e.getKey().topic())
                    .partition(e.getKey().partition())
                    .currentReplicas(e.getValue().replicas())
                    .addingReplicas(e.getValue().addingReplicas())
                    .removingReplicas(e.getValue().removingReplicas())
                )
                .toList()
        );
  }

  private ReassignPartitionsCommandDTO map(Map<String, Map<TopicPartition, List<Integer>>> assignment) {
    return new ReassignPartitionsCommandDTO()
        .version(1)
        .partitions(
            assignment.values().stream()
                .flatMap(m -> m.entrySet().stream())
                .map(p -> new PartitionReassignmentDTO()
                    .topic(p.getKey().topic())
                    .partition(p.getKey().partition())
                    .replicas(p.getValue())
                    .logDirs(p.getValue().stream().map(r -> "any").toList())
                )
                .toList()
        );
  }
}
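The service is a thin orchestration layer over ReassignmentPlanner and ReactiveAdminClient; an illustrative call from a component that already holds a KafkaCluster (topic and broker values assumed):

// Illustrative only: generate a balanced assignment for two topics across brokers 1-3
// and log the suggested command.
reassignmentService
    .generate(cluster, Set.of("orders", "payments"), List.of(1, 2, 3))
    .doOnNext(cmd -> log.info("Suggested reassignment: {}", cmd))
    .subscribe();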

View file

@@ -0,0 +1,55 @@
package com.provectus.kafka.ui.service.reassign;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import org.junit.jupiter.api.Test;
class ReassignmentJsonDtoTest {

  @Test
  void canBeCreatedFromJsonString() {
    var parsed = ReassignmentJsonDto.fromJson(
        """
            {
              "version": 1,
              "partitions": [
                {
                  "topic": "my-topic",
                  "partition": 0,
                  "replicas": [0, 1, 2],
                  "log_dirs": ["any", "/user/share/kafka/p0", "any"]
                }
              ]
            }
            """
    );
    assertThat(parsed).isEqualTo(
        ReassignmentJsonDto.builder()
            .version(1)
            .partitions(
                List.of(
                    ReassignmentJsonDto.PartitionAssignmentDto.builder()
                        .topic("my-topic")
                        .partition(0)
                        .replicas(List.of(0, 1, 2))
                        .logDirs(List.of("any", "/user/share/kafka/p0", "any"))
                        .build()
                )
            )
            .build()
    );
  }
}
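ReassignmentJsonDto itself is not part of this diff. A minimal sketch of a Jackson/Lombok implementation that would satisfy the test; the use of @Jacksonized builders and @SneakyThrows here is an assumption about the eventual class, not code from this commit:

package com.provectus.kafka.ui.service.reassign;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import lombok.Builder;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;

@Value
@Builder
@Jacksonized
public class ReassignmentJsonDto {
  Integer version;
  List<PartitionAssignmentDto> partitions;

  // Parses the same JSON document format used by kafka-reassign-partitions.sh.
  @SneakyThrows
  public static ReassignmentJsonDto fromJson(String json) {
    return new ObjectMapper().readValue(json, ReassignmentJsonDto.class);
  }

  @Value
  @Builder
  @Jacksonized
  public static class PartitionAssignmentDto {
    String topic;
    Integer partition;
    List<Integer> replicas;
    @JsonProperty("log_dirs")
    List<String> logDirs;
  }
}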

View file

@@ -1729,6 +1729,94 @@ paths:
$ref: '#/components/schemas/PartitionsIncreaseResponse'
404:
description: Not found
  /api/clusters/{clusterName}/partitionsreassignments/generate:
    post:
      tags:
        - Topics
      operationId: generatePartitionAssignment
      parameters:
        - name: clusterName
          in: path
          required: true
          schema:
            type: string
      requestBody:
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/GeneratePartitionsReassignmentCommand'
      responses:
        200:
          description: OK
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/ReassignPartitionsCommand'
  /api/clusters/{clusterName}/partitionsreassignments/current:
    post:
      tags:
        - Topics
      operationId: getCurrentPartitionAssignment
      parameters:
        - name: clusterName
          in: path
          required: true
          schema:
            type: string
      requestBody:
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/GeneratePartitionsReassignmentCommand'
      responses:
        200:
          description: OK
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/ReassignPartitionsCommand'
  /api/clusters/{clusterName}/partitionsreassignments/execute:
    post:
      tags:
        - Topics
      operationId: executePartitionAssignment
      parameters:
        - name: clusterName
          in: path
          required: true
          schema:
            type: string
      requestBody:
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/ReassignPartitionsCommand'
      responses:
        200:
          description: OK
  /api/clusters/{clusterName}/partitionsreassignments/inprogress:
    get:
      tags:
        - Topics
      operationId: getInProgressAssignments
      parameters:
        - name: clusterName
          in: path
          required: true
          schema:
            type: string
      responses:
        200:
          description: OK
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/InProgressReassignment'
  /api/info/timestampformat:
    get:
      tags:
@@ -3149,3 +3237,79 @@ components:
        - COMPACT
        - COMPACT_DELETE
        - UNKNOWN
    ReassignPartitionsCommand:
      type: object
      properties:
        version:
          type: integer
          format: int32
        partitions:
          type: array
          items:
            $ref: "#/components/schemas/PartitionReassignment"
    PartitionReassignment:
      type: object
      properties:
        topic:
          type: string
        partition:
          type: integer
        replicas:
          type: array
          items:
            type: integer
        log_dirs:
          type: array
          items:
            type: string
    GeneratePartitionsReassignmentCommand:
      type: object
      properties:
        version:
          type: integer
          format: int32
        broker_ids:
          type: array
          items:
            type: integer
            format: int32
        topics:
          type: array
          items:
            type: object
            properties:
              topic:
                type: string
    InProgressReassignment:
      type: object
      properties:
        partitions:
          type: array
          items:
            $ref: "#/components/schemas/InProgressPartitionReassignment"
    InProgressPartitionReassignment:
      type: object
      properties:
        topic:
          type: string
        partition:
          type: integer
          format: int32
        currentReplicas:
          type: array
          items:
            type: integer
            format: int32
        addingReplicas:
          type: array
          items:
            type: integer
            format: int32
        removingReplicas:
          type: array
          items:
            type: integer
            format: int32
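For orientation, a ReassignPartitionsCommand document conforming to these schemas has the same shape as the JSON consumed by Kafka's stock kafka-reassign-partitions.sh tool, for example:

{
  "version": 1,
  "partitions": [
    {
      "topic": "my-topic",
      "partition": 0,
      "replicas": [1, 2, 3],
      "log_dirs": ["any", "any", "any"]
    }
  ]
}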