From 71e1370debf88ab82735871ef76c36495472f4e8 Mon Sep 17 00:00:00 2001 From: German Osin Date: Wed, 7 Jul 2021 22:31:45 +0300 Subject: [PATCH] Provide topic offsets in list (#647) --- .../kafka/ui/service/ClusterService.java | 10 ++- .../kafka/ui/service/ClusterServiceTest.java | 73 ++++++++++++++++--- 2 files changed, 71 insertions(+), 12 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java index 11217e0a91..3ec3e6bc50 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java @@ -108,7 +108,7 @@ public class ClusterService { var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage; var cluster = clustersStorage.getClusterByName(name) .orElseThrow(ClusterNotFoundException::new); - List topics = cluster.getTopics().values().stream() + List topics = cluster.getTopics().values().stream() .filter(topic -> !topic.isInternal() || showInternal .map(i -> topic.isInternal() == i) @@ -118,7 +118,6 @@ public class ClusterService { .map(s -> StringUtils.containsIgnoreCase(topic.getName(), s)) .orElse(true)) .sorted(getComparatorForTopic(sortBy)) - .map(clusterMapper::toTopic) .collect(Collectors.toList()); var totalPages = (topics.size() / perPage) + (topics.size() % perPage == 0 ? 0 : 1); @@ -128,6 +127,13 @@ public class ClusterService { topics.stream() .skip(topicsToSkip) .limit(perPage) + .map(t -> + clusterMapper.toTopic( + t.toBuilder().partitions( + kafkaService.getTopicPartitions(cluster, t) + ).build() + ) + ) .collect(Collectors.toList()) ); } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java index 5c8d459992..a7c8ac237f 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.service; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; import com.provectus.kafka.ui.mapper.ClusterMapper; @@ -31,22 +32,32 @@ class ClusterServiceTest { private ClusterService clusterService; @Mock private ClustersStorage clustersStorage; + @Mock + private KafkaService kafkaService; @Test public void shouldListFirst25Topics() { var topicName = UUID.randomUUID().toString(); + final KafkaCluster cluster = KafkaCluster.builder() + .topics( + IntStream.rangeClosed(1, 100).boxed() + .map(Objects::toString) + .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() + .partitions(Map.of()) + .name(e) + .build())) + ) + .build(); + when(clustersStorage.getClusterByName(topicName)) - .thenReturn(Optional.of(KafkaCluster.builder() - .topics( - IntStream.rangeClosed(1, 100).boxed() - .map(Objects::toString) - .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder() - .partitions(Map.of()) - .name(e) - .build())) - ) - .build())); + .thenReturn(Optional.of(cluster)); + + when( + kafkaService.getTopicPartitions(any(), any()) + ).thenReturn( + Map.of() + ); var topics = clusterService.getTopics(topicName, Optional.empty(), Optional.empty(), Optional.empty(), @@ -72,6 +83,13 @@ class ClusterServiceTest { ) .build())); + when( + kafkaService.getTopicPartitions(any(), any()) + ).thenReturn( + Map.of() + ); + + var topics = clusterService.getTopics(topicName, Optional.of(4), Optional.of(33), Optional.empty(), Optional.empty(), Optional.empty()); assertThat(topics.getPageCount()).isEqualTo(4); @@ -95,6 +113,13 @@ class ClusterServiceTest { ) .build())); + when( + kafkaService.getTopicPartitions(any(), any()) + ).thenReturn( + Map.of() + ); + + var topics = clusterService.getTopics(topicName, Optional.of(0), Optional.of(-1), Optional.empty(), Optional.empty(), Optional.empty()); assertThat(topics.getPageCount()).isEqualTo(4); @@ -119,6 +144,13 @@ class ClusterServiceTest { ) .build())); + when( + kafkaService.getTopicPartitions(any(), any()) + ).thenReturn( + Map.of() + ); + + var topics = clusterService.getTopics(topicName, Optional.empty(), Optional.empty(), Optional.of(true), Optional.empty(), Optional.empty()); @@ -145,6 +177,13 @@ class ClusterServiceTest { ) .build())); + when( + kafkaService.getTopicPartitions(any(), any()) + ).thenReturn( + Map.of() + ); + + var topics = clusterService.getTopics(topicName, Optional.empty(), Optional.empty(), Optional.of(true), Optional.empty(), Optional.empty()); @@ -170,6 +209,13 @@ class ClusterServiceTest { ) .build())); + when( + kafkaService.getTopicPartitions(any(), any()) + ).thenReturn( + Map.of() + ); + + var topics = clusterService.getTopics(topicName, Optional.empty(), Optional.empty(), Optional.empty(), Optional.of("1"), Optional.empty()); @@ -195,6 +241,13 @@ class ClusterServiceTest { ) .build())); + when( + kafkaService.getTopicPartitions(any(), any()) + ).thenReturn( + Map.of() + ); + + var topics = clusterService.getTopics(topicName, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.of(TopicColumnsToSort.TOTAL_PARTITIONS));