|
@@ -80,14 +80,14 @@ public class TopicsService {
|
|
|
Optional<TopicColumnsToSortDTO> sortBy,
|
|
|
Optional<SortOrderDTO> sortOrder) {
|
|
|
return adminClientService.get(cluster).flatMap(ac ->
|
|
|
- new Pagination(ac, metricsCache.get(cluster))
|
|
|
- .getPage(pageNum, nullablePerPage, showInternal, search, sortBy, sortOrder)
|
|
|
- .flatMap(page ->
|
|
|
- loadTopics(cluster, page.getTopics())
|
|
|
- .map(topics ->
|
|
|
- new TopicsResponseDTO()
|
|
|
- .topics(topics.stream().map(clusterMapper::toTopic).collect(toList()))
|
|
|
- .pageCount(page.getTotalPages()))));
|
|
|
+ new Pagination(ac, metricsCache.get(cluster))
|
|
|
+ .getPage(pageNum, nullablePerPage, showInternal, search, sortBy, sortOrder)
|
|
|
+ .flatMap(page ->
|
|
|
+ loadTopics(cluster, page.getTopics())
|
|
|
+ .map(topics ->
|
|
|
+ new TopicsResponseDTO()
|
|
|
+ .topics(topics.stream().map(clusterMapper::toTopic).collect(toList()))
|
|
|
+ .pageCount(page.getTotalPages()))));
|
|
|
}
|
|
|
|
|
|
private Mono<List<InternalTopic>> loadTopics(KafkaCluster c, List<String> topics) {
|
|
@@ -193,31 +193,31 @@ public class TopicsService {
|
|
|
|
|
|
public Mono<TopicDTO> recreateTopic(KafkaCluster cluster, String topicName) {
|
|
|
return loadTopic(cluster, topicName)
|
|
|
- .flatMap(t -> deleteTopic(cluster, topicName)
|
|
|
- .thenReturn(t).delayElement(Duration.ofSeconds(recreateDelayInSeconds))
|
|
|
- .flatMap(topic -> adminClientService.get(cluster).flatMap(ac -> ac.createTopic(topic.getName(),
|
|
|
- topic.getPartitionCount(),
|
|
|
- (short) topic.getReplicationFactor(),
|
|
|
- topic.getTopicConfigs()
|
|
|
- .stream()
|
|
|
- .collect(Collectors
|
|
|
- .toMap(InternalTopicConfig::getName,
|
|
|
- InternalTopicConfig::getValue)))
|
|
|
- .thenReturn(topicName))
|
|
|
- .retryWhen(Retry.fixedDelay(recreateMaxRetries,
|
|
|
- Duration.ofSeconds(recreateDelayInSeconds))
|
|
|
- .filter(throwable -> throwable instanceof TopicExistsException)
|
|
|
- .onRetryExhaustedThrow((a, b) ->
|
|
|
- new TopicRecreationException(topicName,
|
|
|
- recreateMaxRetries * recreateDelayInSeconds)))
|
|
|
- .flatMap(a -> loadTopic(cluster, topicName)).map(clusterMapper::toTopic)
|
|
|
- )
|
|
|
- );
|
|
|
+ .flatMap(t -> deleteTopic(cluster, topicName)
|
|
|
+ .thenReturn(t).delayElement(Duration.ofSeconds(recreateDelayInSeconds))
|
|
|
+ .flatMap(topic -> adminClientService.get(cluster).flatMap(ac -> ac.createTopic(topic.getName(),
|
|
|
+ topic.getPartitionCount(),
|
|
|
+ (short) topic.getReplicationFactor(),
|
|
|
+ topic.getTopicConfigs()
|
|
|
+ .stream()
|
|
|
+ .collect(Collectors
|
|
|
+ .toMap(InternalTopicConfig::getName,
|
|
|
+ InternalTopicConfig::getValue)))
|
|
|
+ .thenReturn(topicName))
|
|
|
+ .retryWhen(Retry.fixedDelay(recreateMaxRetries,
|
|
|
+ Duration.ofSeconds(recreateDelayInSeconds))
|
|
|
+ .filter(TopicExistsException.class::isInstance)
|
|
|
+ .onRetryExhaustedThrow((a, b) ->
|
|
|
+ new TopicRecreationException(topicName,
|
|
|
+ recreateMaxRetries * recreateDelayInSeconds)))
|
|
|
+ .flatMap(a -> loadTopic(cluster, topicName)).map(clusterMapper::toTopic)
|
|
|
+ )
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
private Mono<InternalTopic> updateTopic(KafkaCluster cluster,
|
|
|
- String topicName,
|
|
|
- TopicUpdateDTO topicUpdate) {
|
|
|
+ String topicName,
|
|
|
+ TopicUpdateDTO topicUpdate) {
|
|
|
return adminClientService.get(cluster)
|
|
|
.flatMap(ac ->
|
|
|
ac.updateTopicConfig(topicName, topicUpdate.getConfigs())
|
|
@@ -403,10 +403,11 @@ public class TopicsService {
|
|
|
);
|
|
|
return ac.createPartitions(newPartitionsMap)
|
|
|
.then(loadTopic(cluster, topicName));
|
|
|
- })
|
|
|
- .map(t -> new PartitionsIncreaseResponseDTO()
|
|
|
+ }).map(t -> new PartitionsIncreaseResponseDTO()
|
|
|
.topicName(t.getName())
|
|
|
- .totalPartitionsCount(t.getPartitionCount())));
|
|
|
+ .totalPartitionsCount(t.getPartitionCount())
|
|
|
+ )
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
public Mono<Void> deleteTopic(KafkaCluster cluster, String topicName) {
|