Provide topic offsets in list (#647)

parent df1d3bbfc7
commit 71e1370deb

2 changed files with 71 additions and 12 deletions
@@ -108,7 +108,7 @@ public class ClusterService {
     var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage;
     var cluster = clustersStorage.getClusterByName(name)
         .orElseThrow(ClusterNotFoundException::new);
-    List<Topic> topics = cluster.getTopics().values().stream()
+    List<InternalTopic> topics = cluster.getTopics().values().stream()
         .filter(topic -> !topic.isInternal()
             || showInternal
                 .map(i -> topic.isInternal() == i)
@@ -118,7 +118,6 @@ public class ClusterService {
             .map(s -> StringUtils.containsIgnoreCase(topic.getName(), s))
             .orElse(true))
         .sorted(getComparatorForTopic(sortBy))
-        .map(clusterMapper::toTopic)
         .collect(Collectors.toList());
     var totalPages = (topics.size() / perPage)
         + (topics.size() % perPage == 0 ? 0 : 1);
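Note: the total-page formula in the hunk above rounds up — integer division counts the full pages and the remainder check adds one more page when the topic count is not an exact multiple of perPage. A minimal, self-contained sketch of that arithmetic; the PageCountDemo class and method names are illustrative, only the formula mirrors the diff:

public class PageCountDemo {

  // Same rounding-up arithmetic as the totalPages calculation in ClusterService.
  static int totalPages(int topicCount, int perPage) {
    return (topicCount / perPage) + (topicCount % perPage == 0 ? 0 : 1);
  }

  public static void main(String[] args) {
    // 100 topics with perPage = 33 -> 3 full pages plus 1 partial page = 4,
    // which is the getPageCount() value asserted in ClusterServiceTest below.
    System.out.println(totalPages(100, 33)); // 4
    // 100 topics with perPage = 25 (the default implied by shouldListFirst25Topics)
    // divide evenly into exactly 4 pages.
    System.out.println(totalPages(100, 25)); // 4
  }
}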
@@ -128,6 +127,13 @@ public class ClusterService {
         topics.stream()
             .skip(topicsToSkip)
             .limit(perPage)
+            .map(t ->
+                clusterMapper.toTopic(
+                    t.toBuilder().partitions(
+                        kafkaService.getTopicPartitions(cluster, t)
+                    ).build()
+                )
+            )
             .collect(Collectors.toList())
     );
   }
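Note: taken together, the three hunks above move the clusterMapper.toTopic mapping out of the filtering step and into the paged stream, so each InternalTopic on the requested page is first copied via toBuilder() and given the partitions returned by kafkaService.getTopicPartitions(cluster, t) before being mapped to the API type. A minimal sketch of that copy-and-enrich pattern, assuming a Lombok-style builder; the Item type, its fields, and the withPartitions method are illustrative, not the project's classes:

import java.util.Map;
import lombok.Builder;
import lombok.Value;

// Hypothetical immutable value type standing in for InternalTopic.
// toBuilder = true generates a builder pre-filled with the current field values.
@Value
@Builder(toBuilder = true)
class Item {
  String name;
  Map<Integer, Long> partitions;
}

class EnrichDemo {
  // Copy the item and swap in freshly fetched partition data, mirroring
  // t.toBuilder().partitions(kafkaService.getTopicPartitions(cluster, t)).build()
  // in the hunk above.
  static Item withPartitions(Item item, Map<Integer, Long> freshPartitions) {
    return item.toBuilder()
        .partitions(freshPartitions)
        .build();
  }

  public static void main(String[] args) {
    Item stale = Item.builder().name("topic-1").partitions(Map.of()).build();
    Item fresh = withPartitions(stale, Map.of(0, 42L));
    System.out.println(fresh.getPartitions()); // {0=42}
  }
}

The hunks that follow update ClusterServiceTest to mock the new KafkaService collaborator.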
@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.service;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
 
 import com.provectus.kafka.ui.mapper.ClusterMapper;
@@ -31,13 +32,14 @@ class ClusterServiceTest {
   private ClusterService clusterService;
   @Mock
   private ClustersStorage clustersStorage;
+  @Mock
+  private KafkaService kafkaService;
 
   @Test
   public void shouldListFirst25Topics() {
     var topicName = UUID.randomUUID().toString();
 
-    when(clustersStorage.getClusterByName(topicName))
-        .thenReturn(Optional.of(KafkaCluster.builder()
+    final KafkaCluster cluster = KafkaCluster.builder()
         .topics(
             IntStream.rangeClosed(1, 100).boxed()
                 .map(Objects::toString)
@@ -46,7 +48,16 @@ class ClusterServiceTest {
                     .name(e)
                     .build()))
         )
-        .build()));
+        .build();
 
+    when(clustersStorage.getClusterByName(topicName))
+        .thenReturn(Optional.of(cluster));
+
+    when(
+        kafkaService.getTopicPartitions(any(), any())
+    ).thenReturn(
+        Map.of()
+    );
+
     var topics = clusterService.getTopics(topicName,
         Optional.empty(), Optional.empty(), Optional.empty(),
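Note: because getTopics now calls kafkaService.getTopicPartitions for every topic on the returned page, the test above stubs that call with an empty Map, and the same when(...).thenReturn(Map.of()) block is repeated in each of the remaining tests below. A self-contained sketch of that Mockito stubbing pattern; the PartitionSource interface is a hypothetical stand-in for KafkaService, and the Map value type is assumed:

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Map;

// Hypothetical dependency standing in for KafkaService.
interface PartitionSource {
  Map<Integer, Long> getTopicPartitions(Object cluster, Object topic);
}

class StubbingDemo {
  public static void main(String[] args) {
    PartitionSource source = mock(PartitionSource.class);
    // Mirror the stub added in each test: any arguments, always an empty Map,
    // so the paging code gets a deterministic partition lookup.
    when(source.getTopicPartitions(any(), any())).thenReturn(Map.of());

    System.out.println(source.getTopicPartitions("cluster", "topic")); // {}
  }
}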
@@ -72,6 +83,13 @@ class ClusterServiceTest {
         )
         .build()));
 
+    when(
+        kafkaService.getTopicPartitions(any(), any())
+    ).thenReturn(
+        Map.of()
+    );
+
+
     var topics = clusterService.getTopics(topicName, Optional.of(4), Optional.of(33),
         Optional.empty(), Optional.empty(), Optional.empty());
     assertThat(topics.getPageCount()).isEqualTo(4);
@@ -95,6 +113,13 @@ class ClusterServiceTest {
         )
         .build()));
 
+    when(
+        kafkaService.getTopicPartitions(any(), any())
+    ).thenReturn(
+        Map.of()
+    );
+
+
     var topics = clusterService.getTopics(topicName, Optional.of(0), Optional.of(-1),
         Optional.empty(), Optional.empty(), Optional.empty());
     assertThat(topics.getPageCount()).isEqualTo(4);
@@ -119,6 +144,13 @@ class ClusterServiceTest {
         )
         .build()));
 
+    when(
+        kafkaService.getTopicPartitions(any(), any())
+    ).thenReturn(
+        Map.of()
+    );
+
+
     var topics = clusterService.getTopics(topicName,
         Optional.empty(), Optional.empty(), Optional.of(true),
         Optional.empty(), Optional.empty());
@@ -145,6 +177,13 @@ class ClusterServiceTest {
         )
         .build()));
 
+    when(
+        kafkaService.getTopicPartitions(any(), any())
+    ).thenReturn(
+        Map.of()
+    );
+
+
     var topics = clusterService.getTopics(topicName,
         Optional.empty(), Optional.empty(), Optional.of(true),
         Optional.empty(), Optional.empty());
@@ -170,6 +209,13 @@ class ClusterServiceTest {
         )
         .build()));
 
+    when(
+        kafkaService.getTopicPartitions(any(), any())
+    ).thenReturn(
+        Map.of()
+    );
+
+
     var topics = clusterService.getTopics(topicName,
         Optional.empty(), Optional.empty(), Optional.empty(),
         Optional.of("1"), Optional.empty());
@@ -195,6 +241,13 @@ class ClusterServiceTest {
         )
         .build()));
 
+    when(
+        kafkaService.getTopicPartitions(any(), any())
+    ).thenReturn(
+        Map.of()
+    );
+
+
     var topics = clusterService.getTopics(topicName,
         Optional.empty(), Optional.empty(), Optional.empty(),
         Optional.empty(), Optional.of(TopicColumnsToSort.TOTAL_PARTITIONS));