Move DTO mappers to controllers (#1805)

* [ISSUE-949] Moved mapper methods to controllers

* [ISSUE-949] Applied code review comments

* [ISSUE-949] Fixed formatting issue

Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Authored by ValentinPrischepa on 2022-05-16 04:06:20 -07:00; committed by GitHub
parent 6849e25c8e
commit ae1a400255
9 changed files with 270 additions and 263 deletions
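
The theme across all nine files: service methods stop returning API DTOs and return internal models instead (InternalTopic, InternalBrokerConfig, JmxBrokerMetrics, InternalCompatibilityLevel, InternalCompatibilityCheck), and each controller applies ClusterMapper at the API boundary. A minimal sketch of the resulting shape, assuming a hypothetical ExampleService (the real controllers below follow the same chain):

```java
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
@RequiredArgsConstructor
public class ExampleController {

  private final ExampleService service;      // hypothetical; returns internal models only
  private final ClusterMapper clusterMapper; // DTO mapping now happens in the controller

  public Mono<ResponseEntity<TopicDTO>> getTopic(String clusterName, String topicName) {
    return service.getTopic(clusterName, topicName) // Mono<InternalTopic>
        .map(clusterMapper::toTopic)                // InternalTopic -> TopicDTO
        .map(ResponseEntity::ok);
  }
}
```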

BrokersController.java

@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.BrokersApi;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.BrokerConfigDTO;
import com.provectus.kafka.ui.model.BrokerConfigItemDTO;
import com.provectus.kafka.ui.model.BrokerDTO;
@@ -22,11 +23,13 @@ import reactor.core.publisher.Mono;
@Slf4j
public class BrokersController extends AbstractController implements BrokersApi {
private final BrokerService brokerService;
private final ClusterMapper clusterMapper;
@Override
public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterName, Integer id,
ServerWebExchange exchange) {
return brokerService.getBrokerMetrics(getCluster(clusterName), id)
.map(clusterMapper::toBrokerMetrics)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build());
}
@@ -50,7 +53,8 @@ public class BrokersController extends AbstractController implements BrokersApi
public Mono<ResponseEntity<Flux<BrokerConfigDTO>>> getBrokerConfig(String clusterName, Integer id,
ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(
brokerService.getBrokerConfig(getCluster(clusterName), id)));
brokerService.getBrokerConfig(getCluster(clusterName), id)
.map(clusterMapper::toBrokerConfig)));
}
@Override

SchemasController.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.SchemasApi;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
@@ -29,6 +30,8 @@ public class SchemasController extends AbstractController implements SchemasApi
private static final Integer DEFAULT_PAGE_SIZE = 25;
private final ClusterMapper mapper;
private final SchemaRegistryService schemaRegistryService;
@Override
@@ -46,6 +49,7 @@ public class SchemasController extends AbstractController implements SchemasApi
ServerWebExchange exchange) {
return schemaRegistryService.checksSchemaCompatibility(
getCluster(clusterName), subject, newSchemaSubject)
.map(mapper::toCompatibilityCheckResponse)
.map(ResponseEntity::ok);
}
@@ -91,6 +95,7 @@ public class SchemasController extends AbstractController implements SchemasApi
public Mono<ResponseEntity<CompatibilityLevelDTO>> getGlobalSchemaCompatibilityLevel(
String clusterName, ServerWebExchange exchange) {
return schemaRegistryService.getGlobalSchemaCompatibilityLevel(getCluster(clusterName))
.map(mapper::toCompatibilityLevelDto)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}

TopicsController.java

@@ -1,6 +1,11 @@
package com.provectus.kafka.ui.controller;
import static java.util.stream.Collectors.toList;
import com.provectus.kafka.ui.api.TopicsApi;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.InternalTopicConfig;
import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
@@ -14,10 +19,12 @@ import com.provectus.kafka.ui.model.TopicDetailsDTO;
import com.provectus.kafka.ui.model.TopicUpdateDTO;
import com.provectus.kafka.ui.model.TopicsResponseDTO;
import com.provectus.kafka.ui.service.TopicsService;
import java.util.Optional;
import java.util.Comparator;
import java.util.List;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
@@ -29,12 +36,17 @@ import reactor.core.publisher.Mono;
@RequiredArgsConstructor
@Slf4j
public class TopicsController extends AbstractController implements TopicsApi {
private static final Integer DEFAULT_PAGE_SIZE = 25;
private final TopicsService topicsService;
private final ClusterMapper clusterMapper;
@Override
public Mono<ResponseEntity<TopicDTO>> createTopic(
String clusterName, @Valid Mono<TopicCreationDTO> topicCreation, ServerWebExchange exchange) {
return topicsService.createTopic(getCluster(clusterName), topicCreation)
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.OK))
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
}
@@ -43,6 +55,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
public Mono<ResponseEntity<TopicDTO>> recreateTopic(String clusterName,
String topicName, ServerWebExchange serverWebExchange) {
return topicsService.recreateTopic(getCluster(clusterName), topicName)
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.CREATED));
}
@@ -50,6 +63,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
public Mono<ResponseEntity<TopicDTO>> cloneTopic(
String clusterName, String topicName, String newTopicName, ServerWebExchange exchange) {
return topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.CREATED));
}
@@ -64,6 +78,10 @@ public class TopicsController extends AbstractController implements TopicsApi {
public Mono<ResponseEntity<Flux<TopicConfigDTO>>> getTopicConfigs(
String clusterName, String topicName, ServerWebExchange exchange) {
return topicsService.getTopicConfigs(getCluster(clusterName), topicName)
.map(lst -> lst.stream()
.map(InternalTopicConfig::from)
.map(clusterMapper::toTopicConfig)
.collect(toList()))
.map(Flux::fromIterable)
.map(ResponseEntity::ok);
}
@@ -72,10 +90,10 @@ public class TopicsController extends AbstractController implements TopicsApi {
public Mono<ResponseEntity<TopicDetailsDTO>> getTopicDetails(
String clusterName, String topicName, ServerWebExchange exchange) {
return topicsService.getTopicDetails(getCluster(clusterName), topicName)
.map(clusterMapper::toTopicDetails)
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName, @Valid Integer page,
@Valid Integer perPage,
@Valid Boolean showInternal,
@@ -83,16 +101,54 @@ public class TopicsController extends AbstractController implements TopicsApi {
@Valid TopicColumnsToSortDTO orderBy,
@Valid SortOrderDTO sortOrder,
ServerWebExchange exchange) {
return topicsService
.getTopics(
getCluster(clusterName),
Optional.ofNullable(page),
Optional.ofNullable(perPage),
Optional.ofNullable(showInternal),
Optional.ofNullable(search),
Optional.ofNullable(orderBy),
Optional.ofNullable(sortOrder)
).map(ResponseEntity::ok);
return topicsService.getTopicsForPagination(getCluster(clusterName))
.flatMap(existingTopics -> {
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
? getComparatorForTopic(orderBy) : getComparatorForTopic(orderBy).reversed();
List<InternalTopic> filtered = existingTopics.stream()
.filter(topic -> !topic.isInternal()
|| showInternal != null && showInternal)
.filter(topic -> search == null || StringUtils.contains(topic.getName(), search))
.sorted(comparator)
.collect(toList());
var totalPages = (filtered.size() / pageSize)
+ (filtered.size() % pageSize == 0 ? 0 : 1);
List<String> topicsPage = filtered.stream()
.skip(topicsToSkip)
.limit(pageSize)
.map(InternalTopic::getName)
.collect(toList());
return topicsService.loadTopics(getCluster(clusterName), topicsPage)
.map(topicsToRender ->
new TopicsResponseDTO()
.topics(topicsToRender.stream().map(clusterMapper::toTopic).collect(toList()))
.pageCount(totalPages));
}).map(ResponseEntity::ok);
}
private Comparator<InternalTopic> getComparatorForTopic(
TopicColumnsToSortDTO orderBy) {
var defaultComparator = Comparator.comparing(InternalTopic::getName);
if (orderBy == null) {
return defaultComparator;
}
switch (orderBy) {
case TOTAL_PARTITIONS:
return Comparator.comparing(InternalTopic::getPartitionCount);
case OUT_OF_SYNC_REPLICAS:
return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
case REPLICATION_FACTOR:
return Comparator.comparing(InternalTopic::getReplicationFactor);
case SIZE:
return Comparator.comparing(InternalTopic::getSegmentSize);
case NAME:
default:
return defaultComparator;
}
}
@Override
@@ -100,7 +156,9 @@ public class TopicsController extends AbstractController implements TopicsApi {
String clusterId, String topicName, @Valid Mono<TopicUpdateDTO> topicUpdate,
ServerWebExchange exchange) {
return topicsService
.updateTopic(getCluster(clusterId), topicName, topicUpdate).map(ResponseEntity::ok);
.updateTopic(getCluster(clusterId), topicName, topicUpdate)
.map(clusterMapper::toTopic)
.map(ResponseEntity::ok);
}
@Override
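
The getTopics rewrite above inlines the filter/sort/page pipeline that previously lived behind TopicsService.Pagination. A worked example of the two pieces of arithmetic (topicsToSkip and totalPages), using the same numbers one of the tests below exercises (100 topics, page 4, perPage 33):

```java
public class PagingMathExample {
  public static void main(String[] args) {
    int filteredSize = 100;                    // topics left after the filters
    int pageSize = 33;                         // perPage (falls back to 25 when absent)
    int page = 4;

    int topicsToSkip = (page - 1) * pageSize;  // (4 - 1) * 33 = 99
    int totalPages = (filteredSize / pageSize)
        + (filteredSize % pageSize == 0 ? 0 : 1); // 100 / 33 = 3, remainder -> 4

    System.out.println(topicsToSkip + " to skip, " + totalPages + " pages");
    // skip(99).limit(33) over the 100 sorted names leaves exactly one topic,
    // which is what the page-count test below asserts.
  }
}
```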

ClusterMapper.java

@@ -126,7 +126,7 @@ public interface ClusterMapper {
CompatibilityCheckResponseDTO toCompatibilityCheckResponse(InternalCompatibilityCheck dto);
@Mapping(target = "compatibility", source = "compatibilityLevel")
CompatibilityLevelDTO toCompatibilityLevel(InternalCompatibilityLevel dto);
CompatibilityLevelDTO toCompatibilityLevelDto(InternalCompatibilityLevel dto);
default List<PartitionDTO> map(Map<Integer, InternalPartition> map) {
return map.values().stream().map(this::toPartition).collect(Collectors.toList());

BrokerService.java

@@ -4,14 +4,12 @@ import com.provectus.kafka.ui.exception.InvalidRequestApiException;
import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper;
import com.provectus.kafka.ui.model.BrokerConfigDTO;
import com.provectus.kafka.ui.model.BrokerDTO;
import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO;
import com.provectus.kafka.ui.model.BrokerMetricsDTO;
import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
import com.provectus.kafka.ui.model.InternalBrokerConfig;
import com.provectus.kafka.ui.model.JmxBrokerMetrics;
import com.provectus.kafka.ui.model.KafkaCluster;
import java.util.Collections;
import java.util.HashMap;
@@ -40,7 +38,6 @@ public class BrokerService {
private final MetricsCache metricsCache;
private final AdminClientService adminClientService;
private final DescribeLogDirsMapper describeLogDirsMapper;
private final ClusterMapper clusterMapper;
private Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(
KafkaCluster cluster, List<Integer> brokersIds) {
@@ -149,15 +146,13 @@
.flatMapMany(Flux::fromIterable);
}
public Flux<BrokerConfigDTO> getBrokerConfig(KafkaCluster cluster, Integer brokerId) {
return getBrokersConfig(cluster, brokerId)
.map(clusterMapper::toBrokerConfig);
public Flux<InternalBrokerConfig> getBrokerConfig(KafkaCluster cluster, Integer brokerId) {
return getBrokersConfig(cluster, brokerId);
}
public Mono<BrokerMetricsDTO> getBrokerMetrics(KafkaCluster cluster, Integer brokerId) {
public Mono<JmxBrokerMetrics> getBrokerMetrics(KafkaCluster cluster, Integer brokerId) {
return Mono.justOrEmpty(
metricsCache.get(cluster).getJmxMetrics().getInternalBrokerMetrics().get(brokerId))
.map(clusterMapper::toBrokerMetrics);
metricsCache.get(cluster).getJmxMetrics().getInternalBrokerMetrics().get(brokerId));
}
}

SchemaRegistryService.java

@@ -10,8 +10,6 @@ import com.provectus.kafka.ui.exception.SchemaNotFoundException;
import com.provectus.kafka.ui.exception.SchemaTypeNotSupportedException;
import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
import com.provectus.kafka.ui.model.InternalSchemaRegistry;
import com.provectus.kafka.ui.model.KafkaCluster;
@@ -69,7 +67,6 @@ public class SchemaRegistryService {
private static final String UNRECOGNIZED_FIELD_SCHEMA_TYPE = "Unrecognized field: schemaType";
private static final String INCOMPATIBLE_WITH_AN_EARLIER_SCHEMA = "incompatible with an earlier schema";
private final ClusterMapper mapper;
private final WebClient webClient;
public Mono<List<SchemaSubjectDTO>> getAllLatestVersionSchemas(KafkaCluster cluster,
@@ -136,7 +133,7 @@
.zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
.map(tuple -> {
SchemaSubjectDTO schema = tuple.getT1();
String compatibilityLevel = tuple.getT2().getCompatibility().getValue();
String compatibilityLevel = tuple.getT2().getCompatibilityLevel();
schema.setCompatibilityLevel(compatibilityLevel);
return schema;
})
@@ -279,7 +276,7 @@
return updateSchemaCompatibility(cluster, null, compatibilityLevel);
}
public Mono<CompatibilityLevelDTO> getSchemaCompatibilityLevel(KafkaCluster cluster,
public Mono<InternalCompatibilityLevel> getSchemaCompatibilityLevel(KafkaCluster cluster,
String schemaName) {
String globalConfig = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
final var values = new LinkedMultiValueMap<String, String>();
@@ -292,21 +289,20 @@
values)
.retrieve()
.bodyToMono(InternalCompatibilityLevel.class)
.map(mapper::toCompatibilityLevel)
.onErrorResume(error -> Mono.empty());
}
public Mono<CompatibilityLevelDTO> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
public Mono<InternalCompatibilityLevel> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
return this.getSchemaCompatibilityLevel(cluster, null);
}
private Mono<CompatibilityLevelDTO> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
private Mono<InternalCompatibilityLevel> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
String schemaName) {
return this.getSchemaCompatibilityLevel(cluster, schemaName)
.switchIfEmpty(this.getGlobalSchemaCompatibilityLevel(cluster));
}
public Mono<CompatibilityCheckResponseDTO> checksSchemaCompatibility(
public Mono<InternalCompatibilityCheck> checksSchemaCompatibility(
KafkaCluster cluster, String schemaName, Mono<NewSchemaSubjectDTO> newSchemaSubject) {
return configuredWebClient(
cluster,
@@ -319,7 +315,6 @@
.onStatus(NOT_FOUND::equals,
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
.bodyToMono(InternalCompatibilityCheck.class)
.map(mapper::toCompatibilityCheckResponse)
.as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
() -> this.checksSchemaCompatibility(cluster, schemaName, newSchemaSubject))));
}
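
One behavioral detail in the hunks above: getSchemaCompatibilityLevel turns any registry error into an empty Mono via onErrorResume, and with the mapping gone from the service it is now SchemasController's defaultIfEmpty (shown earlier) that converts that emptiness into a 404. A self-contained sketch of the interaction, with String standing in for the real DTO type:

```java
import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Mono;

public class EmptyToNotFoundExample {
  public static void main(String[] args) {
    // Service side: an upstream failure collapses into an empty Mono.
    Mono<String> level = Mono.<String>error(new IllegalStateException("registry down"))
        .onErrorResume(error -> Mono.empty());

    // Controller side: a value becomes 200 OK, emptiness becomes 404.
    ResponseEntity<String> response = level
        .map(ResponseEntity::ok)
        .defaultIfEmpty(ResponseEntity.notFound().build())
        .block();

    System.out.println(response.getStatusCode()); // 404 NOT_FOUND
  }
}
```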

TopicsService.java

@@ -3,12 +3,10 @@ package com.provectus.kafka.ui.service;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import com.google.common.annotations.VisibleForTesting;
import com.provectus.kafka.ui.exception.TopicMetadataException;
import com.provectus.kafka.ui.exception.TopicNotFoundException;
import com.provectus.kafka.ui.exception.TopicRecreationException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.Feature;
import com.provectus.kafka.ui.model.InternalLogDirStats;
import com.provectus.kafka.ui.model.InternalPartition;
@@ -21,15 +19,9 @@ import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
import com.provectus.kafka.ui.model.SortOrderDTO;
import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
import com.provectus.kafka.ui.model.TopicConfigDTO;
import com.provectus.kafka.ui.model.TopicCreationDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.model.TopicDetailsDTO;
import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
import com.provectus.kafka.ui.model.TopicUpdateDTO;
import com.provectus.kafka.ui.model.TopicsResponseDTO;
import com.provectus.kafka.ui.serde.DeserializationService;
import com.provectus.kafka.ui.util.JmxClusterUtil;
import java.time.Duration;
@@ -40,10 +32,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
@@ -61,10 +51,7 @@ import reactor.util.retry.Retry;
@RequiredArgsConstructor
public class TopicsService {
private static final Integer DEFAULT_PAGE_SIZE = 25;
private final AdminClientService adminClientService;
private final ClusterMapper clusterMapper;
private final DeserializationService deserializationService;
private final MetricsCache metricsCache;
@Value("${topic.recreate.maxRetries:15}")
@@ -76,25 +63,7 @@ public class TopicsService {
@Value("${topic.load.after.create.delay.ms:500}")
private int loadTopicAfterCreateDelayInMs;
public Mono<TopicsResponseDTO> getTopics(KafkaCluster cluster,
Optional<Integer> pageNum,
Optional<Integer> nullablePerPage,
Optional<Boolean> showInternal,
Optional<String> search,
Optional<TopicColumnsToSortDTO> sortBy,
Optional<SortOrderDTO> sortOrder) {
return adminClientService.get(cluster).flatMap(ac ->
new Pagination(ac, metricsCache.get(cluster))
.getPage(pageNum, nullablePerPage, showInternal, search, sortBy, sortOrder)
.flatMap(page ->
loadTopics(cluster, page.getTopics())
.map(topics ->
new TopicsResponseDTO()
.topics(topics.stream().map(clusterMapper::toTopic).collect(toList()))
.pageCount(page.getTotalPages()))));
}
private Mono<List<InternalTopic>> loadTopics(KafkaCluster c, List<String> topics) {
public Mono<List<InternalTopic>> loadTopics(KafkaCluster c, List<String> topics) {
if (topics.isEmpty()) {
return Mono.just(List.of());
}
@@ -186,18 +155,14 @@
.map(InternalPartitionsOffsets::new);
}
public Mono<TopicDetailsDTO> getTopicDetails(KafkaCluster cluster, String topicName) {
return loadTopic(cluster, topicName).map(clusterMapper::toTopicDetails);
public Mono<InternalTopic> getTopicDetails(KafkaCluster cluster, String topicName) {
return loadTopic(cluster, topicName);
}
public Mono<List<TopicConfigDTO>> getTopicConfigs(KafkaCluster cluster, String topicName) {
public Mono<List<ConfigEntry>> getTopicConfigs(KafkaCluster cluster, String topicName) {
return adminClientService.get(cluster)
.flatMap(ac -> ac.getTopicsConfig(List.of(topicName)))
.map(m -> m.values().stream().findFirst().orElseThrow(TopicNotFoundException::new))
.map(lst -> lst.stream()
.map(InternalTopicConfig::from)
.map(clusterMapper::toTopicConfig)
.collect(toList()));
.map(m -> m.values().stream().findFirst().orElseThrow(TopicNotFoundException::new));
}
private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient adminClient,
@@ -214,13 +179,12 @@
.flatMap(topicData -> loadTopicAfterCreation(c, topicData.getName()));
}
public Mono<TopicDTO> createTopic(KafkaCluster cluster, Mono<TopicCreationDTO> topicCreation) {
public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicCreationDTO> topicCreation) {
return adminClientService.get(cluster)
.flatMap(ac -> createTopic(cluster, ac, topicCreation))
.map(clusterMapper::toTopic);
.flatMap(ac -> createTopic(cluster, ac, topicCreation));
}
public Mono<TopicDTO> recreateTopic(KafkaCluster cluster, String topicName) {
public Mono<InternalTopic> recreateTopic(KafkaCluster cluster, String topicName) {
return loadTopic(cluster, topicName)
.flatMap(t -> deleteTopic(cluster, topicName)
.thenReturn(t)
@@ -246,7 +210,7 @@
new TopicRecreationException(topicName,
recreateMaxRetries * recreateDelayInSeconds))
)
.flatMap(a -> loadTopicAfterCreation(cluster, topicName)).map(clusterMapper::toTopic)
.flatMap(a -> loadTopicAfterCreation(cluster, topicName))
)
);
}
@@ -260,11 +224,10 @@
.then(loadTopic(cluster, topicName)));
}
public Mono<TopicDTO> updateTopic(KafkaCluster cl, String topicName,
public Mono<InternalTopic> updateTopic(KafkaCluster cl, String topicName,
Mono<TopicUpdateDTO> topicUpdate) {
return topicUpdate
.flatMap(t -> updateTopic(cl, topicName, t))
.map(clusterMapper::toTopic);
.flatMap(t -> updateTopic(cl, topicName, t));
}
private Mono<InternalTopic> changeReplicationFactor(
@@ -464,7 +427,7 @@
.getTopicSchema(topicName);
}
public Mono<TopicDTO> cloneTopic(
public Mono<InternalTopic> cloneTopic(
KafkaCluster cluster, String topicName, String newTopicName) {
return loadTopic(cluster, topicName).flatMap(topic ->
adminClientService.get(cluster)
@@ -480,86 +443,12 @@
)
).thenReturn(newTopicName)
.flatMap(a -> loadTopicAfterCreation(cluster, newTopicName))
.map(clusterMapper::toTopic)
);
}
@VisibleForTesting
@lombok.Value
static class Pagination {
ReactiveAdminClient adminClient;
MetricsCache.Metrics metrics;
@lombok.Value
static class Page {
List<String> topics;
int totalPages;
}
Mono<Page> getPage(
Optional<Integer> pageNum,
Optional<Integer> nullablePerPage,
Optional<Boolean> showInternal,
Optional<String> search,
Optional<TopicColumnsToSortDTO> sortBy,
Optional<SortOrderDTO> sortOrder) {
return geTopicsForPagination()
.map(paginatingTopics -> {
Predicate<Integer> positiveInt = i -> i > 0;
int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
var topicsToSkip = (pageNum.filter(positiveInt).orElse(1) - 1) * perPage;
var comparator = sortOrder.isEmpty() || !sortOrder.get().equals(SortOrderDTO.DESC)
? getComparatorForTopic(sortBy) : getComparatorForTopic(sortBy).reversed();
List<InternalTopic> topics = paginatingTopics.stream()
.filter(topic -> !topic.isInternal()
|| showInternal.map(i -> topic.isInternal() == i).orElse(true))
.filter(topic ->
search
.map(s -> StringUtils.containsIgnoreCase(topic.getName(), s))
.orElse(true))
.sorted(comparator)
.collect(toList());
var totalPages = (topics.size() / perPage)
+ (topics.size() % perPage == 0 ? 0 : 1);
List<String> topicsToRender = topics.stream()
.skip(topicsToSkip)
.limit(perPage)
.map(InternalTopic::getName)
.collect(toList());
return new Page(topicsToRender, totalPages);
});
}
private Comparator<InternalTopic> getComparatorForTopic(
Optional<TopicColumnsToSortDTO> sortBy) {
var defaultComparator = Comparator.comparing(InternalTopic::getName);
if (sortBy.isEmpty()) {
return defaultComparator;
}
switch (sortBy.get()) {
case TOTAL_PARTITIONS:
return Comparator.comparing(InternalTopic::getPartitionCount);
case OUT_OF_SYNC_REPLICAS:
return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
case REPLICATION_FACTOR:
return Comparator.comparing(InternalTopic::getReplicationFactor);
case SIZE:
return Comparator.comparing(InternalTopic::getSegmentSize);
case NAME:
default:
return defaultComparator;
}
}
private Mono<List<String>> filterExisting(Collection<String> topics) {
return adminClient.listTopics(true)
.map(existing -> existing.stream().filter(topics::contains).collect(toList()));
}
private Mono<List<InternalTopic>> geTopicsForPagination() {
return filterExisting(metrics.getTopicDescriptions().keySet())
public Mono<List<InternalTopic>> getTopicsForPagination(KafkaCluster cluster) {
MetricsCache.Metrics metrics = metricsCache.get(cluster);
return filterExisting(cluster, metrics.getTopicDescriptions().keySet())
.map(lst -> lst.stream()
.map(topicName ->
InternalTopic.from(
@@ -571,6 +460,10 @@
.collect(toList())
);
}
private Mono<List<String>> filterExisting(KafkaCluster cluster, Collection<String> topics) {
return adminClientService.get(cluster).flatMap(ac -> ac.listTopics(true))
.map(existing -> existing.stream().filter(topics::contains).collect(toList()));
}
}
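
filterExisting (now cluster-aware, since the deleted Pagination helper no longer carries an admin client) intersects the cached topic names with a live listTopics call, presumably so topics deleted since the last metrics refresh don't show up in a rendered page. A minimal model of that intersection:

```java
import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.Set;

public class FilterExistingExample {
  public static void main(String[] args) {
    // Names known to the metrics cache (may be stale).
    Set<String> cached = Set.of("orders", "payments", "recently-deleted");
    // Names the broker actually reports via listTopics(true).
    Set<String> live = Set.of("orders", "payments");

    // Same shape as filterExisting: keep only names present in both.
    List<String> existing = live.stream().filter(cached::contains).collect(toList());
    System.out.println(existing); // [orders, payments] (iteration order may vary)
  }
}
```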

SchemaRegistryPaginationTest.java

@@ -7,6 +7,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.provectus.kafka.ui.controller.SchemasController;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.InternalSchemaRegistry;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SchemaSubjectDTO;
@@ -22,8 +23,9 @@ public class SchemaRegistryPaginationTest {
private final SchemaRegistryService schemaRegistryService = mock(SchemaRegistryService.class);
private final ClustersStorage clustersStorage = mock(ClustersStorage.class);
private final ClusterMapper clusterMapper = mock(ClusterMapper.class);
private SchemasController controller;
private final SchemasController controller = new SchemasController(clusterMapper, schemaRegistryService);
private void init(String[] subjects) {
when(schemaRegistryService.getAllSubjectNames(isA(KafkaCluster.class)))
@@ -34,7 +36,6 @@
.thenReturn(Optional.of(buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME)));
when(schemaRegistryService.getLatestSchemaVersionBySubject(isA(KafkaCluster.class), isA(String.class)))
.thenAnswer(a -> Mono.just(new SchemaSubjectDTO().subject(a.getArgument(1))));
this.controller = new SchemasController(schemaRegistryService);
this.controller.setClustersStorage(clustersStorage);
}

TopicsServicePaginationTest.java

@@ -1,17 +1,31 @@
package com.provectus.kafka.ui.service;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.provectus.kafka.ui.controller.TopicsController;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.mapper.ClusterMapperImpl;
import com.provectus.kafka.ui.model.InternalLogDirStats;
import com.provectus.kafka.ui.model.InternalPartitionsOffsets;
import com.provectus.kafka.ui.model.InternalSchemaRegistry;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SortOrderDTO;
import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
import java.util.Collection;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.util.JmxClusterUtil;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.TopicDescription;
@@ -21,20 +35,26 @@ import reactor.core.publisher.Mono;
class TopicsServicePaginationTest {
private TopicsService.Pagination pagination;
private static final String LOCAL_KAFKA_CLUSTER_NAME = "local";
private void init(Collection<TopicDescription> topicsInCache) {
ReactiveAdminClient adminClient = when(mock(ReactiveAdminClient.class).listTopics(true))
.thenReturn(Mono.just(topicsInCache.stream().map(TopicDescription::name)
.collect(Collectors.toSet())))
.getMock();
private final TopicsService topicsService = mock(TopicsService.class);
private final ClustersStorage clustersStorage = mock(ClustersStorage.class);
private final ClusterMapper clusterMapper = new ClusterMapperImpl();
MetricsCache.Metrics metricsCache = MetricsCache.Metrics.empty().toBuilder()
.topicDescriptions(
topicsInCache.stream().collect(Collectors.toMap(TopicDescription::name, d -> d)))
.build();
private final TopicsController topicsController = new TopicsController(topicsService, clusterMapper);
pagination = new TopicsService.Pagination(adminClient, metricsCache);
private void init(Map<String, InternalTopic> topicsInCache) {
when(clustersStorage.getClusterByName(isA(String.class)))
.thenReturn(Optional.of(buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME)));
when(topicsService.getTopicsForPagination(isA(KafkaCluster.class)))
.thenReturn(Mono.just(new ArrayList<>(topicsInCache.values())));
when(topicsService.loadTopics(isA(KafkaCluster.class), anyList()))
.thenAnswer(a -> {
List<String> lst = a.getArgument(1);
return Mono.just(lst.stream().map(topicsInCache::get).collect(Collectors.toList()));
});
this.topicsController.setClustersStorage(clustersStorage);
}
@Test
@@ -43,35 +63,49 @@ class TopicsServicePaginationTest {
IntStream.rangeClosed(1, 100).boxed()
.map(Objects::toString)
.map(name -> new TopicDescription(name, false, List.of()))
.collect(Collectors.toList())
.map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
.collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
);
var topics = pagination.getPage(
Optional.empty(), Optional.empty(), Optional.empty(),
Optional.empty(), Optional.empty(), Optional.empty()).block();
assertThat(topics.getTotalPages()).isEqualTo(4);
assertThat(topics.getTopics()).hasSize(25);
assertThat(topics.getTopics()).isSorted();
var topics = topicsController
.getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, null,
null, null, null).block();
assertThat(topics.getBody().getPageCount()).isEqualTo(4);
assertThat(topics.getBody().getTopics()).hasSize(25);
assertThat(topics.getBody().getTopics())
.isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
}
private KafkaCluster buildKafkaCluster(String clusterName) {
return KafkaCluster.builder()
.name(clusterName)
.schemaRegistry(InternalSchemaRegistry.builder().build())
.build();
}
@Test
public void shouldListFirst25TopicsSortedByNameDescendingOrder() {
var topicDescriptions = IntStream.rangeClosed(1, 100).boxed()
var internalTopics = IntStream.rangeClosed(1, 100).boxed()
.map(Objects::toString)
.map(name -> new TopicDescription(name, false, List.of()))
.collect(Collectors.toList());
init(topicDescriptions);
.map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
.collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
init(internalTopics);
var topics = pagination.getPage(
Optional.empty(), Optional.empty(), Optional.empty(),
Optional.empty(), Optional.of(TopicColumnsToSortDTO.NAME), Optional.of(SortOrderDTO.DESC)).block();
assertThat(topics.getTotalPages()).isEqualTo(4);
assertThat(topics.getTopics()).hasSize(25);
assertThat(topics.getTopics()).isSortedAccordingTo(Comparator.reverseOrder());
assertThat(topics.getTopics()).containsExactlyElementsOf(
topicDescriptions.stream()
.map(TopicDescription::name)
.sorted(Comparator.reverseOrder())
var topics = topicsController
.getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, null,
TopicColumnsToSortDTO.NAME, SortOrderDTO.DESC, null).block();
assertThat(topics.getBody().getPageCount()).isEqualTo(4);
assertThat(topics.getBody().getTopics()).hasSize(25);
assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName).reversed());
assertThat(topics.getBody().getTopics()).containsExactlyElementsOf(
internalTopics.values().stream()
.map(clusterMapper::toTopic)
.sorted(Comparator.comparing(TopicDTO::getName).reversed())
.limit(25)
.collect(Collectors.toList())
);
@@ -83,14 +117,17 @@
IntStream.rangeClosed(1, 100).boxed()
.map(Objects::toString)
.map(name -> new TopicDescription(name, false, List.of()))
.collect(Collectors.toList())
.map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
.collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
);
var topics = pagination.getPage(Optional.of(4), Optional.of(33),
Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()).block();
assertThat(topics.getTotalPages()).isEqualTo(4);
assertThat(topics.getTopics()).hasSize(1)
.first().isEqualTo("99");
var topics = topicsController
.getTopics(LOCAL_KAFKA_CLUSTER_NAME, 4, 33, null, null, null, null, null).block();
assertThat(topics.getBody().getPageCount()).isEqualTo(4);
assertThat(topics.getBody().getTopics()).hasSize(1);
assertThat(topics.getBody().getTopics().get(0).getName().equals("99"));
}
@Test
@@ -99,14 +136,17 @@
IntStream.rangeClosed(1, 100).boxed()
.map(Objects::toString)
.map(name -> new TopicDescription(name, false, List.of()))
.collect(Collectors.toList())
.map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
.collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
);
var topics = pagination.getPage(Optional.of(0), Optional.of(-1),
Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()).block();
assertThat(topics.getTotalPages()).isEqualTo(4);
assertThat(topics.getTopics()).hasSize(25);
assertThat(topics.getTopics()).isSorted();
var topics = topicsController
.getTopics(LOCAL_KAFKA_CLUSTER_NAME, 0, -1, null, null, null, null, null).block();
assertThat(topics.getBody().getPageCount()).isEqualTo(4);
assertThat(topics.getBody().getTopics()).hasSize(25);
assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
}
@Test
@@ -115,87 +155,103 @@
IntStream.rangeClosed(1, 100).boxed()
.map(Objects::toString)
.map(name -> new TopicDescription(name, Integer.parseInt(name) % 10 == 0, List.of()))
.collect(Collectors.toList())
.map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
.collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
);
var topics = pagination.getPage(
Optional.empty(), Optional.empty(), Optional.of(true),
Optional.empty(), Optional.empty(), Optional.empty()).block();
assertThat(topics.getTotalPages()).isEqualTo(4);
assertThat(topics.getTopics()).hasSize(25);
assertThat(topics.getTopics()).isSorted();
}
var topics = topicsController
.getTopics(LOCAL_KAFKA_CLUSTER_NAME, 0, -1, true, null,
null, null, null).block();
assertThat(topics.getBody().getPageCount()).isEqualTo(4);
assertThat(topics.getBody().getTopics()).hasSize(25);
assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
}
@Test
public void shouldListOnlyNonInternalTopics() {
init(
IntStream.rangeClosed(1, 100).boxed()
.map(Objects::toString)
.map(name -> new TopicDescription(name, false, List.of()))
.collect(Collectors.toList())
.map(name -> new TopicDescription(name, Integer.parseInt(name) % 5 == 0, List.of()))
.map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
.collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
);
var topics = pagination.getPage(
Optional.empty(), Optional.empty(), Optional.of(true),
Optional.empty(), Optional.empty(), Optional.empty()).block();
assertThat(topics.getTotalPages()).isEqualTo(4);
assertThat(topics.getTopics()).hasSize(25);
assertThat(topics.getTopics()).isSorted();
}
var topics = topicsController
.getTopics(LOCAL_KAFKA_CLUSTER_NAME, 4, -1, false, null,
null, null, null).block();
assertThat(topics.getBody().getPageCount()).isEqualTo(4);
assertThat(topics.getBody().getTopics()).hasSize(5);
assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
}
@Test
public void shouldListOnlyTopicsContainingOne() {
init(
IntStream.rangeClosed(1, 100).boxed()
.map(Objects::toString)
.map(name -> new TopicDescription(name, false, List.of()))
.collect(Collectors.toList())
.map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
.collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
);
var topics = pagination.getPage(
Optional.empty(), Optional.empty(), Optional.empty(),
Optional.of("1"), Optional.empty(), Optional.empty()).block();
assertThat(topics.getTotalPages()).isEqualTo(1);
assertThat(topics.getTopics()).hasSize(20);
assertThat(topics.getTopics()).isSorted();
var topics = topicsController
.getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, "1",
null, null, null).block();
assertThat(topics.getBody().getPageCount()).isEqualTo(1);
assertThat(topics.getBody().getTopics()).hasSize(20);
assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
}
@Test
public void shouldListTopicsOrderedByPartitionsCount() {
List<TopicDescription> topicDescriptions = IntStream.rangeClosed(1, 100).boxed()
Map<String, InternalTopic> internalTopics = IntStream.rangeClosed(1, 100).boxed()
.map(i -> new TopicDescription(UUID.randomUUID().toString(), false,
IntStream.range(0, i)
.mapToObj(p ->
new TopicPartitionInfo(p, null, List.of(), List.of()))
.collect(Collectors.toList())))
.collect(Collectors.toList());
.map(topicDescription -> InternalTopic.from(topicDescription, List.of(), InternalPartitionsOffsets.empty(),
JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
.collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
init(topicDescriptions);
init(internalTopics);
var topics = pagination.getPage(
Optional.empty(), Optional.empty(), Optional.empty(),
Optional.empty(), Optional.of(TopicColumnsToSortDTO.TOTAL_PARTITIONS), Optional.empty()).block();
assertThat(topics.getTotalPages()).isEqualTo(4);
assertThat(topics.getTopics()).hasSize(25);
assertThat(topics.getTopics()).containsExactlyElementsOf(
topicDescriptions.stream()
.map(TopicDescription::name)
var topicsSortedAsc = topicsController
.getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null,
null, TopicColumnsToSortDTO.TOTAL_PARTITIONS, null, null).block();
assertThat(topicsSortedAsc.getBody().getPageCount()).isEqualTo(4);
assertThat(topicsSortedAsc.getBody().getTopics()).hasSize(25);
assertThat(topicsSortedAsc.getBody().getTopics()).containsExactlyElementsOf(
internalTopics.values().stream()
.map(clusterMapper::toTopic)
.sorted(Comparator.comparing(TopicDTO::getPartitionCount))
.limit(25)
.collect(Collectors.toList()));
.collect(Collectors.toList())
);
var topicsSortedDesc = pagination.getPage(
Optional.empty(), Optional.empty(), Optional.empty(),
Optional.empty(), Optional.of(TopicColumnsToSortDTO.TOTAL_PARTITIONS), Optional.of(SortOrderDTO.DESC)).block();
assertThat(topicsSortedDesc.getTotalPages()).isEqualTo(4);
assertThat(topicsSortedDesc.getTopics()).hasSize(25);
assertThat(topicsSortedDesc.getTopics()).containsExactlyElementsOf(
topicDescriptions.stream()
.sorted((a, b) -> b.partitions().size() - a.partitions().size())
.map(TopicDescription::name)
var topicsSortedDesc = topicsController
.getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null,
null, TopicColumnsToSortDTO.TOTAL_PARTITIONS, SortOrderDTO.DESC, null).block();
assertThat(topicsSortedDesc.getBody().getPageCount()).isEqualTo(4);
assertThat(topicsSortedDesc.getBody().getTopics()).hasSize(25);
assertThat(topicsSortedDesc.getBody().getTopics()).containsExactlyElementsOf(
internalTopics.values().stream()
.map(clusterMapper::toTopic)
.sorted(Comparator.comparing(TopicDTO::getPartitionCount).reversed())
.limit(25)
.collect(Collectors.toList()));
.collect(Collectors.toList())
);
}
}