123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- package com.provectus.kafka.ui.service;
- import static org.assertj.core.api.Assertions.assertThat;
- import static org.mockito.ArgumentMatchers.any;
- import static org.mockito.Mockito.when;
- import com.provectus.kafka.ui.mapper.ClusterMapper;
- import com.provectus.kafka.ui.model.InternalTopic;
- import com.provectus.kafka.ui.model.InternalTopicConfig;
- import com.provectus.kafka.ui.model.KafkaCluster;
- import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
- import com.provectus.kafka.ui.model.TopicDTO;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
- import java.util.Optional;
- import java.util.UUID;
- import java.util.function.Function;
- import java.util.stream.Collectors;
- import java.util.stream.IntStream;
- import org.apache.kafka.clients.admin.ConfigEntry;
- import org.junit.jupiter.api.Test;
- import org.junit.jupiter.api.extension.ExtendWith;
- import org.mapstruct.factory.Mappers;
- import org.mockito.InjectMocks;
- import org.mockito.Mock;
- import org.mockito.Spy;
- import org.mockito.junit.jupiter.MockitoExtension;
- @ExtendWith(MockitoExtension.class)
- class ClusterServiceTest {
- @Spy
- private final ClusterMapper clusterMapper = Mappers.getMapper(ClusterMapper.class);
- @InjectMocks
- private ClusterService clusterService;
- @Mock
- private ClustersStorage clustersStorage;
- @Mock
- private KafkaService kafkaService;
- @Test
- public void shouldListFirst25Topics() {
- var topicName = UUID.randomUUID().toString();
- final KafkaCluster cluster = KafkaCluster.builder()
- .topics(
- IntStream.rangeClosed(1, 100).boxed()
- .map(Objects::toString)
- .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
- .partitions(Map.of())
- .name(e)
- .build()))
- )
- .build();
- when(clustersStorage.getClusterByName(topicName))
- .thenReturn(Optional.of(cluster));
- when(
- kafkaService.getTopicPartitions(any(), any())
- ).thenReturn(
- Map.of()
- );
- var topics = clusterService.getTopics(topicName,
- Optional.empty(), Optional.empty(), Optional.empty(),
- Optional.empty(), Optional.empty());
- assertThat(topics.getPageCount()).isEqualTo(4);
- assertThat(topics.getTopics()).hasSize(25);
- assertThat(topics.getTopics()).map(TopicDTO::getName).isSorted();
- }
- @Test
- public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
- var topicName = UUID.randomUUID().toString();
- when(clustersStorage.getClusterByName(topicName))
- .thenReturn(Optional.of(KafkaCluster.builder()
- .topics(
- IntStream.rangeClosed(1, 100).boxed()
- .map(Objects::toString)
- .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
- .partitions(Map.of())
- .name(e)
- .build()))
- )
- .build()));
- when(
- kafkaService.getTopicPartitions(any(), any())
- ).thenReturn(
- Map.of()
- );
- var topics = clusterService.getTopics(topicName, Optional.of(4), Optional.of(33),
- Optional.empty(), Optional.empty(), Optional.empty());
- assertThat(topics.getPageCount()).isEqualTo(4);
- assertThat(topics.getTopics()).hasSize(1)
- .first().extracting(TopicDTO::getName).isEqualTo("99");
- }
- @Test
- public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
- var topicName = UUID.randomUUID().toString();
- when(clustersStorage.getClusterByName(topicName))
- .thenReturn(Optional.of(KafkaCluster.builder()
- .topics(
- IntStream.rangeClosed(1, 100).boxed()
- .map(Objects::toString)
- .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
- .partitions(Map.of())
- .name(e)
- .build()))
- )
- .build()));
- when(
- kafkaService.getTopicPartitions(any(), any())
- ).thenReturn(
- Map.of()
- );
- var topics = clusterService.getTopics(topicName, Optional.of(0), Optional.of(-1),
- Optional.empty(), Optional.empty(), Optional.empty());
- assertThat(topics.getPageCount()).isEqualTo(4);
- assertThat(topics.getTopics()).hasSize(25);
- assertThat(topics.getTopics()).map(TopicDTO::getName).isSorted();
- }
- @Test
- public void shouldListBotInternalAndNonInternalTopics() {
- var topicName = UUID.randomUUID().toString();
- when(clustersStorage.getClusterByName(topicName))
- .thenReturn(Optional.of(KafkaCluster.builder()
- .topics(
- IntStream.rangeClosed(1, 100).boxed()
- .map(Objects::toString)
- .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
- .partitions(Map.of())
- .name(e)
- .internal(Integer.parseInt(e) % 10 == 0)
- .build()))
- )
- .build()));
- when(
- kafkaService.getTopicPartitions(any(), any())
- ).thenReturn(
- Map.of()
- );
- var topics = clusterService.getTopics(topicName,
- Optional.empty(), Optional.empty(), Optional.of(true),
- Optional.empty(), Optional.empty());
- assertThat(topics.getPageCount()).isEqualTo(4);
- assertThat(topics.getTopics()).hasSize(25);
- assertThat(topics.getTopics()).map(TopicDTO::getName).isSorted();
- }
- @Test
- public void shouldListOnlyNonInternalTopics() {
- var topicName = UUID.randomUUID().toString();
- when(clustersStorage.getClusterByName(topicName))
- .thenReturn(Optional.of(KafkaCluster.builder()
- .topics(
- IntStream.rangeClosed(1, 100).boxed()
- .map(Objects::toString)
- .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
- .partitions(Map.of())
- .name(e)
- .internal(Integer.parseInt(e) % 10 == 0)
- .build()))
- )
- .build()));
- when(
- kafkaService.getTopicPartitions(any(), any())
- ).thenReturn(
- Map.of()
- );
- var topics = clusterService.getTopics(topicName,
- Optional.empty(), Optional.empty(), Optional.of(true),
- Optional.empty(), Optional.empty());
- assertThat(topics.getPageCount()).isEqualTo(4);
- assertThat(topics.getTopics()).hasSize(25);
- assertThat(topics.getTopics()).map(TopicDTO::getName).isSorted();
- }
- @Test
- public void shouldListOnlyTopicsContainingOne() {
- var topicName = UUID.randomUUID().toString();
- when(clustersStorage.getClusterByName(topicName))
- .thenReturn(Optional.of(KafkaCluster.builder()
- .topics(
- IntStream.rangeClosed(1, 100).boxed()
- .map(Objects::toString)
- .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
- .partitions(Map.of())
- .name(e)
- .build()))
- )
- .build()));
- when(
- kafkaService.getTopicPartitions(any(), any())
- ).thenReturn(
- Map.of()
- );
- var topics = clusterService.getTopics(topicName,
- Optional.empty(), Optional.empty(), Optional.empty(),
- Optional.of("1"), Optional.empty());
- assertThat(topics.getPageCount()).isEqualTo(1);
- assertThat(topics.getTopics()).hasSize(20);
- assertThat(topics.getTopics()).map(TopicDTO::getName).isSorted();
- }
- @Test
- public void shouldListTopicsOrderedByPartitionsCount() {
- var topicName = UUID.randomUUID().toString();
- when(clustersStorage.getClusterByName(topicName))
- .thenReturn(Optional.of(KafkaCluster.builder()
- .topics(
- IntStream.rangeClosed(1, 100).boxed()
- .map(Objects::toString)
- .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
- .partitions(Map.of())
- .name(e)
- .partitionCount(100 - Integer.parseInt(e))
- .build()))
- )
- .build()));
- when(
- kafkaService.getTopicPartitions(any(), any())
- ).thenReturn(
- Map.of()
- );
- var topics = clusterService.getTopics(topicName,
- Optional.empty(), Optional.empty(), Optional.empty(),
- Optional.empty(), Optional.of(TopicColumnsToSortDTO.TOTAL_PARTITIONS));
- assertThat(topics.getPageCount()).isEqualTo(4);
- assertThat(topics.getTopics()).hasSize(25);
- assertThat(topics.getTopics()).map(TopicDTO::getPartitionCount).isSorted();
- }
- @Test
- public void shouldRetrieveTopicConfigs() {
- var topicName = UUID.randomUUID().toString();
- when(clustersStorage.getClusterByName(topicName))
- .thenReturn(Optional.of(KafkaCluster.builder()
- .topics(
- IntStream.rangeClosed(1, 100).boxed()
- .map(Objects::toString)
- .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
- .name(e)
- .topicConfigs(
- List.of(InternalTopicConfig.builder()
- .name("testName")
- .value("testValue")
- .defaultValue("testDefaultValue")
- .source(ConfigEntry.ConfigSource.DEFAULT_CONFIG)
- .isReadOnly(true)
- .isSensitive(true)
- .synonyms(List.of())
- .build()
- )
- )
- .build()))
- )
- .build()));
- var configs = clusterService.getTopicConfigs(topicName, "1");
- var topicConfig = configs.isPresent() ? configs.get().get(0) : null;
- assertThat(configs.isPresent()).isTrue();
- assertThat(topicConfig.getName()).isEqualTo("testName");
- assertThat(topicConfig.getValue()).isEqualTo("testValue");
- assertThat(topicConfig.getDefaultValue()).isEqualTo("testDefaultValue");
- assertThat(topicConfig.getSource().getValue())
- .isEqualTo(ConfigEntry.ConfigSource.DEFAULT_CONFIG.name());
- assertThat(topicConfig.getSynonyms()).isNotNull();
- assertThat(topicConfig.getIsReadOnly()).isTrue();
- assertThat(topicConfig.getIsSensitive()).isTrue();
- }
- }
|