123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- package com.provectus.kafka.ui.service;
- import static org.assertj.core.api.Assertions.assertThat;
- import static org.mockito.ArgumentMatchers.anyList;
- import static org.mockito.ArgumentMatchers.isA;
- import static org.mockito.Mockito.mock;
- import static org.mockito.Mockito.when;
- import com.provectus.kafka.ui.controller.TopicsController;
- import com.provectus.kafka.ui.mapper.ClusterMapper;
- import com.provectus.kafka.ui.mapper.ClusterMapperImpl;
- import com.provectus.kafka.ui.model.InternalLogDirStats;
- import com.provectus.kafka.ui.model.InternalPartitionsOffsets;
- import com.provectus.kafka.ui.model.InternalTopic;
- import com.provectus.kafka.ui.model.KafkaCluster;
- import com.provectus.kafka.ui.model.Metrics;
- import com.provectus.kafka.ui.model.SortOrderDTO;
- import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
- import com.provectus.kafka.ui.model.TopicDTO;
- import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
- import com.provectus.kafka.ui.service.audit.AuditService;
- import com.provectus.kafka.ui.service.rbac.AccessControlService;
- import com.provectus.kafka.ui.util.AccessControlServiceMock;
- import java.util.ArrayList;
- import java.util.Comparator;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
- import java.util.Optional;
- import java.util.UUID;
- import java.util.function.Function;
- import java.util.stream.Collectors;
- import java.util.stream.IntStream;
- import org.apache.kafka.clients.admin.TopicDescription;
- import org.apache.kafka.common.TopicPartitionInfo;
- import org.junit.jupiter.api.Test;
- import reactor.core.publisher.Mono;
- class TopicsServicePaginationTest {
- private static final String LOCAL_KAFKA_CLUSTER_NAME = "local";
- private final TopicsService topicsService = mock(TopicsService.class);
- private final ClustersStorage clustersStorage = mock(ClustersStorage.class);
- private final ClusterMapper clusterMapper = new ClusterMapperImpl();
- private final AccessControlService accessControlService = new AccessControlServiceMock().getMock();
- private final TopicsController topicsController = new TopicsController(
- topicsService, mock(TopicAnalysisService.class), clusterMapper, accessControlService, mock(AuditService.class));
- private void init(Map<String, InternalTopic> topicsInCache) {
- when(clustersStorage.getClusterByName(isA(String.class)))
- .thenReturn(Optional.of(buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME)));
- when(topicsService.getTopicsForPagination(isA(KafkaCluster.class)))
- .thenReturn(Mono.just(new ArrayList<>(topicsInCache.values())));
- when(topicsService.loadTopics(isA(KafkaCluster.class), anyList()))
- .thenAnswer(a -> {
- List<String> lst = a.getArgument(1);
- return Mono.just(lst.stream().map(topicsInCache::get).collect(Collectors.toList()));
- });
- topicsController.setClustersStorage(clustersStorage);
- }
- @Test
- public void shouldListFirst25Topics() {
- init(
- IntStream.rangeClosed(1, 100).boxed()
- .map(Objects::toString)
- .map(name -> new TopicDescription(name, false, List.of()))
- .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
- Metrics.empty(), InternalLogDirStats.empty(), "_"))
- .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
- );
- var topics = topicsController
- .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, null,
- null, null, null).block();
- assertThat(topics.getBody().getPageCount()).isEqualTo(4);
- assertThat(topics.getBody().getTopics()).hasSize(25);
- assertThat(topics.getBody().getTopics())
- .isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
- }
- private KafkaCluster buildKafkaCluster(String clusterName) {
- return KafkaCluster.builder()
- .name(clusterName)
- .build();
- }
- @Test
- public void shouldListFirst25TopicsSortedByNameDescendingOrder() {
- var internalTopics = IntStream.rangeClosed(1, 100).boxed()
- .map(Objects::toString)
- .map(name -> new TopicDescription(name, false, List.of()))
- .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
- Metrics.empty(), InternalLogDirStats.empty(), "_"))
- .collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
- init(internalTopics);
- var topics = topicsController
- .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, null,
- TopicColumnsToSortDTO.NAME, SortOrderDTO.DESC, null).block();
- assertThat(topics.getBody().getPageCount()).isEqualTo(4);
- assertThat(topics.getBody().getTopics()).hasSize(25);
- assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName).reversed());
- assertThat(topics.getBody().getTopics()).containsExactlyElementsOf(
- internalTopics.values().stream()
- .map(clusterMapper::toTopic)
- .sorted(Comparator.comparing(TopicDTO::getName).reversed())
- .limit(25)
- .collect(Collectors.toList())
- );
- }
- @Test
- public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
- init(
- IntStream.rangeClosed(1, 100).boxed()
- .map(Objects::toString)
- .map(name -> new TopicDescription(name, false, List.of()))
- .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
- Metrics.empty(), InternalLogDirStats.empty(), "_"))
- .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
- );
- var topics = topicsController
- .getTopics(LOCAL_KAFKA_CLUSTER_NAME, 4, 33, null, null, null, null, null).block();
- assertThat(topics.getBody().getPageCount()).isEqualTo(4);
- assertThat(topics.getBody().getTopics()).hasSize(1);
- assertThat(topics.getBody().getTopics().get(0).getName()).isEqualTo("99");
- }
- @Test
- public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
- init(
- IntStream.rangeClosed(1, 100).boxed()
- .map(Objects::toString)
- .map(name -> new TopicDescription(name, false, List.of()))
- .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
- Metrics.empty(), InternalLogDirStats.empty(), "_"))
- .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
- );
- var topics = topicsController
- .getTopics(LOCAL_KAFKA_CLUSTER_NAME, 0, -1, null, null, null, null, null).block();
- assertThat(topics.getBody().getPageCount()).isEqualTo(4);
- assertThat(topics.getBody().getTopics()).hasSize(25);
- assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
- }
- @Test
- public void shouldListBotInternalAndNonInternalTopics() {
- init(
- IntStream.rangeClosed(1, 100).boxed()
- .map(Objects::toString)
- .map(name -> new TopicDescription(name, Integer.parseInt(name) % 10 == 0, List.of()))
- .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
- Metrics.empty(), InternalLogDirStats.empty(), "_"))
- .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
- );
- var topics = topicsController
- .getTopics(LOCAL_KAFKA_CLUSTER_NAME, 0, -1, true, null,
- null, null, null).block();
- assertThat(topics.getBody().getPageCount()).isEqualTo(4);
- assertThat(topics.getBody().getTopics()).hasSize(25);
- assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
- }
- @Test
- public void shouldListOnlyNonInternalTopics() {
- init(
- IntStream.rangeClosed(1, 100).boxed()
- .map(Objects::toString)
- .map(name -> new TopicDescription(name, Integer.parseInt(name) % 5 == 0, List.of()))
- .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
- Metrics.empty(), InternalLogDirStats.empty(), "_"))
- .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
- );
- var topics = topicsController
- .getTopics(LOCAL_KAFKA_CLUSTER_NAME, 4, -1, false, null,
- null, null, null).block();
- assertThat(topics.getBody().getPageCount()).isEqualTo(4);
- assertThat(topics.getBody().getTopics()).hasSize(5);
- assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
- }
- @Test
- public void shouldListOnlyTopicsContainingOne() {
- init(
- IntStream.rangeClosed(1, 100).boxed()
- .map(Objects::toString)
- .map(name -> new TopicDescription(name, false, List.of()))
- .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
- Metrics.empty(), InternalLogDirStats.empty(), "_"))
- .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
- );
- var topics = topicsController
- .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, "1",
- null, null, null).block();
- assertThat(topics.getBody().getPageCount()).isEqualTo(1);
- assertThat(topics.getBody().getTopics()).hasSize(20);
- assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
- }
- @Test
- public void shouldListTopicsOrderedByPartitionsCount() {
- Map<String, InternalTopic> internalTopics = IntStream.rangeClosed(1, 100).boxed()
- .map(i -> new TopicDescription(UUID.randomUUID().toString(), false,
- IntStream.range(0, i)
- .mapToObj(p ->
- new TopicPartitionInfo(p, null, List.of(), List.of()))
- .collect(Collectors.toList())))
- .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), InternalPartitionsOffsets.empty(),
- Metrics.empty(), InternalLogDirStats.empty(), "_"))
- .collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
- init(internalTopics);
- var topicsSortedAsc = topicsController
- .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null,
- null, TopicColumnsToSortDTO.TOTAL_PARTITIONS, null, null).block();
- assertThat(topicsSortedAsc.getBody().getPageCount()).isEqualTo(4);
- assertThat(topicsSortedAsc.getBody().getTopics()).hasSize(25);
- assertThat(topicsSortedAsc.getBody().getTopics()).containsExactlyElementsOf(
- internalTopics.values().stream()
- .map(clusterMapper::toTopic)
- .sorted(Comparator.comparing(TopicDTO::getPartitionCount))
- .limit(25)
- .collect(Collectors.toList())
- );
- var topicsSortedDesc = topicsController
- .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null,
- null, TopicColumnsToSortDTO.TOTAL_PARTITIONS, SortOrderDTO.DESC, null).block();
- assertThat(topicsSortedDesc.getBody().getPageCount()).isEqualTo(4);
- assertThat(topicsSortedDesc.getBody().getTopics()).hasSize(25);
- assertThat(topicsSortedDesc.getBody().getTopics()).containsExactlyElementsOf(
- internalTopics.values().stream()
- .map(clusterMapper::toTopic)
- .sorted(Comparator.comparing(TopicDTO::getPartitionCount).reversed())
- .limit(25)
- .collect(Collectors.toList())
- );
- }
- }
|