From 43a0e383cff97ed7227c10d69f19185172b5a6e7 Mon Sep 17 00:00:00 2001
From: Ilya Kuramshin
Date: Wed, 17 Nov 2021 13:10:55 +0300
Subject: [PATCH] Metrics & cache refactoring (#1036)

Metrics retrieval refactoring:
* metrics data moved to a separate class with its own storage
* topics list pagination logic reworked so listings reflect the current cluster state
* some mappings moved to InternalXXX classes
---
 .../ui/controller/ClustersController.java     |   8 +-
 .../kafka/ui/controller/TopicsController.java |  16 +-
 .../kafka/ui/mapper/ClusterMapper.java        |  30 +-
 .../kafka/ui/model/CleanupPolicy.java         |   3 +-
 .../kafka/ui/model/InternalBrokerConfig.java  |  11 +
 .../kafka/ui/model/InternalClusterState.java  |  63 +++
 .../kafka/ui/model/InternalLogDirStats.java   |  68 +++
 .../kafka/ui/model/InternalPartition.java     |   8 -
 .../ui/model/InternalPartitionsOffsets.java   |  33 ++
 .../kafka/ui/model/InternalTopic.java         |  90 +++-
 .../kafka/ui/model/InternalTopicConfig.java   |  19 +
 .../kafka/ui/model/KafkaCluster.java          |   4 -
 .../kafka/ui/model/PartitionsStats.java       |  38 ++
 .../kafka/ui/service/BrokerService.java       |  37 +-
 .../kafka/ui/service/ClusterService.java      |  32 +-
 .../ui/service/ClustersMetricsScheduler.java  |  11 +-
 .../kafka/ui/service/ClustersStorage.java     |  43 +-
 .../kafka/ui/service/FeatureService.java      |  40 +-
 .../kafka/ui/service/MessagesService.java     |   7 +-
 .../kafka/ui/service/MetricsCache.java        |  94 ++++
 .../kafka/ui/service/MetricsService.java      | 261 ++---------
 .../kafka/ui/service/ReactiveAdminClient.java |  78 +++-
 .../kafka/ui/service/TopicsService.java       | 411 ++++++++++--------
 .../kafka/ui/service/ZookeeperService.java    |   1 +
 .../provectus/kafka/ui/util/ClusterUtil.java  | 102 +----
 .../provectus/kafka/ui/util/Constants.java    |   5 -
 .../kafka/ui/util/JmxClusterUtil.java         |   9 +
 .../service/TopicsServicePaginationTest.java  | 164 +++++++
 .../kafka/ui/service/TopicsServiceTest.java   | 225 ----------
 29 files changed, 992 insertions(+), 919 deletions(-)
 create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java
 create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java
 create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartitionsOffsets.java
 create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/PartitionsStats.java
 create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsCache.java
 delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/Constants.java
 create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
 delete mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServiceTest.java

diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java
index 56670e5858..fa99bbb7ea 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java
@@ -16,13 +16,13 @@ import reactor.core.publisher.Mono;
 @RestController
 @RequiredArgsConstructor
 @Log4j2
-public class ClustersController implements ClustersApi {
+public class ClustersController extends AbstractController implements ClustersApi {
   private final ClusterService clusterService;
 
   @Override
   public Mono<ResponseEntity<ClusterMetricsDTO>> getClusterMetrics(String clusterName,
                                                                    ServerWebExchange exchange) {
-    return
clusterService.getClusterMetrics(clusterName) + return clusterService.getClusterMetrics(getCluster(clusterName)) .map(ResponseEntity::ok) .onErrorReturn(ResponseEntity.notFound().build()); } @@ -30,7 +30,7 @@ public class ClustersController implements ClustersApi { @Override public Mono> getClusterStats(String clusterName, ServerWebExchange exchange) { - return clusterService.getClusterStats(clusterName) + return clusterService.getClusterStats(getCluster(clusterName)) .map(ResponseEntity::ok) .onErrorReturn(ResponseEntity.notFound().build()); } @@ -43,6 +43,6 @@ public class ClustersController implements ClustersApi { @Override public Mono> updateClusterInfo(String clusterName, ServerWebExchange exchange) { - return clusterService.updateCluster(clusterName).map(ResponseEntity::ok); + return clusterService.updateCluster(getCluster(clusterName)).map(ResponseEntity::ok); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java index d5295c274e..c3fd6c4859 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java @@ -48,18 +48,16 @@ public class TopicsController extends AbstractController implements TopicsApi { @Override public Mono>> getTopicConfigs( String clusterName, String topicName, ServerWebExchange exchange) { - return Mono.just( - ResponseEntity.ok( - Flux.fromIterable(topicsService.getTopicConfigs(getCluster(clusterName), topicName)))); + return topicsService.getTopicConfigs(getCluster(clusterName), topicName) + .map(Flux::fromIterable) + .map(ResponseEntity::ok); } @Override public Mono> getTopicDetails( String clusterName, String topicName, ServerWebExchange exchange) { - return Mono.just( - ResponseEntity.ok( - topicsService.getTopicDetails(getCluster(clusterName), topicName)) - ); + return topicsService.getTopicDetails(getCluster(clusterName), topicName) + .map(ResponseEntity::ok); } @Override @@ -69,7 +67,7 @@ public class TopicsController extends AbstractController implements TopicsApi { @Valid String search, @Valid TopicColumnsToSortDTO orderBy, ServerWebExchange exchange) { - return Mono.just(ResponseEntity.ok(topicsService + return topicsService .getTopics( getCluster(clusterName), Optional.ofNullable(page), @@ -77,7 +75,7 @@ public class TopicsController extends AbstractController implements TopicsApi { Optional.ofNullable(showInternal), Optional.ofNullable(search), Optional.ofNullable(orderBy) - ))); + ).map(ResponseEntity::ok); } @Override diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java index 896a042d29..71ff6aa5b2 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java @@ -15,7 +15,7 @@ import com.provectus.kafka.ui.model.ConnectDTO; import com.provectus.kafka.ui.model.Feature; import com.provectus.kafka.ui.model.InternalBrokerConfig; import com.provectus.kafka.ui.model.InternalBrokerDiskUsage; -import com.provectus.kafka.ui.model.InternalClusterMetrics; +import com.provectus.kafka.ui.model.InternalClusterState; import com.provectus.kafka.ui.model.InternalPartition; import com.provectus.kafka.ui.model.InternalReplica; import com.provectus.kafka.ui.model.InternalSchemaRegistry; @@ -31,6 +31,7 
@@ import com.provectus.kafka.ui.model.TopicDTO; import com.provectus.kafka.ui.model.TopicDetailsDTO; import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck; import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel; +import com.provectus.kafka.ui.util.JmxClusterUtil; import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; @@ -46,26 +47,18 @@ import org.mapstruct.Named; @Mapper(componentModel = "spring") public interface ClusterMapper { - @Mapping(target = "brokerCount", source = "metrics.brokerCount") - @Mapping(target = "status", source = "metrics.status") - @Mapping(target = "version", source = "metrics.version") - @Mapping(target = "onlinePartitionCount", source = "metrics.onlinePartitionCount") - @Mapping(target = "topicCount", source = "metrics.topicCount") - @Mapping(target = "bytesInPerSec", source = "metrics.bytesInPerSec") - @Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec") - ClusterDTO toCluster(KafkaCluster cluster); + ClusterDTO toCluster(InternalClusterState clusterState); @Mapping(target = "protobufFile", source = "protobufFile", qualifiedByName = "resolvePath") @Mapping(target = "properties", source = "properties", qualifiedByName = "setProperties") @Mapping(target = "schemaRegistry", source = ".", qualifiedByName = "setSchemaRegistry") KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties); - @Mapping(target = "diskUsage", source = "internalBrokerDiskUsage", - qualifiedByName = "mapDiskUsage") - ClusterStatsDTO toClusterStats(InternalClusterMetrics metrics); + ClusterStatsDTO toClusterStats(InternalClusterState clusterState); - @Mapping(target = "items", source = "metrics") - ClusterMetricsDTO toClusterMetrics(InternalClusterMetrics metrics); + default ClusterMetricsDTO toClusterMetrics(JmxClusterUtil.JmxMetrics jmxMetrics) { + return new ClusterMetricsDTO().items(jmxMetrics.getMetrics()); + } BrokerMetricsDTO toBrokerMetrics(JmxBrokerMetrics metrics); @@ -146,15 +139,6 @@ public interface ClusterMapper { return brokerDiskUsage; } - @Named("mapDiskUsage") - default List mapDiskUsage(Map brokers) { - if (brokers == null) { - return null; - } - return brokers.entrySet().stream().map(e -> this.map(e.getKey(), e.getValue())) - .collect(Collectors.toList()); - } - @Named("resolvePath") default Path resolvePath(String path) { if (path != null) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/CleanupPolicy.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/CleanupPolicy.java index 6e75c3edec..3f7cdfca4c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/CleanupPolicy.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/CleanupPolicy.java @@ -33,7 +33,6 @@ public enum CleanupPolicy { ) ) ).findFirst() - .orElseThrow(() -> - new IllegalEntityStateException("Unknown cleanup policy value: " + string)); + .orElse(UNKNOWN); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBrokerConfig.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBrokerConfig.java index b159b39696..8f352699d0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBrokerConfig.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBrokerConfig.java @@ -15,4 +15,15 @@ public class InternalBrokerConfig { private final boolean isSensitive; private final boolean isReadOnly; private final List synonyms; + + public static InternalBrokerConfig 
from(ConfigEntry configEntry) { + InternalBrokerConfig.InternalBrokerConfigBuilder builder = InternalBrokerConfig.builder() + .name(configEntry.name()) + .value(configEntry.value()) + .source(configEntry.source()) + .isReadOnly(configEntry.isReadOnly()) + .isSensitive(configEntry.isSensitive()) + .synonyms(configEntry.synonyms()); + return builder.build(); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java new file mode 100644 index 0000000000..e9163ae28c --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java @@ -0,0 +1,63 @@ +package com.provectus.kafka.ui.model; + +import com.provectus.kafka.ui.service.MetricsCache; +import com.provectus.kafka.ui.util.ClusterUtil; +import java.math.BigDecimal; +import java.util.List; +import java.util.stream.Collectors; +import lombok.Data; + +@Data +public class InternalClusterState { + private String name; + private ServerStatusDTO status; + private Integer topicCount; + private Integer brokerCount; + private Integer zooKeeperStatus; + private Integer activeControllers; + private Integer onlinePartitionCount; + private Integer offlinePartitionCount; + private Integer inSyncReplicasCount; + private Integer outOfSyncReplicasCount; + private Integer underReplicatedPartitionCount; + private List diskUsage; + private String version; + private List features; + private BigDecimal bytesInPerSec; + private BigDecimal bytesOutPerSec; + + public InternalClusterState(KafkaCluster cluster, MetricsCache.Metrics metrics) { + name = cluster.getName(); + status = metrics.getStatus(); + topicCount = metrics.getTopicDescriptions().size(); + brokerCount = metrics.getClusterDescription().getNodes().size(); + zooKeeperStatus = ClusterUtil.convertToIntServerStatus(metrics.getZkStatus().getStatus()); + activeControllers = metrics.getClusterDescription().getController() != null ? 
1 : 0; + version = metrics.getVersion(); + + if (metrics.getLogDirInfo() != null) { + diskUsage = metrics.getLogDirInfo().getBrokerStats().entrySet().stream() + .map(e -> new BrokerDiskUsageDTO() + .brokerId(e.getKey()) + .segmentSize(e.getValue().getSegmentSize()) + .segmentCount(e.getValue().getSegmentsCount())) + .collect(Collectors.toList()); + } + + features = metrics.getFeatures(); + + bytesInPerSec = metrics.getJmxMetrics().getBytesInPerSec().values().stream() + .reduce(BigDecimal.ZERO, BigDecimal::add); + + bytesOutPerSec = metrics.getJmxMetrics().getBytesOutPerSec().values().stream() + .reduce(BigDecimal.ZERO, BigDecimal::add); + + var partitionsStats = new PartitionsStats(metrics.getTopicDescriptions().values()); + onlinePartitionCount = partitionsStats.getOnlinePartitionCount(); + offlinePartitionCount = partitionsStats.getOfflinePartitionCount(); + inSyncReplicasCount = partitionsStats.getInSyncReplicasCount(); + outOfSyncReplicasCount = partitionsStats.getOutOfSyncReplicasCount(); + underReplicatedPartitionCount = partitionsStats.getUnderReplicatedPartitionCount(); + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java new file mode 100644 index 0000000000..34ec3d59e3 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalLogDirStats.java @@ -0,0 +1,68 @@ +package com.provectus.kafka.ui.model; + +import static java.util.stream.Collectors.collectingAndThen; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.summarizingLong; +import static java.util.stream.Collectors.toList; + +import java.util.List; +import java.util.LongSummaryStatistics; +import java.util.Map; +import lombok.Value; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.requests.DescribeLogDirsResponse; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuple3; +import reactor.util.function.Tuples; + +@Value +public class InternalLogDirStats { + + @Value + public static class SegmentStats { + long segmentSize; + int segmentsCount; + + public SegmentStats(LongSummaryStatistics s) { + segmentSize = s.getSum(); + segmentsCount = (int) s.getCount(); + } + } + + Map partitionsStats; + Map topicStats; + Map brokerStats; + + public static InternalLogDirStats empty() { + return new InternalLogDirStats(Map.of()); + } + + public InternalLogDirStats(Map> log) { + final List> topicPartitions = + log.entrySet().stream().flatMap(b -> + b.getValue().entrySet().stream().flatMap(topicMap -> + topicMap.getValue().replicaInfos.entrySet().stream() + .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size)) + ) + ).collect(toList()); + + partitionsStats = topicPartitions.stream().collect( + groupingBy( + Tuple2::getT2, + collectingAndThen( + summarizingLong(Tuple3::getT3), SegmentStats::new))); + + topicStats = + topicPartitions.stream().collect( + groupingBy( + t -> t.getT2().topic(), + collectingAndThen( + summarizingLong(Tuple3::getT3), SegmentStats::new))); + + brokerStats = topicPartitions.stream().collect( + groupingBy( + Tuple2::getT1, + collectingAndThen( + summarizingLong(Tuple3::getT3), SegmentStats::new))); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartition.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartition.java index a1a96c0e4e..76f916cb10 100644 --- 
a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartition.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartition.java @@ -13,7 +13,6 @@ public class InternalPartition { private final int inSyncReplicasCount; private final int replicasCount; - // should be updated manually on partitions return private final long offsetMin; private final long offsetMax; @@ -21,12 +20,5 @@ public class InternalPartition { private final long segmentSize; private final long segmentCount; - public InternalPartition withOffsets(long min, long max) { - return toBuilder().offsetMin(min).offsetMax(max).build(); - } - - public InternalPartition withSegmentStats(long segmentSize, long segmentCount) { - return toBuilder().segmentSize(segmentSize).segmentCount(segmentCount).build(); - } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartitionsOffsets.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartitionsOffsets.java new file mode 100644 index 0000000000..9fb54a300e --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalPartitionsOffsets.java @@ -0,0 +1,33 @@ +package com.provectus.kafka.ui.model; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; +import java.util.Map; +import java.util.Optional; +import lombok.Value; +import org.apache.kafka.common.TopicPartition; + + +public class InternalPartitionsOffsets { + + @Value + public static class Offsets { + Long earliest; + Long latest; + } + + private final Table offsets = HashBasedTable.create(); + + public InternalPartitionsOffsets(Map offsetsMap) { + offsetsMap.forEach((tp, o) -> this.offsets.put(tp.topic(), tp.partition(), o)); + } + + public static InternalPartitionsOffsets empty() { + return new InternalPartitionsOffsets(Map.of()); + } + + public Optional get(String topic, int partition) { + return Optional.ofNullable(offsets.get(topic, partition)); + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java index feb554635b..6d6c550063 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java @@ -1,10 +1,15 @@ package com.provectus.kafka.ui.model; +import com.provectus.kafka.ui.util.JmxClusterUtil; import java.math.BigDecimal; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import lombok.Builder; import lombok.Data; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.TopicPartition; @Data @Builder(toBuilder = true) @@ -32,12 +37,87 @@ public class InternalTopic { private final long segmentSize; private final long segmentCount; - public InternalTopic withSegmentStats(long segmentSize, long segmentCount) { - return toBuilder().segmentSize(segmentSize).segmentCount(segmentCount).build(); - } + public static InternalTopic from(TopicDescription topicDescription, + List configs, + InternalPartitionsOffsets partitionsOffsets, + JmxClusterUtil.JmxMetrics jmxMetrics, + InternalLogDirStats logDirInfo) { + var topic = InternalTopic.builder(); + topic.internal( + topicDescription.isInternal() || topicDescription.name().startsWith("_") + ); + topic.name(topicDescription.name()); - public InternalTopic withIoRates(BigDecimal bytesInPerSec, BigDecimal 
bytesOutPerSec) { - return toBuilder().bytesInPerSec(bytesInPerSec).bytesOutPerSec(bytesOutPerSec).build(); + List partitions = topicDescription.partitions().stream() + .map(partition -> { + var partitionDto = InternalPartition.builder(); + + partitionDto.leader(partition.leader() != null ? partition.leader().id() : null); + partitionDto.partition(partition.partition()); + partitionDto.inSyncReplicasCount(partition.isr().size()); + partitionDto.replicasCount(partition.replicas().size()); + List replicas = partition.replicas().stream() + .map(r -> new InternalReplica(r.id(), + partition.leader() != null && partition.leader().id() != r.id(), + partition.isr().contains(r))) + .collect(Collectors.toList()); + partitionDto.replicas(replicas); + + partitionsOffsets.get(topicDescription.name(), partition.partition()) + .ifPresent(offsets -> { + partitionDto.offsetMin(offsets.getEarliest()); + partitionDto.offsetMax(offsets.getLatest()); + }); + + var segmentStats = + logDirInfo.getPartitionsStats().get( + new TopicPartition(topicDescription.name(), partition.partition())); + if (segmentStats != null) { + partitionDto.segmentCount(segmentStats.getSegmentsCount()); + partitionDto.segmentSize(segmentStats.getSegmentSize()); + } + + return partitionDto.build(); + }) + .collect(Collectors.toList()); + + topic.partitions(partitions.stream().collect( + Collectors.toMap(InternalPartition::getPartition, t -> t))); + + var partitionsStats = new PartitionsStats(topicDescription); + topic.replicas(partitionsStats.getReplicasCount()); + topic.partitionCount(partitionsStats.getPartitionsCount()); + topic.inSyncReplicas(partitionsStats.getInSyncReplicasCount()); + topic.underReplicatedPartitions(partitionsStats.getUnderReplicatedPartitionCount()); + + topic.replicationFactor( + topicDescription.partitions().isEmpty() + ? 
0
+            : topicDescription.partitions().get(0).replicas().size()
+    );
+
+    var segmentStats = logDirInfo.getTopicStats().get(topicDescription.name());
+    if (segmentStats != null) {
+      topic.segmentCount(segmentStats.getSegmentsCount());
+      topic.segmentSize(segmentStats.getSegmentSize());
+    }
+
+    topic.bytesInPerSec(jmxMetrics.getBytesInPerSec().get(topicDescription.name()));
+    topic.bytesOutPerSec(jmxMetrics.getBytesOutPerSec().get(topicDescription.name()));
+
+    topic.topicConfigs(
+        configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));
+
+    topic.cleanUpPolicy(
+        configs.stream()
+            .filter(config -> config.name().equals("cleanup.policy"))
+            .findFirst()
+            .map(ConfigEntry::value)
+            .map(CleanupPolicy::fromString)
+            .orElse(CleanupPolicy.UNKNOWN)
+    );
+
+    return topic.build();
   }
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConfig.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConfig.java
index 7402d90216..294894ebc2 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConfig.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConfig.java
@@ -1,11 +1,14 @@
 package com.provectus.kafka.ui.model;
 
+import static com.provectus.kafka.ui.util.KafkaConstants.TOPIC_DEFAULT_CONFIGS;
+import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
 import java.util.List;
 import lombok.Builder;
 import lombok.Data;
 import org.apache.kafka.clients.admin.ConfigEntry;
 
+
 @Data
 @Builder
 public class InternalTopicConfig {
@@ -16,4 +19,20 @@ public class InternalTopicConfig {
   private final boolean isSensitive;
   private final boolean isReadOnly;
   private final List<ConfigEntry.ConfigSynonym> synonyms;
+
+  public static InternalTopicConfig from(ConfigEntry configEntry) {
+    InternalTopicConfig.InternalTopicConfigBuilder builder = InternalTopicConfig.builder()
+        .name(configEntry.name())
+        .value(configEntry.value())
+        .source(configEntry.source())
+        .isReadOnly(configEntry.isReadOnly())
+        .isSensitive(configEntry.isSensitive())
+        .synonyms(configEntry.synonyms());
+    if (configEntry.name().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
+      builder.defaultValue(configEntry.value());
+    } else {
+      builder.defaultValue(TOPIC_DEFAULT_CONFIGS.get(configEntry.name()));
+    }
+    return builder.build();
+  }
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
index aa49ca395e..56bbabdf89 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
@@ -26,14 +26,10 @@ public class KafkaCluster {
   private final List<KafkaConnectCluster> kafkaConnect;
   private final String schemaNameTemplate;
   private final String keySchemaNameTemplate;
-  private final List<Feature> features;
   private final Path protobufFile;
   private final String protobufMessageName;
   private final Map<String, String> protobufMessageNameByTopic;
   private final Properties properties;
   private final Boolean readOnly;
   private final Boolean disableLogDirsCollection;
-
-  // state & metrics
-  private final InternalClusterMetrics metrics;
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/PartitionsStats.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/PartitionsStats.java
new file mode 100644
index 0000000000..bacac40819
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/PartitionsStats.java
@@ -0,0 +1,38 @@
+package
com.provectus.kafka.ui.model; + +import java.util.Collection; +import java.util.List; +import lombok.Data; +import org.apache.kafka.clients.admin.TopicDescription; + +@Data +public class PartitionsStats { + + private int partitionsCount; + private int replicasCount; + private int onlinePartitionCount; + private int offlinePartitionCount; + private int inSyncReplicasCount; + private int outOfSyncReplicasCount; + private int underReplicatedPartitionCount; + + public PartitionsStats(TopicDescription description) { + this(List.of(description)); + } + + public PartitionsStats(Collection topicDescriptions) { + topicDescriptions.stream() + .flatMap(t -> t.partitions().stream()) + .forEach(p -> { + partitionsCount++; + replicasCount += p.replicas().size(); + onlinePartitionCount += p.leader() != null ? 1 : 0; + offlinePartitionCount += p.leader() == null ? 1 : 0; + inSyncReplicasCount += p.isr().size(); + outOfSyncReplicasCount += (p.replicas().size() - p.isr().size()); + if (p.replicas().size() > p.isr().size()) { + underReplicatedPartitionCount++; + } + }); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java index 01b332cc6d..59043f8161 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java @@ -1,6 +1,5 @@ package com.provectus.kafka.ui.service; -import com.provectus.kafka.ui.exception.IllegalEntityStateException; import com.provectus.kafka.ui.exception.InvalidRequestApiException; import com.provectus.kafka.ui.exception.LogDirNotFoundApiException; import com.provectus.kafka.ui.exception.NotFoundException; @@ -14,8 +13,6 @@ import com.provectus.kafka.ui.model.BrokerMetricsDTO; import com.provectus.kafka.ui.model.BrokersLogdirsDTO; import com.provectus.kafka.ui.model.InternalBrokerConfig; import com.provectus.kafka.ui.model.KafkaCluster; -import com.provectus.kafka.ui.util.ClusterUtil; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -40,14 +37,14 @@ import reactor.core.publisher.Mono; @Log4j2 public class BrokerService { + private final MetricsCache metricsCache; private final AdminClientService adminClientService; private final DescribeLogDirsMapper describeLogDirsMapper; private final ClusterMapper clusterMapper; private Mono>> loadBrokersConfig( KafkaCluster cluster, List brokersIds) { - return adminClientService.get(cluster) - .flatMap(ac -> ac.loadBrokersConfig(brokersIds)); + return adminClientService.get(cluster).flatMap(ac -> ac.loadBrokersConfig(brokersIds)); } private Mono> loadBrokersConfig( @@ -55,28 +52,19 @@ public class BrokerService { return loadBrokersConfig(cluster, Collections.singletonList(brokerId)) .map(map -> map.values().stream() .findFirst() - .orElseThrow(() -> new IllegalEntityStateException( - String.format("Config for broker %s not found", brokerId))) - ); - } - - public Mono> getBrokerConfigMap(KafkaCluster cluster, - Integer brokerId) { - return loadBrokersConfig(cluster, brokerId) - .map(list -> list.stream() - .collect(Collectors.toMap( - ConfigEntry::name, - ClusterUtil::mapToInternalBrokerConfig))); + .orElseThrow(() -> new NotFoundException( + String.format("Config for broker %s not found", brokerId)))); } private Flux getBrokersConfig(KafkaCluster cluster, Integer brokerId) { - if (!cluster.getMetrics().getBrokers().contains(brokerId)) { + if 
(metricsCache.get(cluster).getClusterDescription().getNodes() + .stream().noneMatch(node -> node.id() == brokerId)) { return Flux.error( new NotFoundException(String.format("Broker with id %s not found", brokerId))); } return loadBrokersConfig(cluster, brokerId) .map(list -> list.stream() - .map(ClusterUtil::mapToInternalBrokerConfig) + .map(InternalBrokerConfig::from) .collect(Collectors.toList())) .flatMapMany(Flux::fromIterable); } @@ -139,7 +127,10 @@ public class BrokerService { KafkaCluster cluster, List reqBrokers) { return adminClientService.get(cluster) .flatMap(admin -> { - List brokers = new ArrayList<>(cluster.getMetrics().getBrokers()); + List brokers = metricsCache.get(cluster).getClusterDescription().getNodes() + .stream() + .map(Node::id) + .collect(Collectors.toList()); if (reqBrokers != null && !reqBrokers.isEmpty()) { brokers.retainAll(reqBrokers); } @@ -162,9 +153,9 @@ public class BrokerService { .map(clusterMapper::toBrokerConfig); } - public Mono getBrokerMetrics(KafkaCluster cluster, Integer id) { - return Mono.just(cluster.getMetrics().getInternalBrokerMetrics()) - .map(m -> m.get(id)) + public Mono getBrokerMetrics(KafkaCluster cluster, Integer brokerId) { + return Mono.justOrEmpty( + metricsCache.get(cluster).getJmxMetrics().getInternalBrokerMetrics().get(brokerId)) .map(clusterMapper::toBrokerMetrics); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java index 2b2dc33e59..b4c2c97413 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java @@ -1,10 +1,10 @@ package com.provectus.kafka.ui.service; -import com.provectus.kafka.ui.exception.ClusterNotFoundException; import com.provectus.kafka.ui.mapper.ClusterMapper; import com.provectus.kafka.ui.model.ClusterDTO; import com.provectus.kafka.ui.model.ClusterMetricsDTO; import com.provectus.kafka.ui.model.ClusterStatsDTO; +import com.provectus.kafka.ui.model.InternalClusterState; import com.provectus.kafka.ui.model.KafkaCluster; import java.util.List; import java.util.stream.Collectors; @@ -18,6 +18,7 @@ import reactor.core.publisher.Mono; @Log4j2 public class ClusterService { + private final MetricsCache metricsCache; private final ClustersStorage clustersStorage; private final ClusterMapper clusterMapper; private final MetricsService metricsService; @@ -25,32 +26,25 @@ public class ClusterService { public List getClusters() { return clustersStorage.getKafkaClusters() .stream() - .map(clusterMapper::toCluster) + .map(c -> clusterMapper.toCluster(new InternalClusterState(c, metricsCache.get(c)))) .collect(Collectors.toList()); } - public Mono getClusterStats(String name) { + public Mono getClusterStats(KafkaCluster cluster) { return Mono.justOrEmpty( - clustersStorage.getClusterByName(name) - .map(KafkaCluster::getMetrics) - .map(clusterMapper::toClusterStats) + clusterMapper.toClusterStats( + new InternalClusterState(cluster, metricsCache.get(cluster))) ); } - public Mono getClusterMetrics(String name) { - return Mono.justOrEmpty( - clustersStorage.getClusterByName(name) - .map(KafkaCluster::getMetrics) - .map(clusterMapper::toClusterMetrics) - ); + public Mono getClusterMetrics(KafkaCluster cluster) { + return Mono.just( + clusterMapper.toClusterMetrics( + metricsCache.get(cluster).getJmxMetrics())); } - public Mono updateCluster(String clusterName) { - return 
clustersStorage.getClusterByName(clusterName) - .map(cluster -> metricsService.updateClusterMetrics(cluster) - .doOnNext(updatedCluster -> clustersStorage - .setKafkaCluster(updatedCluster.getName(), updatedCluster)) - .map(clusterMapper::toCluster)) - .orElse(Mono.error(new ClusterNotFoundException())); + public Mono updateCluster(KafkaCluster cluster) { + return metricsService.updateCache(cluster) + .map(metrics -> clusterMapper.toCluster(new InternalClusterState(cluster, metrics))); } } \ No newline at end of file diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersMetricsScheduler.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersMetricsScheduler.java index 5dd1938fea..0555243bb1 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersMetricsScheduler.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersMetricsScheduler.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.service; import java.util.Map; +import javax.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.springframework.scheduling.annotation.Scheduled; @@ -17,7 +18,11 @@ public class ClustersMetricsScheduler { private final MetricsService metricsService; - @Scheduled(fixedRateString = "${kafka.update-metrics-rate-millis:30000}") + @PostConstruct //need to fill metrics before application startup to prevent invalid state render + @Scheduled( + fixedRateString = "${kafka.update-metrics-rate-millis:30000}", + initialDelayString = "${kafka.update-metrics-rate-millis:30000}" + ) public void updateMetrics() { Flux.fromIterable(clustersStorage.getKafkaClustersMap().entrySet()) .parallel() @@ -25,9 +30,9 @@ public class ClustersMetricsScheduler { .map(Map.Entry::getValue) .flatMap(cluster -> { log.debug("Start getting metrics for kafkaCluster: {}", cluster.getName()); - return metricsService.updateClusterMetrics(cluster); + return metricsService.updateCache(cluster) + .doOnSuccess(m -> log.debug("Metrics updated for cluster: {}", cluster.getName())); }) - .doOnNext(s -> clustersStorage.setKafkaCluster(s.getName(), s)) .then() .block(); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java index 4462e86121..b77135f034 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java @@ -2,11 +2,8 @@ package com.provectus.kafka.ui.service; import com.provectus.kafka.ui.config.ClustersProperties; import com.provectus.kafka.ui.mapper.ClusterMapper; -import com.provectus.kafka.ui.model.InternalClusterMetrics; -import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.KafkaCluster; import java.util.Collection; -import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -29,10 +26,7 @@ public class ClustersStorage { public void init() { for (ClustersProperties.Cluster clusterProperties : clusterProperties.getClusters()) { KafkaCluster cluster = clusterMapper.toKafkaCluster(clusterProperties); - kafkaClusters.put( - clusterProperties.getName(), - cluster.toBuilder().metrics(InternalClusterMetrics.empty()).build() - ); + kafkaClusters.put(clusterProperties.getName(), cluster); } } @@ -44,41 +38,6 @@ public class ClustersStorage { return 
Optional.ofNullable(kafkaClusters.get(clusterName)); } - public KafkaCluster setKafkaCluster(String key, KafkaCluster kafkaCluster) { - this.kafkaClusters.put(key, kafkaCluster); - return kafkaCluster; - } - - public void onTopicDeleted(String clusterName, String topicToDelete) { - var cluster = kafkaClusters.get(clusterName); - var topics = Optional.ofNullable(cluster.getMetrics().getTopics()) - .map(HashMap::new) - .orElseGet(HashMap::new); - topics.remove(topicToDelete); - setUpdatedTopics(cluster, topics); - } - - public void onTopicUpdated(String clusterName, InternalTopic updatedTopic) { - var cluster = kafkaClusters.get(clusterName); - var topics = Optional.ofNullable(cluster.getMetrics().getTopics()) - .map(HashMap::new) - .orElseGet(HashMap::new); - topics.put(updatedTopic.getName(), updatedTopic); - setUpdatedTopics(cluster, topics); - } - - private void setUpdatedTopics(KafkaCluster cluster, Map topics) { - setKafkaCluster( - cluster.getName(), - cluster.toBuilder() - .metrics( - cluster.getMetrics().toBuilder() - .topics(topics) - .build()) - .build() - ); - } - public Map getKafkaClustersMap() { return kafkaClusters; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java index 760741bd38..29fc34b644 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java @@ -1,13 +1,13 @@ package com.provectus.kafka.ui.service; -import static com.provectus.kafka.ui.util.Constants.DELETE_TOPIC_ENABLE; - import com.provectus.kafka.ui.model.Feature; import com.provectus.kafka.ui.model.KafkaCluster; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.function.Predicate; +import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.apache.kafka.common.Node; @@ -20,9 +20,11 @@ import reactor.core.publisher.Mono; @Log4j2 public class FeatureService { - private final BrokerService brokerService; + private static final String DELETE_TOPIC_ENABLED_SERVER_PROPERTY = "delete.topic.enable"; - public Mono> getAvailableFeatures(KafkaCluster cluster) { + private final AdminClientService adminClientService; + + public Mono> getAvailableFeatures(KafkaCluster cluster, @Nullable Node controller) { List> features = new ArrayList<>(); if (Optional.ofNullable(cluster.getKafkaConnect()) @@ -39,23 +41,25 @@ public class FeatureService { features.add(Mono.just(Feature.SCHEMA_REGISTRY)); } - features.add( - isTopicDeletionEnabled(cluster) - .flatMap(r -> r ? Mono.just(Feature.TOPIC_DELETION) : Mono.empty()) - ); + if (controller != null) { + features.add( + isTopicDeletionEnabled(cluster, controller) + .flatMap(r -> r ? 
Mono.just(Feature.TOPIC_DELETION) : Mono.empty()) + ); + } return Flux.fromIterable(features).flatMap(m -> m).collectList(); } - private Mono isTopicDeletionEnabled(KafkaCluster cluster) { - return brokerService.getController(cluster) - .map(Node::id) - .flatMap(broker -> brokerService.getBrokerConfigMap(cluster, broker)) - .map(config -> { - if (config != null && config.get(DELETE_TOPIC_ENABLE) != null) { - return Boolean.parseBoolean(config.get(DELETE_TOPIC_ENABLE).getValue()); - } - return false; - }); + private Mono isTopicDeletionEnabled(KafkaCluster cluster, Node controller) { + return adminClientService.get(cluster) + .flatMap(ac -> ac.loadBrokersConfig(List.of(controller.id()))) + .map(config -> + config.values().stream() + .flatMap(Collection::stream) + .filter(e -> e.name().equals(DELETE_TOPIC_ENABLED_SERVER_PROPERTY)) + .map(e -> Boolean.parseBoolean(e.value())) + .findFirst() + .orElse(false)); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java index ce4fa3e71d..abc1f6cb57 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java @@ -55,10 +55,11 @@ public class MessagesService { private final AdminClientService adminClientService; private final DeserializationService deserializationService; private final ConsumerGroupService consumerGroupService; + private final MetricsCache metricsCache; public Mono deleteTopicMessages(KafkaCluster cluster, String topicName, List partitionsToInclude) { - if (!cluster.getMetrics().getTopics().containsKey(topicName)) { + if (!metricsCache.get(cluster).getTopicDescriptions().containsKey(topicName)) { throw new TopicNotFoundException(); } return offsetsForDeletion(cluster, topicName, partitionsToInclude) @@ -84,8 +85,8 @@ public class MessagesService { throw new ValidationException("Invalid message: both key and value can't be null"); } if (msg.getPartition() != null - && msg.getPartition() > cluster.getMetrics().getTopics() - .get(topic).getPartitionCount() - 1) { + && msg.getPartition() > metricsCache.get(cluster).getTopicDescriptions() + .get(topic).partitions().size() - 1) { throw new ValidationException("Invalid partition"); } RecordSerDe serde = diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsCache.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsCache.java new file mode 100644 index 0000000000..c915159bfa --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsCache.java @@ -0,0 +1,94 @@ +package com.provectus.kafka.ui.service; + +import com.provectus.kafka.ui.model.Feature; +import com.provectus.kafka.ui.model.InternalLogDirStats; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.model.ServerStatusDTO; +import com.provectus.kafka.ui.util.JmxClusterUtil; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import lombok.Builder; +import lombok.Value; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.TopicDescription; +import org.springframework.stereotype.Component; + +@Component +public class MetricsCache { + + @Value + @Builder(toBuilder = true) + public static class Metrics { + ServerStatusDTO status; + Throwable lastKafkaException; + String version; + 
List features; + ZookeeperService.ZkStatus zkStatus; + ReactiveAdminClient.ClusterDescription clusterDescription; + JmxClusterUtil.JmxMetrics jmxMetrics; + InternalLogDirStats logDirInfo; + Map topicDescriptions; + Map> topicConfigs; + } + + public static Metrics empty() { + return Metrics.builder() + .status(ServerStatusDTO.OFFLINE) + .version("Unknown") + .features(List.of()) + .zkStatus(new ZookeeperService.ZkStatus(ServerStatusDTO.OFFLINE, null)) + .clusterDescription( + new ReactiveAdminClient.ClusterDescription(null, null, List.of(), Set.of())) + .jmxMetrics(JmxClusterUtil.JmxMetrics.empty()) + .logDirInfo(InternalLogDirStats.empty()) + .topicDescriptions(Map.of()) + .topicConfigs(Map.of()) + .build(); + } + + private final Map cache = new ConcurrentHashMap<>(); + + public synchronized void replace(KafkaCluster c, Metrics stats) { + cache.put(c.getName(), stats); + } + + public synchronized void update(KafkaCluster c, + Map descriptions, + Map> configs) { + var metrics = get(c); + var updatedDescriptions = new HashMap<>(metrics.getTopicDescriptions()); + updatedDescriptions.putAll(descriptions); + var updatedConfigs = new HashMap<>(metrics.getTopicConfigs()); + updatedConfigs.putAll(configs); + replace( + c, + metrics.toBuilder() + .topicDescriptions(updatedDescriptions) + .topicConfigs(updatedConfigs) + .build() + ); + } + + public synchronized void onTopicDelete(KafkaCluster c, String topic) { + var metrics = get(c); + var updatedDescriptions = new HashMap<>(metrics.getTopicDescriptions()); + updatedDescriptions.remove(topic); + var updatedConfigs = new HashMap<>(metrics.getTopicConfigs()); + updatedConfigs.remove(topic); + replace( + c, + metrics.toBuilder() + .topicDescriptions(updatedDescriptions) + .topicConfigs(updatedConfigs) + .build() + ); + } + + public Metrics get(KafkaCluster c) { + return cache.get(c.getName()); + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java index 4d163b43c4..514da582c7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MetricsService.java @@ -1,38 +1,18 @@ package com.provectus.kafka.ui.service; -import static com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription; -import static com.provectus.kafka.ui.service.ZookeeperService.ZkStatus; -import static com.provectus.kafka.ui.util.JmxClusterUtil.JmxMetrics; -import static java.util.stream.Collectors.groupingBy; -import static java.util.stream.Collectors.summarizingLong; -import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toMap; - -import com.provectus.kafka.ui.model.InternalBrokerDiskUsage; -import com.provectus.kafka.ui.model.InternalClusterMetrics; -import com.provectus.kafka.ui.model.InternalPartition; -import com.provectus.kafka.ui.model.InternalTopic; +import com.provectus.kafka.ui.model.Feature; +import com.provectus.kafka.ui.model.InternalLogDirStats; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.ServerStatusDTO; -import com.provectus.kafka.ui.util.ClusterUtil; import com.provectus.kafka.ui.util.JmxClusterUtil; -import java.math.BigDecimal; import java.util.List; -import java.util.LongSummaryStatistics; import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; -import lombok.Builder; import lombok.RequiredArgsConstructor; import 
lombok.extern.log4j.Log4j2; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.requests.DescribeLogDirsResponse; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.TopicDescription; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuple3; -import reactor.util.function.Tuples; @Service @RequiredArgsConstructor @@ -43,218 +23,55 @@ public class MetricsService { private final JmxClusterUtil jmxClusterUtil; private final AdminClientService adminClientService; private final FeatureService featureService; - private final TopicsService topicsService; + private final MetricsCache cache; - /** - * Updates cluster's metrics and topics structure. - * - * @param cluster to be updated - * @return cluster with up-to-date metrics and topics structure - */ - public Mono updateClusterMetrics(KafkaCluster cluster) { - return getMetrics(cluster) - .map(m -> cluster.toBuilder().metrics(m).build()) - .zipWith(featureService.getAvailableFeatures(cluster), - (c, features) -> c.toBuilder().features(features).build()); + public Mono updateCache(KafkaCluster c) { + return getMetrics(c).doOnSuccess(m -> cache.replace(c, m)); } - private Mono getMetrics(KafkaCluster cluster) { + private Mono getMetrics(KafkaCluster cluster) { return adminClientService.get(cluster).flatMap(ac -> - ac.describeCluster().flatMap( - description -> Mono.just( - MetricsCollector.builder() + ac.describeCluster().flatMap(description -> + Mono.zip( + List.of( + jmxClusterUtil.getBrokerMetrics(cluster, description.getNodes()), + zookeeperService.getZkStatus(cluster), + getLogDirInfo(cluster, ac), + featureService.getAvailableFeatures(cluster, description.getController()), + loadTopicConfigs(cluster), + describeTopics(cluster)), + results -> + MetricsCache.Metrics.builder() + .status(ServerStatusDTO.ONLINE) .clusterDescription(description) .version(ac.getVersion()) + .jmxMetrics((JmxClusterUtil.JmxMetrics) results[0]) + .zkStatus((ZookeeperService.ZkStatus) results[1]) + .logDirInfo((InternalLogDirStats) results[2]) + .features((List) results[3]) + .topicConfigs((Map>) results[4]) + .topicDescriptions((Map) results[5]) .build() - ) - .zipWith(jmxClusterUtil.getBrokerMetrics(cluster, description.getNodes()), - (b, jmx) -> b.toBuilder().jmxMetrics(jmx).build()) - .zipWith(zookeeperService.getZkStatus(cluster), - (b, status) -> b.toBuilder().zkStatus(status).build())) - .zipWith(topicsService.getTopicsData(ac), - (b, td) -> b.toBuilder().topicsData(td).build()) - .zipWith(getLogDirInfo(cluster, ac), - (b, ld) -> b.toBuilder().logDirResult(ld).build()) - .map(MetricsCollector::build) - ) - .doOnError(e -> - log.error("Failed to collect cluster {} info", cluster.getName(), e) - ).onErrorResume( - e -> Mono.just(cluster.getMetrics().toBuilder() - .status(ServerStatusDTO.OFFLINE) - .lastKafkaException(e) - .build()) - ); + )) + .doOnError(e -> + log.error("Failed to collect cluster {} info", cluster.getName(), e)) + .onErrorResume( + e -> Mono.just(MetricsCache.empty().toBuilder().lastKafkaException(e).build()))); } - @Builder(toBuilder = true) - private static class MetricsCollector { - String version; - ClusterDescription clusterDescription; - JmxMetrics jmxMetrics; - List topicsData; - ZkStatus zkStatus; - Optional logDirResult; // empty if log dir collection disabled - - InternalClusterMetrics build() { - var metricsBuilder = 
InternalClusterMetrics.builder(); - metricsBuilder.version(version); - metricsBuilder.status(ServerStatusDTO.ONLINE); - metricsBuilder.lastKafkaException(null); - - metricsBuilder.zookeeperStatus(zkStatus.getStatus()); - metricsBuilder.zooKeeperStatus(ClusterUtil.convertToIntServerStatus(zkStatus.getStatus())); - metricsBuilder.lastZookeeperException(zkStatus.getError()); - - metricsBuilder.brokers( - clusterDescription.getNodes().stream().map(Node::id).collect(toList())); - metricsBuilder.brokerCount(clusterDescription.getNodes().size()); - metricsBuilder.activeControllers(clusterDescription.getController() != null ? 1 : 0); - - fillTopicsMetrics(metricsBuilder, topicsData); - fillJmxMetrics(metricsBuilder, jmxMetrics); - - logDirResult.ifPresent(r -> r.enrichWithLogDirInfo(metricsBuilder)); - - return metricsBuilder.build(); - } - } - - private static void fillJmxMetrics( - InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder, - JmxMetrics jmxMetrics) { - metricsBuilder.metrics(jmxMetrics.getMetrics()); - metricsBuilder.internalBrokerMetrics(jmxMetrics.getInternalBrokerMetrics()); - - metricsBuilder.bytesInPerSec( - jmxMetrics.getBytesInPerSec().values().stream() - .reduce(BigDecimal.ZERO, BigDecimal::add)); - - metricsBuilder.bytesOutPerSec( - jmxMetrics.getBytesOutPerSec().values().stream() - .reduce(BigDecimal.ZERO, BigDecimal::add)); - - metricsBuilder.topics( - metricsBuilder.build().getTopics().values().stream() - .map(t -> - t.withIoRates( - jmxMetrics.getBytesInPerSec().get(t.getName()), - jmxMetrics.getBytesOutPerSec().get(t.getName())) - ).collect(Collectors.toMap(InternalTopic::getName, t -> t)) - ); - } - - private Mono> getLogDirInfo(KafkaCluster cluster, ReactiveAdminClient c) { + private Mono getLogDirInfo(KafkaCluster cluster, ReactiveAdminClient c) { if (cluster.getDisableLogDirsCollection() == null || !cluster.getDisableLogDirsCollection()) { - return c.describeLogDirs().map(LogDirInfo::new).map(Optional::of); + return c.describeLogDirs().map(InternalLogDirStats::new); } - return Mono.just(Optional.empty()); + return Mono.just(InternalLogDirStats.empty()); } - private static void fillTopicsMetrics( - InternalClusterMetrics.InternalClusterMetricsBuilder builder, - List topics) { - - int underReplicatedPartitions = 0; - int inSyncReplicasCount = 0; - int outOfSyncReplicasCount = 0; - int onlinePartitionCount = 0; - int offlinePartitionCount = 0; - - for (InternalTopic topic : topics) { - underReplicatedPartitions += topic.getUnderReplicatedPartitions(); - inSyncReplicasCount += topic.getInSyncReplicas(); - outOfSyncReplicasCount += (topic.getReplicas() - topic.getInSyncReplicas()); - onlinePartitionCount += - topic.getPartitions().values().stream().mapToInt(s -> s.getLeader() == null ? 0 : 1) - .sum(); - offlinePartitionCount += - topic.getPartitions().values().stream().mapToInt(s -> s.getLeader() != null ? 
0 : 1) - .sum(); - } - - builder - .underReplicatedPartitionCount(underReplicatedPartitions) - .inSyncReplicasCount(inSyncReplicasCount) - .outOfSyncReplicasCount(outOfSyncReplicasCount) - .onlinePartitionCount(onlinePartitionCount) - .offlinePartitionCount(offlinePartitionCount) - .topicCount(topics.size()) - .topics(topics.stream().collect(Collectors.toMap(InternalTopic::getName, t -> t))); + private Mono> describeTopics(KafkaCluster c) { + return adminClientService.get(c).flatMap(ReactiveAdminClient::describeTopics); } - private static class LogDirInfo { - - private final Map partitionsStats; - private final Map topicStats; - private final Map brokerStats; - - LogDirInfo(Map> log) { - final List> topicPartitions = - log.entrySet().stream().flatMap(b -> - b.getValue().entrySet().stream().flatMap(topicMap -> - topicMap.getValue().replicaInfos.entrySet().stream() - .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size)) - ) - ).collect(toList()); - - partitionsStats = topicPartitions.stream().collect( - groupingBy( - Tuple2::getT2, - summarizingLong(Tuple3::getT3))); - - topicStats = - topicPartitions.stream().collect( - groupingBy( - t -> t.getT2().topic(), - summarizingLong(Tuple3::getT3))); - - brokerStats = topicPartitions.stream().collect( - groupingBy( - Tuple2::getT1, - summarizingLong(Tuple3::getT3))); - } - - private InternalTopic enrichTopicWithSegmentStats(InternalTopic topic) { - LongSummaryStatistics stats = topicStats.get(topic.getName()); - return topic.withSegmentStats(stats.getSum(), stats.getCount()) - .toBuilder() - .partitions( - topic.getPartitions().entrySet().stream().map(e -> - Tuples.of(e.getKey(), - enrichPartitionWithSegmentsData(topic.getName(), e.getValue())) - ).collect(toMap(Tuple2::getT1, Tuple2::getT2)) - ).build(); - } - - private InternalPartition enrichPartitionWithSegmentsData(String topic, - InternalPartition partition) { - final LongSummaryStatistics stats = - partitionsStats.get(new TopicPartition(topic, partition.getPartition())); - return partition.withSegmentStats(stats.getSum(), stats.getCount()); - } - - private Map getBrokersDiskUsage() { - return brokerStats.entrySet().stream().map(e -> - Tuples.of(e.getKey(), InternalBrokerDiskUsage.builder() - .segmentSize(e.getValue().getSum()) - .segmentCount(e.getValue().getCount()) - .build() - ) - ).collect(toMap(Tuple2::getT1, Tuple2::getT2)); - } - - private Map enrichTopics(Map topics) { - return topics.values().stream() - .map(this::enrichTopicWithSegmentStats) - .collect(Collectors.toMap(InternalTopic::getName, t -> t)); - } - - public void enrichWithLogDirInfo( - InternalClusterMetrics.InternalClusterMetricsBuilder builder) { - builder - .topics(enrichTopics(builder.build().getTopics())) - .internalBrokerDiskUsage(getBrokersDiskUsage()); - } + private Mono>> loadTopicConfigs(KafkaCluster c) { + return adminClientService.get(c).flatMap(ReactiveAdminClient::getTopicsConfig); } + } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index fadccd5a0d..88aa085773 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -15,7 +15,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import 
java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
@@ -37,6 +39,7 @@ import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -45,8 +48,11 @@ import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
 
 @Log4j2
@@ -112,20 +118,24 @@ public class ReactiveAdminClient implements Closeable {
     return version;
   }
 
+  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
+    return listTopics(true).flatMap(this::getTopicsConfig);
+  }
+
   public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames) {
     List<ConfigResource> resources = topicNames.stream()
         .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
         .collect(toList());
 
-    return toMono(
+    return toMonoWithExceptionFilter(
         client.describeConfigs(
             resources,
-            new DescribeConfigsOptions().includeSynonyms(true)
-        ).all())
-        .map(config -> config.entrySet().stream()
-            .collect(toMap(
-                c -> c.getKey().name(),
-                c -> new ArrayList<>(c.getValue().entries()))));
+            new DescribeConfigsOptions().includeSynonyms(true)).values(),
+        UnknownTopicOrPartitionException.class
+    ).map(config -> config.entrySet().stream()
+        .collect(toMap(
+            c -> c.getKey().name(),
+            c -> List.copyOf(c.getValue().entries()))));
   }
 
   public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> brokerIds) {
@@ -139,8 +149,56 @@ public class ReactiveAdminClient implements Closeable {
             c -> new ArrayList<>(c.getValue().entries()))));
   }
 
+  public Mono<Map<String, TopicDescription>> describeTopics() {
+    return listTopics(true).flatMap(this::describeTopics);
+  }
+
   public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> topics) {
-    return toMono(client.describeTopics(topics).all());
+    return toMonoWithExceptionFilter(
+        client.describeTopics(topics).values(),
+        UnknownTopicOrPartitionException.class
+    );
+  }
+
+  /**
+   * The Kafka API often returns Map responses with KafkaFuture values. If we apply allOf()
+   * logic, the resulting Mono will fail if any of the futures finished with an error.
+   * In some situations that is not what we want: when we call the describeTopics(List names)
+   * method we get UnknownTopicOrPartitionException for unknown topics, and we want to simply
+   * leave such topics out of the resulting map.
+   * <p/>
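+   * For example (an illustrative case only): describeTopics(List.of("existing", "missing"))
+   * will return a map containing just the "existing" entry rather than a failed Mono.
+   * <p/>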
+ * This method converts input map into Mono[Map] ignoring keys for which KafkaFutures + * finished with clazz exception. + */ + private Mono> toMonoWithExceptionFilter(Map> values, + Class clazz) { + if (values.isEmpty()) { + return Mono.just(Map.of()); + } + + List>> monos = values.entrySet().stream() + .map(e -> toMono(e.getValue()).map(r -> Tuples.of(e.getKey(), r))) + .collect(toList()); + + return Mono.create(sink -> { + var finishedCnt = new AtomicInteger(); + var results = new ConcurrentHashMap(); + monos.forEach(mono -> mono.subscribe( + r -> { + results.put(r.getT1(), r.getT2()); + if (finishedCnt.incrementAndGet() == monos.size()) { + sink.success(results); + } + }, + th -> { + if (!th.getClass().isAssignableFrom(clazz)) { + sink.error(th); + } else if (finishedCnt.incrementAndGet() == monos.size()) { + sink.success(results); + } + } + )); + }); } public Mono>> describeLogDirs() { @@ -193,10 +251,6 @@ public class ReactiveAdminClient implements Closeable { ))); } - public Mono getClusterVersion() { - return getClusterVersionImpl(client); - } - public Mono deleteConsumerGroups(Collection groupIds) { return toMono(client.deleteConsumerGroups(groupIds).all()) .onErrorResume(GroupIdNotFoundException.class, diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java index 9b01d43de9..2a505cd00e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java @@ -1,12 +1,17 @@ package com.provectus.kafka.ui.service; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; + +import com.google.common.annotations.VisibleForTesting; import com.provectus.kafka.ui.exception.TopicMetadataException; import com.provectus.kafka.ui.exception.TopicNotFoundException; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.mapper.ClusterMapper; -import com.provectus.kafka.ui.model.CleanupPolicy; import com.provectus.kafka.ui.model.Feature; +import com.provectus.kafka.ui.model.InternalLogDirStats; import com.provectus.kafka.ui.model.InternalPartition; +import com.provectus.kafka.ui.model.InternalPartitionsOffsets; import com.provectus.kafka.ui.model.InternalReplica; import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.InternalTopicConfig; @@ -24,23 +29,26 @@ import com.provectus.kafka.ui.model.TopicMessageSchemaDTO; import com.provectus.kafka.ui.model.TopicUpdateDTO; import com.provectus.kafka.ui.model.TopicsResponseDTO; import com.provectus.kafka.ui.serde.DeserializationService; -import com.provectus.kafka.ui.util.ClusterUtil; +import com.provectus.kafka.ui.util.JmxClusterUtil; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Function; import java.util.function.Predicate; -import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; +import lombok.Value; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.NewPartitionReassignment; import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.TopicDescription; +import 
org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.springframework.stereotype.Service; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Service @@ -50,102 +58,110 @@ public class TopicsService { private static final Integer DEFAULT_PAGE_SIZE = 25; private final AdminClientService adminClientService; - private final ConsumerGroupService consumerGroupService; - private final ClustersStorage clustersStorage; private final ClusterMapper clusterMapper; private final DeserializationService deserializationService; + private final MetricsCache metricsCache; - public TopicsResponseDTO getTopics(KafkaCluster cluster, - Optional page, - Optional nullablePerPage, - Optional showInternal, - Optional search, - Optional sortBy) { - Predicate positiveInt = i -> i > 0; - int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE); - var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage; - List topics = cluster.getMetrics().getTopics().values().stream() - .filter(topic -> !topic.isInternal() - || showInternal - .map(i -> topic.isInternal() == i) - .orElse(true)) - .filter(topic -> - search - .map(s -> StringUtils.containsIgnoreCase(topic.getName(), s)) - .orElse(true)) - .sorted(getComparatorForTopic(sortBy)) - .collect(Collectors.toList()); - var totalPages = (topics.size() / perPage) - + (topics.size() % perPage == 0 ? 0 : 1); - return new TopicsResponseDTO() - .pageCount(totalPages) - .topics( - topics.stream() - .skip(topicsToSkip) - .limit(perPage) - .map(t -> - clusterMapper.toTopic( - t.toBuilder().partitions(getTopicPartitions(cluster, t)).build() - ) - ) - .collect(Collectors.toList()) - ); + public Mono getTopics(KafkaCluster cluster, + Optional pageNum, + Optional nullablePerPage, + Optional showInternal, + Optional search, + Optional sortBy) { + return adminClientService.get(cluster).flatMap(ac -> + new Pagination(ac, metricsCache.get(cluster)) + .getPage(pageNum, nullablePerPage, showInternal, search, sortBy) + .flatMap(page -> + loadTopics(cluster, page.getTopics()) + .map(topics -> + new TopicsResponseDTO() + .topics(topics.stream().map(clusterMapper::toTopic).collect(toList())) + .pageCount(page.getTotalPages())))); } - private Comparator getComparatorForTopic(Optional sortBy) { - var defaultComparator = Comparator.comparing(InternalTopic::getName); - if (sortBy.isEmpty()) { - return defaultComparator; - } - switch (sortBy.get()) { - case TOTAL_PARTITIONS: - return Comparator.comparing(InternalTopic::getPartitionCount); - case OUT_OF_SYNC_REPLICAS: - return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas()); - case REPLICATION_FACTOR: - return Comparator.comparing(InternalTopic::getReplicationFactor); - case NAME: - default: - return defaultComparator; + private Mono> loadTopics(KafkaCluster c, List topics) { + if (topics.isEmpty()) { + return Mono.just(List.of()); } + return adminClientService.get(c) + .flatMap(ac -> + ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics), + (descriptions, configs) -> { + metricsCache.update(c, descriptions, configs); + return getPartitionOffsets(descriptions, ac).map(offsets -> { + var metrics = metricsCache.get(c); + return createList( + topics, + descriptions, + configs, + offsets, + metrics.getJmxMetrics(), + metrics.getLogDirInfo() + ); + }); + })).flatMap(Function.identity()); } - public TopicDetailsDTO getTopicDetails(KafkaCluster cluster, String topicName) { - var topic = getTopic(cluster, topicName); - var upToDatePartitions = 
getTopicPartitions(cluster, topic); - topic = topic.toBuilder().partitions(upToDatePartitions).build(); - return clusterMapper.toTopicDetails(topic); + private Mono loadTopic(KafkaCluster c, String topicName) { + return loadTopics(c, List.of(topicName)) + .map(lst -> lst.stream().findFirst().orElseThrow(TopicNotFoundException::new)); } - @SneakyThrows - public Mono> getTopicsData(ReactiveAdminClient client) { - return client.listTopics(true) - .flatMap(topics -> getTopicsData(client, topics).collectList()); + private List createList(List orderedNames, + Map descriptions, + Map> configs, + InternalPartitionsOffsets partitionsOffsets, + JmxClusterUtil.JmxMetrics jmxMetrics, + InternalLogDirStats logDirInfo) { + return orderedNames.stream() + .filter(descriptions::containsKey) + .map(t -> InternalTopic.from( + descriptions.get(t), + configs.getOrDefault(t, List.of()), + partitionsOffsets, + jmxMetrics, + logDirInfo + )) + .collect(toList()); } - private Flux getTopicsData(ReactiveAdminClient client, Collection topics) { - final Mono>> configsMono = - loadTopicsConfig(client, topics); + private Mono getPartitionOffsets(Map + descriptions, + ReactiveAdminClient ac) { + var topicPartitions = descriptions.values().stream() + .flatMap(desc -> + desc.partitions().stream().map(p -> new TopicPartition(desc.name(), p.partition()))) + .collect(toList()); - return client.describeTopics(topics) - .map(m -> m.values().stream() - .map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList())) - .flatMap(internalTopics -> configsMono - .map(configs -> mergeWithConfigs(internalTopics, configs).values())) - .flatMapMany(Flux::fromIterable); + return ac.listOffsets(topicPartitions, OffsetSpec.earliest()) + .zipWith(ac.listOffsets(topicPartitions, OffsetSpec.latest()), + (earliest, latest) -> + topicPartitions.stream() + .filter(tp -> earliest.containsKey(tp) && latest.containsKey(tp)) + .map(tp -> + Map.entry(tp, + new InternalPartitionsOffsets.Offsets( + earliest.get(tp), latest.get(tp)))) + .collect(toMap(Map.Entry::getKey, Map.Entry::getValue))) + .map(InternalPartitionsOffsets::new); } - public List getTopicConfigs(KafkaCluster cluster, String topicName) { - var configs = getTopic(cluster, topicName).getTopicConfigs(); - return configs.stream() - .map(clusterMapper::toTopicConfig) - .collect(Collectors.toList()); + public Mono getTopicDetails(KafkaCluster cluster, String topicName) { + return loadTopic(cluster, topicName).map(clusterMapper::toTopicDetails); } + public Mono> getTopicConfigs(KafkaCluster cluster, String topicName) { + return adminClientService.get(cluster) + .flatMap(ac -> ac.getTopicsConfig(List.of(topicName))) + .map(m -> m.values().stream().findFirst().orElseThrow(TopicNotFoundException::new)) + .map(lst -> lst.stream() + .map(InternalTopicConfig::from) + .map(clusterMapper::toTopicConfig) + .collect(toList())); + } - @SneakyThrows - private Mono createTopic(ReactiveAdminClient adminClient, - Mono topicCreation) { + private Mono createTopic(KafkaCluster c, ReactiveAdminClient adminClient, + Mono topicCreation) { return topicCreation.flatMap(topicData -> adminClient.createTopic( topicData.getName(), @@ -155,73 +171,39 @@ public class TopicsService { ).thenReturn(topicData) ) .onErrorResume(t -> Mono.error(new TopicMetadataException(t.getMessage()))) - .flatMap(topicData -> getUpdatedTopic(adminClient, topicData.getName())) - .switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic"))); + .flatMap(topicData -> loadTopic(c, topicData.getName())); } - public Mono 
createTopic( - KafkaCluster cluster, Mono topicCreation) { - return adminClientService.get(cluster).flatMap(ac -> createTopic(ac, topicCreation)) - .doOnNext(t -> clustersStorage.onTopicUpdated(cluster.getName(), t)) + public Mono createTopic(KafkaCluster cluster, Mono topicCreation) { + return adminClientService.get(cluster) + .flatMap(ac -> createTopic(cluster, ac, topicCreation)) .map(clusterMapper::toTopic); } - private Map mergeWithConfigs( - List topics, Map> configs) { - return topics.stream() - .map(t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build()) - .map(t -> t.toBuilder().cleanUpPolicy( - CleanupPolicy.fromString(t.getTopicConfigs().stream() - .filter(config -> config.getName().equals("cleanup.policy")) - .findFirst() - .orElseGet(() -> InternalTopicConfig.builder().value("unknown").build()) - .getValue())).build()) - .collect(Collectors.toMap( - InternalTopic::getName, - e -> e - )); - } - - public Mono getUpdatedTopic(ReactiveAdminClient ac, String topicName) { - return getTopicsData(ac, List.of(topicName)).next(); - } - - public Mono updateTopic(KafkaCluster cluster, + private Mono updateTopic(KafkaCluster cluster, String topicName, TopicUpdateDTO topicUpdate) { return adminClientService.get(cluster) .flatMap(ac -> - ac.updateTopicConfig(topicName, - topicUpdate.getConfigs()).then(getUpdatedTopic(ac, topicName))); + ac.updateTopicConfig(topicName, topicUpdate.getConfigs()) + .then(loadTopic(cluster, topicName))); } public Mono updateTopic(KafkaCluster cl, String topicName, Mono topicUpdate) { return topicUpdate .flatMap(t -> updateTopic(cl, topicName, t)) - .doOnNext(t -> clustersStorage.onTopicUpdated(cl.getName(), t)) .map(clusterMapper::toTopic); } - @SneakyThrows - private Mono>> loadTopicsConfig( - ReactiveAdminClient client, Collection topicNames) { - return client.getTopicsConfig(topicNames) - .map(configs -> - configs.entrySet().stream().collect(Collectors.toMap( - Map.Entry::getKey, - c -> c.getValue().stream() - .map(ClusterUtil::mapToInternalTopicConfig) - .collect(Collectors.toList())))); - } - private Mono changeReplicationFactor( + KafkaCluster cluster, ReactiveAdminClient adminClient, String topicName, Map> reassignments ) { return adminClient.alterPartitionReassignments(reassignments) - .then(getUpdatedTopic(adminClient, topicName)); + .then(loadTopic(cluster, topicName)); } /** @@ -231,11 +213,12 @@ public class TopicsService { KafkaCluster cluster, String topicName, ReplicationFactorChangeDTO replicationFactorChange) { - return adminClientService.get(cluster) + return loadTopic(cluster, topicName).flatMap(topic -> adminClientService.get(cluster) .flatMap(ac -> { - Integer actual = getTopic(cluster, topicName).getReplicationFactor(); + Integer actual = topic.getReplicationFactor(); Integer requested = replicationFactorChange.getTotalReplicationFactor(); - Integer brokersCount = cluster.getMetrics().getBrokerCount(); + Integer brokersCount = metricsCache.get(cluster).getClusterDescription() + .getNodes().size(); if (requested.equals(actual)) { return Mono.error( @@ -248,25 +231,24 @@ public class TopicsService { String.format("Requested replication factor %s more than brokers count %s.", requested, brokersCount))); } - return changeReplicationFactor(ac, topicName, - getPartitionsReassignments(cluster, topicName, + return changeReplicationFactor(cluster, ac, topicName, + getPartitionsReassignments(cluster, topic, replicationFactorChange)); }) - .doOnNext(topic -> clustersStorage.onTopicUpdated(cluster.getName(), topic)) .map(t -> new 
ReplicationFactorChangeResponseDTO()
             .topicName(t.getName())
-            .totalReplicationFactor(t.getReplicationFactor()));
+            .totalReplicationFactor(t.getReplicationFactor())));
   }
 
   private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsReassignments(
       KafkaCluster cluster,
-      String topicName,
+      InternalTopic topic,
       ReplicationFactorChangeDTO replicationFactorChange) {
     // Current assignment map (Partition number -> List of brokers)
-    Map<Integer, List<Integer>> currentAssignment = getCurrentAssignment(cluster, topicName);
+    Map<Integer, List<Integer>> currentAssignment = getCurrentAssignment(topic);
     // Brokers map (Broker id -> count)
     Map<Integer, Integer> brokersUsage = getBrokersMap(cluster, currentAssignment);
-    int currentReplicationFactor = getTopic(cluster, topicName).getReplicationFactor();
+    int currentReplicationFactor = topic.getReplicationFactor();
 
     // If we need to increase the replication factor
     if (replicationFactorChange.getTotalReplicationFactor() > currentReplicationFactor) {
@@ -276,7 +258,7 @@
       var brokers = brokersUsage.entrySet().stream()
           .sorted(Map.Entry.comparingByValue())
          .map(Map.Entry::getKey)
-          .collect(Collectors.toList());
+          .collect(toList());
 
       // Iterate brokers and try to add them to the assignment
       // while (partition replicas count != requested replication factor)
@@ -304,13 +286,13 @@
         var brokersUsageList = brokersUsage.entrySet().stream()
             .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
             .map(Map.Entry::getKey)
-            .collect(Collectors.toList());
+            .collect(toList());
 
         // Iterate brokers and try to remove them from the assignment
         // while (partition replicas count != requested replication factor)
         for (Integer broker : brokersUsageList) {
           // Check if the broker is the leader of the partition
-          if (!getTopic(cluster, topicName).getPartitions().get(partition).getLeader()
+          if (!topic.getPartitions().get(partition).getLeader()
               .equals(broker)) {
             brokers.remove(broker);
             brokersUsage.merge(broker, -1, Integer::sum);
@@ -328,26 +310,28 @@
     }
 
     // Return result map
-    return currentAssignment.entrySet().stream().collect(Collectors.toMap(
-        e -> new TopicPartition(topicName, e.getKey()),
+    return currentAssignment.entrySet().stream().collect(toMap(
+        e -> new TopicPartition(topic.getName(), e.getKey()),
         e -> Optional.of(new NewPartitionReassignment(e.getValue()))
     ));
   }
 
-  private Map<Integer, List<Integer>> getCurrentAssignment(KafkaCluster cluster, String topicName) {
-    return getTopic(cluster, topicName).getPartitions().values().stream()
-        .collect(Collectors.toMap(
+  private Map<Integer, List<Integer>> getCurrentAssignment(InternalTopic topic) {
+    return topic.getPartitions().values().stream()
+        .collect(toMap(
            InternalPartition::getPartition,
            p -> p.getReplicas().stream()
                .map(InternalReplica::getBroker)
-                .collect(Collectors.toList())
+                .collect(toList())
        ));
   }
 
   private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
                                               Map<Integer, List<Integer>> currentAssignment) {
-    Map<Integer, Integer> result = cluster.getMetrics().getBrokers().stream()
-        .collect(Collectors.toMap(
+    Map<Integer, Integer> result = metricsCache.get(cluster).getClusterDescription().getNodes()
+        .stream()
+        .map(Node::id)
+        .collect(toMap(
            c -> c,
            c -> 0
        ));
@@ -361,9 +345,9 @@
       KafkaCluster cluster,
       String topicName,
       PartitionsIncreaseDTO partitionsIncrease) {
-    return adminClientService.get(cluster)
-        .flatMap(ac -> {
-          Integer actualCount = getTopic(cluster, topicName).getPartitionCount();
+    return loadTopic(cluster, topicName).flatMap(topic ->
+        adminClientService.get(cluster).flatMap(ac -> {
+          Integer actualCount = topic.getPartitionCount();
           Integer requestedCount = 
partitionsIncrease.getTotalPartitionsCount(); if (requestedCount < actualCount) { @@ -383,55 +367,24 @@ public class TopicsService { NewPartitions.increaseTo(partitionsIncrease.getTotalPartitionsCount()) ); return ac.createPartitions(newPartitionsMap) - .then(getUpdatedTopic(ac, topicName)); + .then(loadTopic(cluster, topicName)); }) - .doOnNext(t -> clustersStorage.onTopicUpdated(cluster.getName(), t)) .map(t -> new PartitionsIncreaseResponseDTO() .topicName(t.getName()) - .totalPartitionsCount(t.getPartitionCount())); - } - - private Map getTopicPartitions(KafkaCluster c, InternalTopic topic) { - var tps = topic.getPartitions().values().stream() - .map(t -> new TopicPartition(topic.getName(), t.getPartition())) - .collect(Collectors.toList()); - Map partitions = - topic.getPartitions().values().stream().collect(Collectors.toMap( - InternalPartition::getPartition, - tp -> tp - )); - - try (var consumer = consumerGroupService.createConsumer(c)) { - final Map earliest = consumer.beginningOffsets(tps); - final Map latest = consumer.endOffsets(tps); - - return tps.stream() - .map(tp -> partitions.get(tp.partition()) - .withOffsets( - earliest.getOrDefault(tp, -1L), - latest.getOrDefault(tp, -1L) - ) - ).collect(Collectors.toMap( - InternalPartition::getPartition, - tp -> tp - )); - } catch (Exception e) { - return Collections.emptyMap(); - } + .totalPartitionsCount(t.getPartitionCount()))); } public Mono deleteTopic(KafkaCluster cluster, String topicName) { - var topicDetails = getTopicDetails(cluster, topicName); - if (cluster.getFeatures().contains(Feature.TOPIC_DELETION)) { + if (metricsCache.get(cluster).getFeatures().contains(Feature.TOPIC_DELETION)) { return adminClientService.get(cluster).flatMap(c -> c.deleteTopic(topicName)) - .doOnSuccess(t -> clustersStorage.onTopicDeleted(cluster.getName(), topicName)); + .doOnSuccess(t -> metricsCache.onTopicDelete(cluster, topicName)); } else { return Mono.error(new ValidationException("Topic deletion restricted")); } } public TopicMessageSchemaDTO getTopicSchema(KafkaCluster cluster, String topicName) { - if (!cluster.getMetrics().getTopics().containsKey(topicName)) { + if (!metricsCache.get(cluster).getTopicDescriptions().containsKey(topicName)) { throw new TopicNotFoundException(); } return deserializationService @@ -439,12 +392,88 @@ public class TopicsService { .getTopicSchema(topicName); } - private InternalTopic getTopic(KafkaCluster c, String topicName) { - var topic = c.getMetrics().getTopics().get(topicName); - if (topic == null) { - throw new TopicNotFoundException(); + @VisibleForTesting + @Value + static class Pagination { + ReactiveAdminClient adminClient; + MetricsCache.Metrics metrics; + + @Value + static class Page { + List topics; + int totalPages; + } + + Mono getPage( + Optional pageNum, + Optional nullablePerPage, + Optional showInternal, + Optional search, + Optional sortBy) { + return geTopicsForPagination() + .map(paginatingTopics -> { + Predicate positiveInt = i -> i > 0; + int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE); + var topicsToSkip = (pageNum.filter(positiveInt).orElse(1) - 1) * perPage; + List topics = paginatingTopics.stream() + .filter(topic -> !topic.isInternal() + || showInternal.map(i -> topic.isInternal() == i).orElse(true)) + .filter(topic -> + search + .map(s -> StringUtils.containsIgnoreCase(topic.getName(), s)) + .orElse(true)) + .sorted(getComparatorForTopic(sortBy)) + .collect(toList()); + var totalPages = (topics.size() / perPage) + + (topics.size() % perPage == 0 ? 
0 : 1); + + List topicsToRender = topics.stream() + .skip(topicsToSkip) + .limit(perPage) + .map(InternalTopic::getName) + .collect(toList()); + + return new Page(topicsToRender, totalPages); + }); + } + + private Comparator getComparatorForTopic( + Optional sortBy) { + var defaultComparator = Comparator.comparing(InternalTopic::getName); + if (sortBy.isEmpty()) { + return defaultComparator; + } + switch (sortBy.get()) { + case TOTAL_PARTITIONS: + return Comparator.comparing(InternalTopic::getPartitionCount); + case OUT_OF_SYNC_REPLICAS: + return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas()); + case REPLICATION_FACTOR: + return Comparator.comparing(InternalTopic::getReplicationFactor); + case NAME: + default: + return defaultComparator; + } + } + + private Mono> filterExisting(Collection topics) { + return adminClient.listTopics(true) + .map(existing -> existing.stream().filter(topics::contains).collect(toList())); + } + + private Mono> geTopicsForPagination() { + return filterExisting(metrics.getTopicDescriptions().keySet()) + .map(lst -> lst.stream() + .map(topicName -> + InternalTopic.from( + metrics.getTopicDescriptions().get(topicName), + metrics.getTopicConfigs().getOrDefault(topicName, List.of()), + InternalPartitionsOffsets.empty(), + metrics.getJmxMetrics(), + metrics.getLogDirInfo())) + .collect(toList()) + ); } - return topic; } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java index 04b7ecc890..94ba9f5cab 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ZookeeperService.java @@ -15,6 +15,7 @@ import org.apache.zookeeper.ZooKeeper; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; @Service @RequiredArgsConstructor diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java index 84fcd3543c..d2e1a18b0e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java @@ -1,19 +1,11 @@ package com.provectus.kafka.ui.util; -import static com.provectus.kafka.ui.util.KafkaConstants.TOPIC_DEFAULT_CONFIGS; -import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG; - import com.provectus.kafka.ui.model.BrokerDTO; import com.provectus.kafka.ui.model.ConsumerGroupDTO; import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO; import com.provectus.kafka.ui.model.ConsumerGroupStateDTO; import com.provectus.kafka.ui.model.ConsumerGroupTopicPartitionDTO; -import com.provectus.kafka.ui.model.InternalBrokerConfig; import com.provectus.kafka.ui.model.InternalConsumerGroup; -import com.provectus.kafka.ui.model.InternalPartition; -import com.provectus.kafka.ui.model.InternalReplica; -import com.provectus.kafka.ui.model.InternalTopic; -import com.provectus.kafka.ui.model.InternalTopicConfig; import com.provectus.kafka.ui.model.MessageFormatDTO; import com.provectus.kafka.ui.model.ServerStatusDTO; import com.provectus.kafka.ui.model.TopicMessageDTO; @@ -24,16 +16,13 @@ import java.time.ZoneId; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.List; import 
java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.extern.log4j.Log4j2; -import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConsumerGroupDescription; -import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.Node; @@ -41,6 +30,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.Bytes; + @Log4j2 public class ClusterUtil { @@ -163,89 +153,6 @@ public class ClusterUtil { } } - - public static InternalTopicConfig mapToInternalTopicConfig(ConfigEntry configEntry) { - InternalTopicConfig.InternalTopicConfigBuilder builder = InternalTopicConfig.builder() - .name(configEntry.name()) - .value(configEntry.value()) - .source(configEntry.source()) - .isReadOnly(configEntry.isReadOnly()) - .isSensitive(configEntry.isSensitive()) - .synonyms(configEntry.synonyms()); - if (configEntry.name().equals(MESSAGE_FORMAT_VERSION_CONFIG)) { - builder.defaultValue(configEntry.value()); - } else { - builder.defaultValue(TOPIC_DEFAULT_CONFIGS.get(configEntry.name())); - } - return builder.build(); - } - - public static InternalBrokerConfig mapToInternalBrokerConfig(ConfigEntry configEntry) { - InternalBrokerConfig.InternalBrokerConfigBuilder builder = InternalBrokerConfig.builder() - .name(configEntry.name()) - .value(configEntry.value()) - .source(configEntry.source()) - .isReadOnly(configEntry.isReadOnly()) - .isSensitive(configEntry.isSensitive()) - .synonyms(configEntry.synonyms()); - return builder.build(); - } - - public static InternalTopic mapToInternalTopic(TopicDescription topicDescription) { - var topic = InternalTopic.builder(); - topic.internal( - topicDescription.isInternal() || topicDescription.name().startsWith("_") - ); - topic.name(topicDescription.name()); - - List partitions = topicDescription.partitions().stream().map( - partition -> { - var partitionDto = InternalPartition.builder(); - partitionDto.leader(partition.leader().id()); - partitionDto.partition(partition.partition()); - partitionDto.inSyncReplicasCount(partition.isr().size()); - partitionDto.replicasCount(partition.replicas().size()); - List replicas = partition.replicas().stream().map( - r -> new InternalReplica(r.id(), partition.leader().id() != r.id(), - partition.isr().contains(r))) - .collect(Collectors.toList()); - partitionDto.replicas(replicas); - return partitionDto.build(); - }) - .collect(Collectors.toList()); - - int urpCount = partitions.stream() - .flatMap(partition -> partition.getReplicas().stream()) - .filter(p -> !p.isInSync()).mapToInt(e -> 1) - .sum(); - - int inSyncReplicasCount = partitions.stream() - .mapToInt(InternalPartition::getInSyncReplicasCount) - .sum(); - - int replicasCount = partitions.stream() - .mapToInt(InternalPartition::getReplicasCount) - .sum(); - - topic.partitions(partitions.stream().collect(Collectors.toMap( - InternalPartition::getPartition, - t -> t - ))); - topic.replicas(replicasCount); - topic.partitionCount(topicDescription.partitions().size()); - topic.inSyncReplicas(inSyncReplicasCount); - - topic.replicationFactor( - topicDescription.partitions().isEmpty() - ? 
0 - : topicDescription.partitions().get(0).replicas().size() - ); - - topic.underReplicatedPartitions(urpCount); - - return topic.build(); - } - public static int convertToIntServerStatus(ServerStatusDTO serverStatus) { return serverStatus.equals(ServerStatusDTO.ONLINE) ? 1 : 0; } @@ -305,13 +212,6 @@ public class ClusterUtil { } } - - public static Map toSingleMap(Stream> streamOfMaps) { - return streamOfMaps - .reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream()) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow(); - } - public static Optional filterConsumerGroupTopic( InternalConsumerGroup consumerGroup, Optional topic) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/Constants.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/Constants.java deleted file mode 100644 index 15b24c66f7..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/Constants.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.provectus.kafka.ui.util; - -public class Constants { - public static final String DELETE_TOPIC_ENABLE = "delete.topic.enable"; -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java index 5f928db2ff..1be1e1a8d8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/JmxClusterUtil.java @@ -56,6 +56,15 @@ public class JmxClusterUtil { Map bytesOutPerSec; Map internalBrokerMetrics; List metrics; + + public static JmxMetrics empty() { + return JmxClusterUtil.JmxMetrics.builder() + .bytesInPerSec(Map.of()) + .bytesOutPerSec(Map.of()) + .internalBrokerMetrics(Map.of()) + .metrics(List.of()) + .build(); + } } public Mono getBrokerMetrics(KafkaCluster cluster, Collection nodes) { diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java new file mode 100644 index 0000000000..1b107bd622 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java @@ -0,0 +1,164 @@ +package com.provectus.kafka.ui.service; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.provectus.kafka.ui.model.TopicColumnsToSortDTO; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.TopicPartitionInfo; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + +class TopicsServicePaginationTest { + + private TopicsService.Pagination pagination; + + private void init(Collection topicsInCache) { + ReactiveAdminClient adminClient = when(mock(ReactiveAdminClient.class).listTopics(true)) + .thenReturn(Mono.just(topicsInCache.stream().map(TopicDescription::name) + .collect(Collectors.toSet()))) + .getMock(); + + MetricsCache.Metrics metricsCache = MetricsCache.empty().toBuilder() + .topicDescriptions( + topicsInCache.stream().collect(Collectors.toMap(TopicDescription::name, d -> d))) + .build(); + + pagination = new TopicsService.Pagination(adminClient, metricsCache); + } + + 
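// An illustrative sketch, not part of the original change set: Pagination filters cached
+  // names through adminClient.listTopics(true), so a topic that is still in MetricsCache but
+  // was already deleted from the cluster should not be rendered. The mock shape mirrors init().
+  @Test
+  public void shouldSkipCachedTopicsMissingFromLiveListing() {
+    List<TopicDescription> cached = List.of(
+        new TopicDescription("live", false, List.of()),
+        new TopicDescription("deleted", false, List.of()));
+
+    ReactiveAdminClient adminClient = when(mock(ReactiveAdminClient.class).listTopics(true))
+        .thenReturn(Mono.just(java.util.Set.of("live")))
+        .getMock();
+
+    MetricsCache.Metrics metrics = MetricsCache.empty().toBuilder()
+        .topicDescriptions(
+            cached.stream().collect(Collectors.toMap(TopicDescription::name, d -> d)))
+        .build();
+
+    var page = new TopicsService.Pagination(adminClient, metrics)
+        .getPage(Optional.empty(), Optional.empty(), Optional.empty(),
+            Optional.empty(), Optional.empty()).block();
+
+    assertThat(page.getTopics()).containsExactly("live");
+  }
+
+  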
@Test
+  public void shouldListFirst25Topics() {
+    init(
+        IntStream.rangeClosed(1, 100).boxed()
+            .map(Objects::toString)
+            .map(name -> new TopicDescription(name, false, List.of()))
+            .collect(Collectors.toList())
+    );
+
+    var topics = pagination.getPage(
+        Optional.empty(), Optional.empty(), Optional.empty(),
+        Optional.empty(), Optional.empty()).block();
+    assertThat(topics.getTotalPages()).isEqualTo(4);
+    assertThat(topics.getTopics()).hasSize(25);
+    assertThat(topics.getTopics()).isSorted();
+  }
+
+  @Test
+  public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
+    init(
+        IntStream.rangeClosed(1, 100).boxed()
+            .map(Objects::toString)
+            .map(name -> new TopicDescription(name, false, List.of()))
+            .collect(Collectors.toList())
+    );
+
+    var topics = pagination.getPage(Optional.of(4), Optional.of(33),
+        Optional.empty(), Optional.empty(), Optional.empty()).block();
+    assertThat(topics.getTotalPages()).isEqualTo(4);
+    assertThat(topics.getTopics()).hasSize(1)
+        .first().isEqualTo("99");
+  }
+
+  @Test
+  public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
+    init(
+        IntStream.rangeClosed(1, 100).boxed()
+            .map(Objects::toString)
+            .map(name -> new TopicDescription(name, false, List.of()))
+            .collect(Collectors.toList())
+    );
+
+    var topics = pagination.getPage(Optional.of(0), Optional.of(-1),
+        Optional.empty(), Optional.empty(), Optional.empty()).block();
+    assertThat(topics.getTotalPages()).isEqualTo(4);
+    assertThat(topics.getTopics()).hasSize(25);
+    assertThat(topics.getTopics()).isSorted();
+  }
+
+  @Test
+  public void shouldListBothInternalAndNonInternalTopics() {
+    init(
+        IntStream.rangeClosed(1, 100).boxed()
+            .map(Objects::toString)
+            .map(name -> new TopicDescription(name, Integer.parseInt(name) % 10 == 0, List.of()))
+            .collect(Collectors.toList())
+    );
+
+    var topics = pagination.getPage(
+        Optional.empty(), Optional.empty(), Optional.of(true),
+        Optional.empty(), Optional.empty()).block();
+    assertThat(topics.getTotalPages()).isEqualTo(4);
+    assertThat(topics.getTopics()).hasSize(25);
+    assertThat(topics.getTopics()).isSorted();
+  }
+
+
+  @Test
+  public void shouldListOnlyNonInternalTopics() {
+    init(
+        IntStream.rangeClosed(1, 100).boxed()
+            .map(Objects::toString)
+            .map(name -> new TopicDescription(name, false, List.of()))
+            .collect(Collectors.toList())
+    );
+
+    var topics = pagination.getPage(
+        Optional.empty(), Optional.empty(), Optional.of(true),
+        Optional.empty(), Optional.empty()).block();
+    assertThat(topics.getTotalPages()).isEqualTo(4);
+    assertThat(topics.getTopics()).hasSize(25);
+    assertThat(topics.getTopics()).isSorted();
+  }
+
+
+  @Test
+  public void shouldListOnlyTopicsContainingOne() {
+    init(
+        IntStream.rangeClosed(1, 100).boxed()
+            .map(Objects::toString)
+            .map(name -> new TopicDescription(name, false, List.of()))
+            .collect(Collectors.toList())
+    );
+
+    var topics = pagination.getPage(
+        Optional.empty(), Optional.empty(), Optional.empty(),
+        Optional.of("1"), Optional.empty()).block();
+    assertThat(topics.getTotalPages()).isEqualTo(1);
+    assertThat(topics.getTopics()).hasSize(20);
+    assertThat(topics.getTopics()).isSorted();
+  }
+
+  @Test
+  public void shouldListTopicsOrderedByPartitionsCount() {
+    List<TopicDescription> topicDescriptions = IntStream.rangeClosed(1, 100).boxed()
+        .map(i -> new TopicDescription(UUID.randomUUID().toString(), false,
+            IntStream.range(0, i)
+                .mapToObj(p ->
+                    new TopicPartitionInfo(p, null, List.of(), List.of()))
+                .collect(Collectors.toList())))
+        .collect(Collectors.toList());
+
+    init(topicDescriptions);
+
+ var topics = pagination.getPage( + Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.of(TopicColumnsToSortDTO.TOTAL_PARTITIONS)).block(); + assertThat(topics.getTotalPages()).isEqualTo(4); + assertThat(topics.getTopics()).hasSize(25); + assertThat(topics.getTopics()).containsExactlyElementsOf( + topicDescriptions.stream() + .map(TopicDescription::name) + .limit(25) + .collect(Collectors.toList())); + } + +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServiceTest.java deleted file mode 100644 index e32998d6cd..0000000000 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServiceTest.java +++ /dev/null @@ -1,225 +0,0 @@ -package com.provectus.kafka.ui.service; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.provectus.kafka.ui.mapper.ClusterMapper; -import com.provectus.kafka.ui.model.InternalClusterMetrics; -import com.provectus.kafka.ui.model.InternalTopic; -import com.provectus.kafka.ui.model.InternalTopicConfig; -import com.provectus.kafka.ui.model.KafkaCluster; -import com.provectus.kafka.ui.model.TopicColumnsToSortDTO; -import com.provectus.kafka.ui.model.TopicDTO; -import com.provectus.kafka.ui.serde.DeserializationService; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import org.apache.kafka.clients.admin.ConfigEntry; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mapstruct.factory.Mappers; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.Spy; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class TopicsServiceTest { - @Spy - private final ClusterMapper clusterMapper = Mappers.getMapper(ClusterMapper.class); - @InjectMocks - private TopicsService topicsService; - @Mock - private AdminClientService adminClientService; - @Mock - private ConsumerGroupService consumerGroupService; - @Mock - private ClustersStorage clustersStorage; - - @Mock - private DeserializationService deserializationService; - - @Test - public void shouldListFirst25Topics() { - final KafkaCluster cluster = clusterWithTopics( - IntStream.rangeClosed(1, 100).boxed() - .map(Objects::toString) - .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() - .partitions(Map.of()) - .name(e) - .build())) - ); - - var topics = topicsService.getTopics(cluster, - Optional.empty(), Optional.empty(), Optional.empty(), - Optional.empty(), Optional.empty()); - assertThat(topics.getPageCount()).isEqualTo(4); - assertThat(topics.getTopics()).hasSize(25); - assertThat(topics.getTopics()).map(TopicDTO::getName).isSorted(); - } - - @Test - public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() { - var cluster = clusterWithTopics( - IntStream.rangeClosed(1, 100).boxed() - .map(Objects::toString) - .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() - .partitions(Map.of()) - .name(e) - .build())) - ); - - var topics = topicsService.getTopics(cluster, Optional.of(4), Optional.of(33), - Optional.empty(), Optional.empty(), Optional.empty()); - assertThat(topics.getPageCount()).isEqualTo(4); - assertThat(topics.getTopics()).hasSize(1) - .first().extracting(TopicDTO::getName).isEqualTo("99"); - } - - @Test - 
public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() { - var cluster = clusterWithTopics( - IntStream.rangeClosed(1, 100).boxed() - .map(Objects::toString) - .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() - .partitions(Map.of()) - .name(e) - .build())) - ); - - var topics = topicsService.getTopics(cluster, Optional.of(0), Optional.of(-1), - Optional.empty(), Optional.empty(), Optional.empty()); - assertThat(topics.getPageCount()).isEqualTo(4); - assertThat(topics.getTopics()).hasSize(25); - assertThat(topics.getTopics()).map(TopicDTO::getName).isSorted(); - } - - @Test - public void shouldListBotInternalAndNonInternalTopics() { - var cluster = clusterWithTopics( - IntStream.rangeClosed(1, 100).boxed() - .map(Objects::toString) - .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() - .partitions(Map.of()) - .name(e) - .internal(Integer.parseInt(e) % 10 == 0) - .build())) - ); - - var topics = topicsService.getTopics(cluster, - Optional.empty(), Optional.empty(), Optional.of(true), - Optional.empty(), Optional.empty()); - assertThat(topics.getPageCount()).isEqualTo(4); - assertThat(topics.getTopics()).hasSize(25); - assertThat(topics.getTopics()).map(TopicDTO::getName).isSorted(); - } - - - @Test - public void shouldListOnlyNonInternalTopics() { - var cluster = clusterWithTopics( - IntStream.rangeClosed(1, 100).boxed() - .map(Objects::toString) - .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() - .partitions(Map.of()) - .name(e) - .internal(Integer.parseInt(e) % 10 == 0) - .build())) - ); - - var topics = topicsService.getTopics(cluster, - Optional.empty(), Optional.empty(), Optional.of(true), - Optional.empty(), Optional.empty()); - assertThat(topics.getPageCount()).isEqualTo(4); - assertThat(topics.getTopics()).hasSize(25); - assertThat(topics.getTopics()).map(TopicDTO::getName).isSorted(); - } - - - @Test - public void shouldListOnlyTopicsContainingOne() { - var cluster = clusterWithTopics( - IntStream.rangeClosed(1, 100).boxed() - .map(Objects::toString) - .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() - .partitions(Map.of()) - .name(e) - .build())) - ); - - var topics = topicsService.getTopics(cluster, - Optional.empty(), Optional.empty(), Optional.empty(), - Optional.of("1"), Optional.empty()); - assertThat(topics.getPageCount()).isEqualTo(1); - assertThat(topics.getTopics()).hasSize(20); - assertThat(topics.getTopics()).map(TopicDTO::getName).isSorted(); - } - - @Test - public void shouldListTopicsOrderedByPartitionsCount() { - var cluster = clusterWithTopics( - IntStream.rangeClosed(1, 100).boxed() - .map(Objects::toString) - .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() - .partitions(Map.of()) - .name(e) - .partitionCount(100 - Integer.parseInt(e)) - .build())) - ); - - var topics = topicsService.getTopics(cluster, - Optional.empty(), Optional.empty(), Optional.empty(), - Optional.empty(), Optional.of(TopicColumnsToSortDTO.TOTAL_PARTITIONS)); - assertThat(topics.getPageCount()).isEqualTo(4); - assertThat(topics.getTopics()).hasSize(25); - assertThat(topics.getTopics()).map(TopicDTO::getPartitionCount).isSorted(); - } - - @Test - public void shouldRetrieveTopicConfigs() { - var cluster = clusterWithTopics( - IntStream.rangeClosed(1, 100).boxed() - .map(Objects::toString) - .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() - .name(e) - .topicConfigs( - List.of(InternalTopicConfig.builder() - .name("testName") 
- .value("testValue") - .defaultValue("testDefaultValue") - .source(ConfigEntry.ConfigSource.DEFAULT_CONFIG) - .isReadOnly(true) - .isSensitive(true) - .synonyms(List.of()) - .build() - ) - ) - .build())) - ); - - var topicConfigs = topicsService.getTopicConfigs(cluster, "1"); - assertThat(topicConfigs).hasSize(1); - - var topicConfig = topicConfigs.get(0); - assertThat(topicConfig.getName()).isEqualTo("testName"); - assertThat(topicConfig.getValue()).isEqualTo("testValue"); - assertThat(topicConfig.getDefaultValue()).isEqualTo("testDefaultValue"); - assertThat(topicConfig.getSource().getValue()) - .isEqualTo(ConfigEntry.ConfigSource.DEFAULT_CONFIG.name()); - assertThat(topicConfig.getSynonyms()).isNotNull(); - assertThat(topicConfig.getIsReadOnly()).isTrue(); - assertThat(topicConfig.getIsSensitive()).isTrue(); - } - - private KafkaCluster clusterWithTopics(Map topics) { - return KafkaCluster.builder() - .metrics(InternalClusterMetrics.builder() - .topics(topics) - .build()) - .build(); - } - -}
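
--

Editor's note, not part of the patch itself: the per-key failure handling introduced in
ReactiveAdminClient.toMonoWithExceptionFilter(..) can be exercised in isolation with Kafka's
KafkaFutureImpl. The snippet below is a minimal, hypothetical sketch; it assumes the private
method is made reachable from a test in the same package, and the adminClient reference and
all key names in it are illustrative only.

    // One future per key, as the Kafka admin API returns them.
    KafkaFutureImpl<Integer> ok = new KafkaFutureImpl<>();
    KafkaFutureImpl<Integer> failing = new KafkaFutureImpl<>();
    ok.complete(1);
    failing.completeExceptionally(new UnknownTopicOrPartitionException());

    Map<String, KafkaFuture<Integer>> futures = Map.of("ok", ok, "failing", failing);

    // Keys whose future failed with the filtered exception class are dropped from the
    // result; any other exception would still fail the whole Mono.
    Mono<Map<String, Integer>> filtered =
        adminClient.toMonoWithExceptionFilter(futures, UnknownTopicOrPartitionException.class);

    assert Map.of("ok", 1).equals(filtered.block());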