|
@@ -27,6 +27,7 @@ import com.provectus.kafka.ui.service.TopicsService;
|
|
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
|
|
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
|
|
import com.provectus.kafka.ui.service.reassign.ReassignmentService;
|
|
import com.provectus.kafka.ui.service.reassign.ReassignmentService;
|
|
import java.util.Comparator;
|
|
import java.util.Comparator;
|
|
|
|
+import java.util.HashSet;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
import javax.validation.Valid;
|
|
import javax.validation.Valid;
|
|
@@ -235,14 +236,9 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public Mono<ResponseEntity<ReassignPartitionsCommandDTO>> getCurrentPartitionAssignment(String clusterName,
|
|
public Mono<ResponseEntity<ReassignPartitionsCommandDTO>> getCurrentPartitionAssignment(String clusterName,
|
|
- Flux<String> topicsList,
|
|
|
|
|
|
+ List<String> topics,
|
|
ServerWebExchange exchange) {
|
|
ServerWebExchange exchange) {
|
|
- return topicsList
|
|
|
|
- .collect(Collectors.toSet())
|
|
|
|
- .flatMap(topics ->
|
|
|
|
- reassignmentService.getCurrentAssignment(
|
|
|
|
- getCluster(clusterName),
|
|
|
|
- topics))
|
|
|
|
|
|
+ return reassignmentService.getCurrentAssignment(getCluster(clusterName), new HashSet<>(topics))
|
|
.map(ResponseEntity::ok);
|
|
.map(ResponseEntity::ok);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -263,9 +259,9 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public Mono<ResponseEntity<Void>> cancelPartitionAssignment(String clusterName,
|
|
|
|
- Mono<PartitionReassignmentCancellationDTO> cancelDto,
|
|
|
|
- ServerWebExchange exchange) {
|
|
|
|
|
|
+ public Mono<ResponseEntity<Void>> cancelRunningPartitionAssignment(String clusterName,
|
|
|
|
+ Mono<PartitionReassignmentCancellationDTO> cancelDto,
|
|
|
|
+ ServerWebExchange exchange) {
|
|
return cancelDto
|
|
return cancelDto
|
|
.flatMap(dto ->
|
|
.flatMap(dto ->
|
|
reassignmentService.cancelReassignment(
|
|
reassignmentService.cancelReassignment(
|