diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java
index e0e1d5dc78..df809d615a 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java
@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.controller;
 
 import com.provectus.kafka.ui.api.BrokersApi;
+import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.model.BrokerConfigDTO;
 import com.provectus.kafka.ui.model.BrokerConfigItemDTO;
 import com.provectus.kafka.ui.model.BrokerDTO;
@@ -22,11 +23,13 @@ import reactor.core.publisher.Mono;
 @Slf4j
 public class BrokersController extends AbstractController implements BrokersApi {
   private final BrokerService brokerService;
+  private final ClusterMapper clusterMapper;
 
   @Override
   public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterName, Integer id,
                                                                   ServerWebExchange exchange) {
     return brokerService.getBrokerMetrics(getCluster(clusterName), id)
+        .map(clusterMapper::toBrokerMetrics)
         .map(ResponseEntity::ok)
         .onErrorReturn(ResponseEntity.notFound().build());
   }
@@ -50,7 +53,8 @@ public class BrokersController extends AbstractController implements BrokersApi
   public Mono<ResponseEntity<Flux<BrokerConfigDTO>>> getBrokerConfig(String clusterName, Integer id,
                                                                      ServerWebExchange exchange) {
     return Mono.just(ResponseEntity.ok(
-        brokerService.getBrokerConfig(getCluster(clusterName), id)));
+        brokerService.getBrokerConfig(getCluster(clusterName), id)
+            .map(clusterMapper::toBrokerConfig)));
   }
 
   @Override
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/SchemasController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/SchemasController.java
index 0526ee8b1f..55187903a6 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/SchemasController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/SchemasController.java
@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.controller;
 
 import com.provectus.kafka.ui.api.SchemasApi;
 import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
 import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
@@ -29,6 +30,8 @@ public class SchemasController extends AbstractController implements SchemasApi
 
   private static final Integer DEFAULT_PAGE_SIZE = 25;
 
+  private final ClusterMapper mapper;
+
   private final SchemaRegistryService schemaRegistryService;
 
   @Override
@@ -46,6 +49,7 @@ public class SchemasController extends AbstractController implements SchemasApi
       ServerWebExchange exchange) {
     return schemaRegistryService.checksSchemaCompatibility(
         getCluster(clusterName), subject, newSchemaSubject)
+        .map(mapper::toCompatibilityCheckResponse)
         .map(ResponseEntity::ok);
   }
@@ -91,6 +95,7 @@ public class SchemasController extends AbstractController implements SchemasApi
   public Mono<ResponseEntity<CompatibilityLevelDTO>> getGlobalSchemaCompatibilityLevel(
       String clusterName, ServerWebExchange exchange) {
     return schemaRegistryService.getGlobalSchemaCompatibilityLevel(getCluster(clusterName))
+        .map(mapper::toCompatibilityLevelDto)
         .map(ResponseEntity::ok)
         .defaultIfEmpty(ResponseEntity.notFound().build());
   }
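The two controllers above show the heart of this change: services now hand back internal models, and the controller applies the ClusterMapper translation to API DTOs just before building the HTTP response. A minimal sketch of that shape, taken from the broker-metrics endpoint in this diff:

    // Service yields the internal model; the controller maps it to the API DTO.
    return brokerService.getBrokerMetrics(getCluster(clusterName), id)  // Mono<JmxBrokerMetrics>
        .map(clusterMapper::toBrokerMetrics)                            // -> BrokerMetricsDTO
        .map(ResponseEntity::ok)
        .onErrorReturn(ResponseEntity.notFound().build());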
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
index 3ebe2b03a7..e30a540c90 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
@@ -1,6 +1,11 @@
 package com.provectus.kafka.ui.controller;
 
+import static java.util.stream.Collectors.toList;
+
 import com.provectus.kafka.ui.api.TopicsApi;
+import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.model.InternalTopic;
+import com.provectus.kafka.ui.model.InternalTopicConfig;
 import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
 import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
 import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
@@ -14,10 +19,12 @@ import com.provectus.kafka.ui.model.TopicDetailsDTO;
 import com.provectus.kafka.ui.model.TopicUpdateDTO;
 import com.provectus.kafka.ui.model.TopicsResponseDTO;
 import com.provectus.kafka.ui.service.TopicsService;
-import java.util.Optional;
+import java.util.Comparator;
+import java.util.List;
 import javax.validation.Valid;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RestController;
@@ -29,12 +36,17 @@ import reactor.core.publisher.Mono;
 @RequiredArgsConstructor
 @Slf4j
 public class TopicsController extends AbstractController implements TopicsApi {
+
+  private static final Integer DEFAULT_PAGE_SIZE = 25;
+
   private final TopicsService topicsService;
+  private final ClusterMapper clusterMapper;
 
   @Override
   public Mono<ResponseEntity<TopicDTO>> createTopic(
       String clusterName, @Valid Mono<TopicCreationDTO> topicCreation, ServerWebExchange exchange) {
     return topicsService.createTopic(getCluster(clusterName), topicCreation)
+        .map(clusterMapper::toTopic)
         .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
         .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
   }
@@ -43,6 +55,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
   public Mono<ResponseEntity<TopicDTO>> recreateTopic(String clusterName, String topicName,
                                                       ServerWebExchange serverWebExchange) {
     return topicsService.recreateTopic(getCluster(clusterName), topicName)
+        .map(clusterMapper::toTopic)
         .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED));
   }
 
@@ -50,6 +63,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
   public Mono<ResponseEntity<TopicDTO>> cloneTopic(
       String clusterName, String topicName, String newTopicName, ServerWebExchange exchange) {
     return topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
+        .map(clusterMapper::toTopic)
         .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED));
   }
 
@@ -64,6 +78,10 @@ public class TopicsController extends AbstractController implements TopicsApi {
   public Mono<ResponseEntity<Flux<TopicConfigDTO>>> getTopicConfigs(
       String clusterName, String topicName, ServerWebExchange exchange) {
     return topicsService.getTopicConfigs(getCluster(clusterName), topicName)
+        .map(lst -> lst.stream()
+            .map(InternalTopicConfig::from)
+            .map(clusterMapper::toTopicConfig)
+            .collect(toList()))
         .map(Flux::fromIterable)
         .map(ResponseEntity::ok);
   }
@@ -72,10 +90,10 @@ public class TopicsController extends AbstractController implements TopicsApi {
   public Mono<ResponseEntity<TopicDetailsDTO>> getTopicDetails(
       String clusterName, String topicName, ServerWebExchange exchange) {
     return topicsService.getTopicDetails(getCluster(clusterName), topicName)
+        .map(clusterMapper::toTopicDetails)
         .map(ResponseEntity::ok);
   }
-
   @Override
   public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName, @Valid Integer page,
                                                            @Valid Integer perPage,
                                                            @Valid Boolean showInternal,
@@ -83,16 +101,54 @@ public class TopicsController extends AbstractController implements TopicsApi {
       @Valid TopicColumnsToSortDTO orderBy,
       @Valid SortOrderDTO sortOrder,
       ServerWebExchange exchange) {
-    return topicsService
-        .getTopics(
-            getCluster(clusterName),
-            Optional.ofNullable(page),
-            Optional.ofNullable(perPage),
-            Optional.ofNullable(showInternal),
-            Optional.ofNullable(search),
-            Optional.ofNullable(orderBy),
-            Optional.ofNullable(sortOrder)
-        ).map(ResponseEntity::ok);
+    return topicsService.getTopicsForPagination(getCluster(clusterName))
+        .flatMap(existingTopics -> {
+          int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
+          var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
+          var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
+              ? getComparatorForTopic(orderBy) : getComparatorForTopic(orderBy).reversed();
+          List<InternalTopic> filtered = existingTopics.stream()
+              .filter(topic -> !topic.isInternal()
+                  || showInternal != null && showInternal)
+              .filter(topic -> search == null || StringUtils.contains(topic.getName(), search))
+              .sorted(comparator)
+              .collect(toList());
+          var totalPages = (filtered.size() / pageSize)
+              + (filtered.size() % pageSize == 0 ? 0 : 1);
+
+          List<String> topicsPage = filtered.stream()
+              .skip(topicsToSkip)
+              .limit(pageSize)
+              .map(InternalTopic::getName)
+              .collect(toList());
+
+          return topicsService.loadTopics(getCluster(clusterName), topicsPage)
+              .map(topicsToRender ->
+                  new TopicsResponseDTO()
+                      .topics(topicsToRender.stream().map(clusterMapper::toTopic).collect(toList()))
+                      .pageCount(totalPages));
+        }).map(ResponseEntity::ok);
+  }
+
+  private Comparator<InternalTopic> getComparatorForTopic(
+      TopicColumnsToSortDTO orderBy) {
+    var defaultComparator = Comparator.comparing(InternalTopic::getName);
+    if (orderBy == null) {
+      return defaultComparator;
+    }
+    switch (orderBy) {
+      case TOTAL_PARTITIONS:
+        return Comparator.comparing(InternalTopic::getPartitionCount);
+      case OUT_OF_SYNC_REPLICAS:
+        return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
+      case REPLICATION_FACTOR:
+        return Comparator.comparing(InternalTopic::getReplicationFactor);
+      case SIZE:
+        return Comparator.comparing(InternalTopic::getSegmentSize);
+      case NAME:
+      default:
+        return defaultComparator;
+    }
   }
 
   @Override
@@ -100,7 +156,9 @@ public class TopicsController extends AbstractController implements TopicsApi {
       String clusterId, String topicName, @Valid Mono<TopicUpdateDTO> topicUpdate,
       ServerWebExchange exchange) {
     return topicsService
-        .updateTopic(getCluster(clusterId), topicName, topicUpdate).map(ResponseEntity::ok);
+        .updateTopic(getCluster(clusterId), topicName, topicUpdate)
+        .map(clusterMapper::toTopic)
+        .map(ResponseEntity::ok);
   }
 
   @Override
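The page-count expression in getTopics above rounds up with a modulo check rather than floating-point math. A self-contained sketch of the arithmetic (the numbers mirror the tests at the end of this diff):

    public class PageCountExample {
      public static void main(String[] args) {
        int total = 100;
        for (int pageSize : new int[] {25, 33}) {
          // One extra page whenever the division leaves a remainder.
          int totalPages = (total / pageSize) + (total % pageSize == 0 ? 0 : 1);
          System.out.println(pageSize + " per page -> " + totalPages + " pages");
          // 25 per page -> 4 pages; 33 per page -> 4 pages (page 4 holds the single topic "99")
        }
      }
    }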
@@ -126,7 +126,7 @@ public interface ClusterMapper {
   CompatibilityCheckResponseDTO toCompatibilityCheckResponse(InternalCompatibilityCheck dto);
 
   @Mapping(target = "compatibility", source = "compatibilityLevel")
-  CompatibilityLevelDTO toCompatibilityLevel(InternalCompatibilityLevel dto);
+  CompatibilityLevelDTO toCompatibilityLevelDto(InternalCompatibilityLevel dto);
 
   default List<PartitionDTO> map(Map<Integer, InternalPartition> map) {
     return map.values().stream().map(this::toPartition).collect(Collectors.toList());
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
index b5b3aa1680..28125874bb 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java
@@ -4,14 +4,12 @@ import com.provectus.kafka.ui.exception.InvalidRequestApiException;
 import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
 import com.provectus.kafka.ui.exception.NotFoundException;
 import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException;
-import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper;
-import com.provectus.kafka.ui.model.BrokerConfigDTO;
 import com.provectus.kafka.ui.model.BrokerDTO;
 import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO;
-import com.provectus.kafka.ui.model.BrokerMetricsDTO;
 import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
 import com.provectus.kafka.ui.model.InternalBrokerConfig;
+import com.provectus.kafka.ui.model.JmxBrokerMetrics;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import java.util.Collections;
 import java.util.HashMap;
@@ -40,7 +38,6 @@ public class BrokerService {
   private final MetricsCache metricsCache;
   private final AdminClientService adminClientService;
   private final DescribeLogDirsMapper describeLogDirsMapper;
-  private final ClusterMapper clusterMapper;
 
   private Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(
       KafkaCluster cluster, List<Integer> brokersIds) {
@@ -149,15 +146,13 @@ public class BrokerService {
         .flatMapMany(Flux::fromIterable);
   }
 
-  public Flux<BrokerConfigDTO> getBrokerConfig(KafkaCluster cluster, Integer brokerId) {
-    return getBrokersConfig(cluster, brokerId)
-        .map(clusterMapper::toBrokerConfig);
+  public Flux<InternalBrokerConfig> getBrokerConfig(KafkaCluster cluster, Integer brokerId) {
+    return getBrokersConfig(cluster, brokerId);
   }
 
-  public Mono<BrokerMetricsDTO> getBrokerMetrics(KafkaCluster cluster, Integer brokerId) {
+  public Mono<JmxBrokerMetrics> getBrokerMetrics(KafkaCluster cluster, Integer brokerId) {
     return Mono.justOrEmpty(
-        metricsCache.get(cluster).getJmxMetrics().getInternalBrokerMetrics().get(brokerId))
-        .map(clusterMapper::toBrokerMetrics);
+        metricsCache.get(cluster).getJmxMetrics().getInternalBrokerMetrics().get(brokerId));
   }
 }
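The rename in ClusterMapper is low-risk because it is a MapStruct interface: the implementation (ClusterMapperImpl, instantiated directly in the tests below) is regenerated at build time, and the @Mapping annotation still drives the compatibilityLevel -> compatibility field translation. A sketch of the call site, assuming an InternalCompatibilityLevel instance named internalLevel:

    ClusterMapper mapper = new ClusterMapperImpl();  // MapStruct-generated implementation
    CompatibilityLevelDTO dto = mapper.toCompatibilityLevelDto(internalLevel);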
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java
index 5207c0cdd1..f4c9355804 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java
@@ -10,8 +10,6 @@ import com.provectus.kafka.ui.exception.SchemaNotFoundException;
 import com.provectus.kafka.ui.exception.SchemaTypeNotSupportedException;
 import com.provectus.kafka.ui.exception.UnprocessableEntityException;
 import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.mapper.ClusterMapper;
-import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
 import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
 import com.provectus.kafka.ui.model.InternalSchemaRegistry;
 import com.provectus.kafka.ui.model.KafkaCluster;
@@ -69,7 +67,6 @@ public class SchemaRegistryService {
   private static final String UNRECOGNIZED_FIELD_SCHEMA_TYPE = "Unrecognized field: schemaType";
   private static final String INCOMPATIBLE_WITH_AN_EARLIER_SCHEMA = "incompatible with an earlier schema";
 
-  private final ClusterMapper mapper;
   private final WebClient webClient;
 
   public Mono<List<SchemaSubjectDTO>> getAllLatestVersionSchemas(KafkaCluster cluster,
@@ -136,7 +133,7 @@ public class SchemaRegistryService {
         .zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
         .map(tuple -> {
           SchemaSubjectDTO schema = tuple.getT1();
-          String compatibilityLevel = tuple.getT2().getCompatibility().getValue();
+          String compatibilityLevel = tuple.getT2().getCompatibilityLevel();
           schema.setCompatibilityLevel(compatibilityLevel);
           return schema;
         })
@@ -279,7 +276,7 @@ public class SchemaRegistryService {
     return updateSchemaCompatibility(cluster, null, compatibilityLevel);
   }
 
-  public Mono<CompatibilityLevelDTO> getSchemaCompatibilityLevel(KafkaCluster cluster,
+  public Mono<InternalCompatibilityLevel> getSchemaCompatibilityLevel(KafkaCluster cluster,
                                                                  String schemaName) {
     String globalConfig = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
     final var values = new LinkedMultiValueMap<String, String>();
@@ -292,21 +289,20 @@ public class SchemaRegistryService {
             values)
         .retrieve()
         .bodyToMono(InternalCompatibilityLevel.class)
-        .map(mapper::toCompatibilityLevel)
         .onErrorResume(error -> Mono.empty());
   }
 
-  public Mono<CompatibilityLevelDTO> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
+  public Mono<InternalCompatibilityLevel> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
     return this.getSchemaCompatibilityLevel(cluster, null);
   }
 
-  private Mono<CompatibilityLevelDTO> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
+  private Mono<InternalCompatibilityLevel> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
                                                                          String schemaName) {
     return this.getSchemaCompatibilityLevel(cluster, schemaName)
         .switchIfEmpty(this.getGlobalSchemaCompatibilityLevel(cluster));
   }
 
-  public Mono<CompatibilityCheckResponseDTO> checksSchemaCompatibility(
+  public Mono<InternalCompatibilityCheck> checksSchemaCompatibility(
       KafkaCluster cluster, String schemaName, Mono<NewSchemaSubjectDTO> newSchemaSubject) {
     return configuredWebClient(
         cluster,
@@ -319,7 +315,6 @@ public class SchemaRegistryService {
         .onStatus(NOT_FOUND::equals,
             throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
         .bodyToMono(InternalCompatibilityCheck.class)
-        .map(mapper::toCompatibilityCheckResponse)
         .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
             () -> this.checksSchemaCompatibility(cluster, schemaName, newSchemaSubject))));
   }
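With the DTO mapping gone, SchemaRegistryService exposes the internal compatibility model, and the per-subject lookup still falls back to the cluster-wide default because onErrorResume turns a failed subject-level call into an empty Mono. The fallback pattern in isolation (method names as defined above):

    Mono<InternalCompatibilityLevel> effectiveLevel =
        getSchemaCompatibilityLevel(cluster, schemaName)            // empty on error/404
            .switchIfEmpty(getGlobalSchemaCompatibilityLevel(cluster));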
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
index 265103a7cb..75c3298461 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
@@ -3,12 +3,10 @@ package com.provectus.kafka.ui.service;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.provectus.kafka.ui.exception.TopicMetadataException;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.exception.TopicRecreationException;
 import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.model.Feature;
 import com.provectus.kafka.ui.model.InternalLogDirStats;
 import com.provectus.kafka.ui.model.InternalPartition;
@@ -21,15 +19,9 @@ import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
 import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
 import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
 import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
-import com.provectus.kafka.ui.model.SortOrderDTO;
-import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
-import com.provectus.kafka.ui.model.TopicConfigDTO;
 import com.provectus.kafka.ui.model.TopicCreationDTO;
-import com.provectus.kafka.ui.model.TopicDTO;
-import com.provectus.kafka.ui.model.TopicDetailsDTO;
 import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
 import com.provectus.kafka.ui.model.TopicUpdateDTO;
-import com.provectus.kafka.ui.model.TopicsResponseDTO;
 import com.provectus.kafka.ui.serde.DeserializationService;
 import com.provectus.kafka.ui.util.JmxClusterUtil;
 import java.time.Duration;
@@ -40,10 +32,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitions;
@@ -61,10 +51,7 @@ import reactor.util.retry.Retry;
 @RequiredArgsConstructor
 public class TopicsService {
 
-  private static final Integer DEFAULT_PAGE_SIZE = 25;
-
   private final AdminClientService adminClientService;
-  private final ClusterMapper clusterMapper;
   private final DeserializationService deserializationService;
   private final MetricsCache metricsCache;
 
   @Value("${topic.recreate.maxRetries:15}")
@@ -76,25 +63,7 @@ public class TopicsService {
   @Value("${topic.load.after.create.delay.ms:500}")
   private int loadTopicAfterCreateDelayInMs;
 
-  public Mono<TopicsResponseDTO> getTopics(KafkaCluster cluster,
-                                           Optional<Integer> pageNum,
-                                           Optional<Integer> nullablePerPage,
-                                           Optional<Boolean> showInternal,
-                                           Optional<String> search,
-                                           Optional<TopicColumnsToSortDTO> sortBy,
-                                           Optional<SortOrderDTO> sortOrder) {
-    return adminClientService.get(cluster).flatMap(ac ->
-        new Pagination(ac, metricsCache.get(cluster))
-            .getPage(pageNum, nullablePerPage, showInternal, search, sortBy, sortOrder)
-            .flatMap(page ->
-                loadTopics(cluster, page.getTopics())
-                    .map(topics ->
-                        new TopicsResponseDTO()
-                            .topics(topics.stream().map(clusterMapper::toTopic).collect(toList()))
-                            .pageCount(page.getTotalPages()))));
-  }
-
-  private Mono<List<InternalTopic>> loadTopics(KafkaCluster c, List<String> topics) {
+  public Mono<List<InternalTopic>> loadTopics(KafkaCluster c, List<String> topics) {
     if (topics.isEmpty()) {
       return Mono.just(List.of());
     }
@@ -186,18 +155,14 @@ public class TopicsService {
         .map(InternalPartitionsOffsets::new);
   }
 
-  public Mono<TopicDetailsDTO> getTopicDetails(KafkaCluster cluster, String topicName) {
-    return loadTopic(cluster, topicName).map(clusterMapper::toTopicDetails);
+  public Mono<InternalTopic> getTopicDetails(KafkaCluster cluster, String topicName) {
+    return loadTopic(cluster, topicName);
   }
 
-  public Mono<List<TopicConfigDTO>> getTopicConfigs(KafkaCluster cluster, String topicName) {
+  public Mono<List<ConfigEntry>> getTopicConfigs(KafkaCluster cluster, String topicName) {
     return adminClientService.get(cluster)
         .flatMap(ac -> ac.getTopicsConfig(List.of(topicName)))
-        .map(m -> m.values().stream().findFirst().orElseThrow(TopicNotFoundException::new))
-        .map(lst -> lst.stream()
-            .map(InternalTopicConfig::from)
-            .map(clusterMapper::toTopicConfig)
-            .collect(toList()));
+        .map(m -> m.values().stream().findFirst().orElseThrow(TopicNotFoundException::new));
   }
 
   private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient adminClient,
@@ -214,13 +179,12 @@ public class TopicsService {
         .flatMap(topicData -> loadTopicAfterCreation(c, topicData.getName()));
   }
 
-  public Mono<TopicDTO> createTopic(KafkaCluster cluster, Mono<TopicCreationDTO> topicCreation) {
+  public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicCreationDTO> topicCreation) {
     return adminClientService.get(cluster)
-        .flatMap(ac -> createTopic(cluster, ac, topicCreation))
-        .map(clusterMapper::toTopic);
+        .flatMap(ac -> createTopic(cluster, ac, topicCreation));
   }
 
-  public Mono<TopicDTO> recreateTopic(KafkaCluster cluster, String topicName) {
+  public Mono<InternalTopic> recreateTopic(KafkaCluster cluster, String topicName) {
     return loadTopic(cluster, topicName)
         .flatMap(t -> deleteTopic(cluster, topicName)
             .thenReturn(t)
@@ -246,7 +210,7 @@ public class TopicsService {
                     new TopicRecreationException(topicName,
                         recreateMaxRetries * recreateDelayInSeconds))
             )
-            .flatMap(a -> loadTopicAfterCreation(cluster, topicName)).map(clusterMapper::toTopic)
+            .flatMap(a -> loadTopicAfterCreation(cluster, topicName))
         )
     );
   }
@@ -260,11 +224,10 @@ public class TopicsService {
             .then(loadTopic(cluster, topicName)));
   }
 
-  public Mono<TopicDTO> updateTopic(KafkaCluster cl, String topicName,
+  public Mono<InternalTopic> updateTopic(KafkaCluster cl, String topicName,
                                     Mono<TopicUpdateDTO> topicUpdate) {
     return topicUpdate
-        .flatMap(t -> updateTopic(cl, topicName, t))
-        .map(clusterMapper::toTopic);
+        .flatMap(t -> updateTopic(cl, topicName, t));
   }
 
   private Mono<InternalTopic> changeReplicationFactor(
@@ -464,7 +427,7 @@ public class TopicsService {
         .getTopicSchema(topicName);
   }
 
-  public Mono<TopicDTO> cloneTopic(
+  public Mono<InternalTopic> cloneTopic(
       KafkaCluster cluster, String topicName, String newTopicName) {
     return loadTopic(cluster, topicName).flatMap(topic ->
         adminClientService.get(cluster)
@@ -480,97 +443,27 @@ public class TopicsService {
             )
         ).thenReturn(newTopicName)
             .flatMap(a -> loadTopicAfterCreation(cluster, newTopicName))
-            .map(clusterMapper::toTopic)
     );
   }
 
-  @VisibleForTesting
-  @lombok.Value
-  static class Pagination {
-    ReactiveAdminClient adminClient;
-    MetricsCache.Metrics metrics;
+  public Mono<List<InternalTopic>> getTopicsForPagination(KafkaCluster cluster) {
+    MetricsCache.Metrics metrics = metricsCache.get(cluster);
+    return filterExisting(cluster, metrics.getTopicDescriptions().keySet())
+        .map(lst -> lst.stream()
+            .map(topicName ->
+                InternalTopic.from(
+                    metrics.getTopicDescriptions().get(topicName),
+                    metrics.getTopicConfigs().getOrDefault(topicName, List.of()),
+                    InternalPartitionsOffsets.empty(),
+                    metrics.getJmxMetrics(),
+                    metrics.getLogDirInfo()))
+            .collect(toList())
+        );
+  }
 
-    @lombok.Value
-    static class Page {
-      List<String> topics;
-      int totalPages;
-    }
-
-    Mono<Page> getPage(
-        Optional<Integer> pageNum,
-        Optional<Integer> nullablePerPage,
-        Optional<Boolean> showInternal,
-        Optional<String> search,
-        Optional<TopicColumnsToSortDTO> sortBy,
-        Optional<SortOrderDTO> sortOrder) {
-      return geTopicsForPagination()
-          .map(paginatingTopics -> {
-            Predicate<Integer> positiveInt = i -> i > 0;
-            int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
-            var topicsToSkip = (pageNum.filter(positiveInt).orElse(1) - 1) * perPage;
-            var comparator = sortOrder.isEmpty() || !sortOrder.get().equals(SortOrderDTO.DESC)
-                ? getComparatorForTopic(sortBy) : getComparatorForTopic(sortBy).reversed();
-            List<InternalTopic> topics = paginatingTopics.stream()
-                .filter(topic -> !topic.isInternal()
-                    || showInternal.map(i -> topic.isInternal() == i).orElse(true))
-                .filter(topic ->
-                    search
-                        .map(s -> StringUtils.containsIgnoreCase(topic.getName(), s))
-                        .orElse(true))
-                .sorted(comparator)
-                .collect(toList());
-            var totalPages = (topics.size() / perPage)
-                + (topics.size() % perPage == 0 ? 0 : 1);
-
-            List<String> topicsToRender = topics.stream()
-                .skip(topicsToSkip)
-                .limit(perPage)
-                .map(InternalTopic::getName)
-                .collect(toList());
-
-            return new Page(topicsToRender, totalPages);
-          });
-    }
-
-    private Comparator<InternalTopic> getComparatorForTopic(
-        Optional<TopicColumnsToSortDTO> sortBy) {
-      var defaultComparator = Comparator.comparing(InternalTopic::getName);
-      if (sortBy.isEmpty()) {
-        return defaultComparator;
-      }
-      switch (sortBy.get()) {
-        case TOTAL_PARTITIONS:
-          return Comparator.comparing(InternalTopic::getPartitionCount);
-        case OUT_OF_SYNC_REPLICAS:
-          return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
-        case REPLICATION_FACTOR:
-          return Comparator.comparing(InternalTopic::getReplicationFactor);
-        case SIZE:
-          return Comparator.comparing(InternalTopic::getSegmentSize);
-        case NAME:
-        default:
-          return defaultComparator;
-      }
-    }
-
-    private Mono<List<String>> filterExisting(Collection<String> topics) {
-      return adminClient.listTopics(true)
-          .map(existing -> existing.stream().filter(topics::contains).collect(toList()));
-    }
-
-    private Mono<List<InternalTopic>> geTopicsForPagination() {
-      return filterExisting(metrics.getTopicDescriptions().keySet())
-          .map(lst -> lst.stream()
-              .map(topicName ->
-                  InternalTopic.from(
-                      metrics.getTopicDescriptions().get(topicName),
-                      metrics.getTopicConfigs().getOrDefault(topicName, List.of()),
-                      InternalPartitionsOffsets.empty(),
-                      metrics.getJmxMetrics(),
-                      metrics.getLogDirInfo()))
-              .collect(toList())
-          );
-    }
+  private Mono<List<String>> filterExisting(KafkaCluster cluster, Collection<String> topics) {
+    return adminClientService.get(cluster).flatMap(ac -> ac.listTopics(true))
+        .map(existing -> existing.stream().filter(topics::contains).collect(toList()));
   }
 }
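Removing the Pagination inner class leaves the service with two plain seams, getTopicsForPagination and loadTopics, so the controller's paging logic can be tested with straightforward stubs. A condensed sketch of that arrangement (it mirrors the TopicsServicePaginationTest setup below, with topicsInCache assumed to be a Map<String, InternalTopic>):

    // Mockito stubs standing in for the two service calls the controller makes.
    when(topicsService.getTopicsForPagination(isA(KafkaCluster.class)))
        .thenReturn(Mono.just(new ArrayList<>(topicsInCache.values())));
    when(topicsService.loadTopics(isA(KafkaCluster.class), anyList()))
        .thenAnswer(a -> {
          List<String> page = a.getArgument(1);  // the names the controller asked to load
          return Mono.just(page.stream().map(topicsInCache::get).collect(toList()));
        });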
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java
index 558a5bafb9..67ab240339 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java
@@ -7,6 +7,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import com.provectus.kafka.ui.controller.SchemasController;
+import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.model.InternalSchemaRegistry;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.SchemaSubjectDTO;
@@ -22,8 +23,9 @@ public class SchemaRegistryPaginationTest {
 
   private final SchemaRegistryService schemaRegistryService = mock(SchemaRegistryService.class);
   private final ClustersStorage clustersStorage = mock(ClustersStorage.class);
+  private final ClusterMapper clusterMapper = mock(ClusterMapper.class);
 
-  private SchemasController controller;
+  private final SchemasController controller = new SchemasController(clusterMapper, schemaRegistryService);
 
   private void init(String[] subjects) {
     when(schemaRegistryService.getAllSubjectNames(isA(KafkaCluster.class)))
@@ -34,7 +36,6 @@ public class SchemaRegistryPaginationTest {
         .thenReturn(Optional.of(buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME)));
     when(schemaRegistryService.getLatestSchemaVersionBySubject(isA(KafkaCluster.class), isA(String.class)))
         .thenAnswer(a -> Mono.just(new SchemaSubjectDTO().subject(a.getArgument(1))));
-    this.controller = new SchemasController(schemaRegistryService);
     this.controller.setClustersStorage(clustersStorage);
   }
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
index c0954777dc..76b1e8c4cb 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
@@ -1,17 +1,31 @@
 package com.provectus.kafka.ui.service;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.provectus.kafka.ui.controller.TopicsController;
+import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.mapper.ClusterMapperImpl;
+import com.provectus.kafka.ui.model.InternalLogDirStats;
+import com.provectus.kafka.ui.model.InternalPartitionsOffsets;
+import com.provectus.kafka.ui.model.InternalSchemaRegistry;
+import com.provectus.kafka.ui.model.InternalTopic;
+import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.SortOrderDTO;
 import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
-import java.util.Collection;
+import com.provectus.kafka.ui.model.TopicDTO;
+import com.provectus.kafka.ui.util.JmxClusterUtil;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.kafka.clients.admin.TopicDescription;
@@ -21,20 +35,26 @@ import reactor.core.publisher.Mono;
 
 class TopicsServicePaginationTest {
 
-  private TopicsService.Pagination pagination;
+  private static final String LOCAL_KAFKA_CLUSTER_NAME = "local";
 
-  private void init(Collection<TopicDescription> topicsInCache) {
-    ReactiveAdminClient adminClient = when(mock(ReactiveAdminClient.class).listTopics(true))
-        .thenReturn(Mono.just(topicsInCache.stream().map(TopicDescription::name)
-            .collect(Collectors.toSet())))
-        .getMock();
+  private final TopicsService topicsService = mock(TopicsService.class);
+  private final ClustersStorage clustersStorage = mock(ClustersStorage.class);
+  private final ClusterMapper clusterMapper = new ClusterMapperImpl();
 
-    MetricsCache.Metrics metricsCache = MetricsCache.Metrics.empty().toBuilder()
-        .topicDescriptions(
-            topicsInCache.stream().collect(Collectors.toMap(TopicDescription::name, d -> d)))
-        .build();
+  private final TopicsController topicsController = new TopicsController(topicsService, clusterMapper);
 
-    pagination = new TopicsService.Pagination(adminClient, metricsCache);
+  private void init(Map<String, InternalTopic> topicsInCache) {
+
+    when(clustersStorage.getClusterByName(isA(String.class)))
+        .thenReturn(Optional.of(buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME)));
+    when(topicsService.getTopicsForPagination(isA(KafkaCluster.class)))
+        .thenReturn(Mono.just(new ArrayList<>(topicsInCache.values())));
+    when(topicsService.loadTopics(isA(KafkaCluster.class), anyList()))
+        .thenAnswer(a -> {
+          List<String> lst = a.getArgument(1);
+          return Mono.just(lst.stream().map(topicsInCache::get).collect(Collectors.toList()));
+        });
+    this.topicsController.setClustersStorage(clustersStorage);
   }
 
   @Test
@@ -43,35 +63,49 @@ class TopicsServicePaginationTest {
     IntStream.rangeClosed(1, 100).boxed()
         .map(Objects::toString)
         .map(name -> new TopicDescription(name, false, List.of()))
-        .collect(Collectors.toList())
+        .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
+            JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+        .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
-    var topics = pagination.getPage(
-        Optional.empty(), Optional.empty(), Optional.empty(),
-        Optional.empty(), Optional.empty(), Optional.empty()).block();
-    assertThat(topics.getTotalPages()).isEqualTo(4);
-    assertThat(topics.getTopics()).hasSize(25);
-    assertThat(topics.getTopics()).isSorted();
+    var topics = topicsController
+        .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, null,
+            null, null, null).block();
+
+    assertThat(topics.getBody().getPageCount()).isEqualTo(4);
+    assertThat(topics.getBody().getTopics()).hasSize(25);
+    assertThat(topics.getBody().getTopics())
+        .isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
+  }
+
+  private KafkaCluster buildKafkaCluster(String clusterName) {
+    return KafkaCluster.builder()
+        .name(clusterName)
+        .schemaRegistry(InternalSchemaRegistry.builder().build())
+        .build();
   }
 
   @Test
   public void shouldListFirst25TopicsSortedByNameDescendingOrder() {
-    var topicDescriptions = IntStream.rangeClosed(1, 100).boxed()
+    var internalTopics = IntStream.rangeClosed(1, 100).boxed()
         .map(Objects::toString)
         .map(name -> new TopicDescription(name, false, List.of()))
-        .collect(Collectors.toList());
-    init(topicDescriptions);
+        .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
+            JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+        .collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
+    init(internalTopics);
 
-    var topics = pagination.getPage(
-        Optional.empty(), Optional.empty(), Optional.empty(),
-        Optional.empty(), Optional.of(TopicColumnsToSortDTO.NAME), Optional.of(SortOrderDTO.DESC)).block();
-    assertThat(topics.getTotalPages()).isEqualTo(4);
-    assertThat(topics.getTopics()).hasSize(25);
-    assertThat(topics.getTopics()).isSortedAccordingTo(Comparator.reverseOrder());
-    assertThat(topics.getTopics()).containsExactlyElementsOf(
-        topicDescriptions.stream()
-            .map(TopicDescription::name)
-            .sorted(Comparator.reverseOrder())
+    var topics = topicsController
+        .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, null,
+            TopicColumnsToSortDTO.NAME, SortOrderDTO.DESC, null).block();
+
+    assertThat(topics.getBody().getPageCount()).isEqualTo(4);
+    assertThat(topics.getBody().getTopics()).hasSize(25);
+    assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName).reversed());
+    assertThat(topics.getBody().getTopics()).containsExactlyElementsOf(
+        internalTopics.values().stream()
+            .map(clusterMapper::toTopic)
+            .sorted(Comparator.comparing(TopicDTO::getName).reversed())
             .limit(25)
             .collect(Collectors.toList())
     );
@@ -83,14 +117,17 @@ class TopicsServicePaginationTest {
     IntStream.rangeClosed(1, 100).boxed()
         .map(Objects::toString)
         .map(name -> new TopicDescription(name, false, List.of()))
-        .collect(Collectors.toList())
+        .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
+            JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+        .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
-    var topics = pagination.getPage(Optional.of(4), Optional.of(33),
-        Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()).block();
-    assertThat(topics.getTotalPages()).isEqualTo(4);
-    assertThat(topics.getTopics()).hasSize(1)
-        .first().isEqualTo("99");
+    var topics = topicsController
+        .getTopics(LOCAL_KAFKA_CLUSTER_NAME, 4, 33, null, null, null, null, null).block();
+
+    assertThat(topics.getBody().getPageCount()).isEqualTo(4);
+    assertThat(topics.getBody().getTopics()).hasSize(1);
+    assertThat(topics.getBody().getTopics().get(0).getName()).isEqualTo("99");
   }
 
   @Test
@@ -99,14 +136,17 @@ class TopicsServicePaginationTest {
     IntStream.rangeClosed(1, 100).boxed()
         .map(Objects::toString)
         .map(name -> new TopicDescription(name, false, List.of()))
-        .collect(Collectors.toList())
+        .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
+            JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+        .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
-    var topics = pagination.getPage(Optional.of(0), Optional.of(-1),
-        Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()).block();
-    assertThat(topics.getTotalPages()).isEqualTo(4);
-    assertThat(topics.getTopics()).hasSize(25);
-    assertThat(topics.getTopics()).isSorted();
+    var topics = topicsController
+        .getTopics(LOCAL_KAFKA_CLUSTER_NAME, 0, -1, null, null, null, null, null).block();
+
+    assertThat(topics.getBody().getPageCount()).isEqualTo(4);
+    assertThat(topics.getBody().getTopics()).hasSize(25);
+    assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
   }
 
   @Test
@@ -115,87 +155,103 @@ class TopicsServicePaginationTest {
     IntStream.rangeClosed(1, 100).boxed()
        .map(Objects::toString)
        .map(name -> new TopicDescription(name, Integer.parseInt(name) % 10 == 0, List.of()))
-        .collect(Collectors.toList())
+        .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
+            JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+        .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
-    var topics = pagination.getPage(
-        Optional.empty(), Optional.empty(), Optional.of(true),
-        Optional.empty(), Optional.empty(), Optional.empty()).block();
-    assertThat(topics.getTotalPages()).isEqualTo(4);
-    assertThat(topics.getTopics()).hasSize(25);
-    assertThat(topics.getTopics()).isSorted();
-  }
+    var topics = topicsController
+        .getTopics(LOCAL_KAFKA_CLUSTER_NAME, 0, -1, true, null,
+            null, null, null).block();
+
+    assertThat(topics.getBody().getPageCount()).isEqualTo(4);
+    assertThat(topics.getBody().getTopics()).hasSize(25);
+    assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
+  }
 
   @Test
   public void shouldListOnlyNonInternalTopics() {
+
     init(
         IntStream.rangeClosed(1, 100).boxed()
             .map(Objects::toString)
-            .map(name -> new TopicDescription(name, false, List.of()))
-            .collect(Collectors.toList())
+            .map(name -> new TopicDescription(name, Integer.parseInt(name) % 5 == 0, List.of()))
+            .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
+                JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+            .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
-    var topics = pagination.getPage(
-        Optional.empty(), Optional.empty(), Optional.of(true),
-        Optional.empty(), Optional.empty(), Optional.empty()).block();
-    assertThat(topics.getTotalPages()).isEqualTo(4);
-    assertThat(topics.getTopics()).hasSize(25);
-    assertThat(topics.getTopics()).isSorted();
-  }
+    var topics = topicsController
+        .getTopics(LOCAL_KAFKA_CLUSTER_NAME, 4, -1, false, null,
+            null, null, null).block();
+
+    assertThat(topics.getBody().getPageCount()).isEqualTo(4);
+    assertThat(topics.getBody().getTopics()).hasSize(5);
+    assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
+  }
 
   @Test
   public void shouldListOnlyTopicsContainingOne() {
+
     init(
         IntStream.rangeClosed(1, 100).boxed()
             .map(Objects::toString)
             .map(name -> new TopicDescription(name, false, List.of()))
-            .collect(Collectors.toList())
+            .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
+                JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+            .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
     );
 
-    var topics = pagination.getPage(
-        Optional.empty(), Optional.empty(), Optional.empty(),
-        Optional.of("1"), Optional.empty(), Optional.empty()).block();
-    assertThat(topics.getTotalPages()).isEqualTo(1);
-    assertThat(topics.getTopics()).hasSize(20);
-    assertThat(topics.getTopics()).isSorted();
+    var topics = topicsController
+        .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, "1",
+            null, null, null).block();
+
+    assertThat(topics.getBody().getPageCount()).isEqualTo(1);
+    assertThat(topics.getBody().getTopics()).hasSize(20);
+    assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
   }
 
   @Test
   public void shouldListTopicsOrderedByPartitionsCount() {
-    List<TopicDescription> topicDescriptions = IntStream.rangeClosed(1, 100).boxed()
+    Map<String, InternalTopic> internalTopics = IntStream.rangeClosed(1, 100).boxed()
         .map(i -> new TopicDescription(UUID.randomUUID().toString(), false,
             IntStream.range(0, i)
                 .mapToObj(p -> new TopicPartitionInfo(p, null, List.of(), List.of()))
                 .collect(Collectors.toList())))
-        .collect(Collectors.toList());
+        .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), InternalPartitionsOffsets.empty(),
+            JmxClusterUtil.JmxMetrics.empty(), InternalLogDirStats.empty()))
+        .collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
 
-    init(topicDescriptions);
+    init(internalTopics);
 
-    var topics = pagination.getPage(
-        Optional.empty(), Optional.empty(), Optional.empty(),
-        Optional.empty(), Optional.of(TopicColumnsToSortDTO.TOTAL_PARTITIONS), Optional.empty()).block();
-    assertThat(topics.getTotalPages()).isEqualTo(4);
-    assertThat(topics.getTopics()).hasSize(25);
-    assertThat(topics.getTopics()).containsExactlyElementsOf(
-        topicDescriptions.stream()
-            .map(TopicDescription::name)
+    var topicsSortedAsc = topicsController
+        .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null,
+            null, TopicColumnsToSortDTO.TOTAL_PARTITIONS, null, null).block();
+
+    assertThat(topicsSortedAsc.getBody().getPageCount()).isEqualTo(4);
+    assertThat(topicsSortedAsc.getBody().getTopics()).hasSize(25);
+    assertThat(topicsSortedAsc.getBody().getTopics()).containsExactlyElementsOf(
+        internalTopics.values().stream()
+            .map(clusterMapper::toTopic)
+            .sorted(Comparator.comparing(TopicDTO::getPartitionCount))
             .limit(25)
-            .collect(Collectors.toList()));
+            .collect(Collectors.toList())
+    );
 
-    var topicsSortedDesc = pagination.getPage(
-        Optional.empty(), Optional.empty(), Optional.empty(),
-        Optional.empty(), Optional.of(TopicColumnsToSortDTO.TOTAL_PARTITIONS), Optional.of(SortOrderDTO.DESC)).block();
-    assertThat(topicsSortedDesc.getTotalPages()).isEqualTo(4);
-    assertThat(topicsSortedDesc.getTopics()).hasSize(25);
-    assertThat(topicsSortedDesc.getTopics()).containsExactlyElementsOf(
-        topicDescriptions.stream()
-            .sorted((a, b) -> b.partitions().size() - a.partitions().size())
-            .map(TopicDescription::name)
+    var topicsSortedDesc = topicsController
+        .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null,
+            null, TopicColumnsToSortDTO.TOTAL_PARTITIONS, SortOrderDTO.DESC, null).block();
+
+    assertThat(topicsSortedDesc.getBody().getPageCount()).isEqualTo(4);
+    assertThat(topicsSortedDesc.getBody().getTopics()).hasSize(25);
+    assertThat(topicsSortedDesc.getBody().getTopics()).containsExactlyElementsOf(
+        internalTopics.values().stream()
+            .map(clusterMapper::toTopic)
+            .sorted(Comparator.comparing(TopicDTO::getPartitionCount).reversed())
             .limit(25)
-            .collect(Collectors.toList()));
+            .collect(Collectors.toList())
+    );
   }
 }