TopicsServicePaginationTest.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. package com.provectus.kafka.ui.service;
  2. import static org.assertj.core.api.Assertions.assertThat;
  3. import static org.mockito.ArgumentMatchers.anyList;
  4. import static org.mockito.ArgumentMatchers.isA;
  5. import static org.mockito.Mockito.mock;
  6. import static org.mockito.Mockito.when;
  7. import com.provectus.kafka.ui.controller.TopicsController;
  8. import com.provectus.kafka.ui.mapper.ClusterMapper;
  9. import com.provectus.kafka.ui.mapper.ClusterMapperImpl;
  10. import com.provectus.kafka.ui.model.InternalLogDirStats;
  11. import com.provectus.kafka.ui.model.InternalPartitionsOffsets;
  12. import com.provectus.kafka.ui.model.InternalTopic;
  13. import com.provectus.kafka.ui.model.KafkaCluster;
  14. import com.provectus.kafka.ui.model.Metrics;
  15. import com.provectus.kafka.ui.model.SortOrderDTO;
  16. import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
  17. import com.provectus.kafka.ui.model.TopicDTO;
  18. import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
  19. import com.provectus.kafka.ui.service.audit.AuditService;
  20. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  21. import com.provectus.kafka.ui.util.AccessControlServiceMock;
  22. import java.util.ArrayList;
  23. import java.util.Comparator;
  24. import java.util.List;
  25. import java.util.Map;
  26. import java.util.Objects;
  27. import java.util.Optional;
  28. import java.util.UUID;
  29. import java.util.function.Function;
  30. import java.util.stream.Collectors;
  31. import java.util.stream.IntStream;
  32. import org.apache.kafka.clients.admin.TopicDescription;
  33. import org.apache.kafka.common.TopicPartitionInfo;
  34. import org.junit.jupiter.api.Test;
  35. import reactor.core.publisher.Mono;
  36. class TopicsServicePaginationTest {
  37. private static final String LOCAL_KAFKA_CLUSTER_NAME = "local";
  38. private final TopicsService topicsService = mock(TopicsService.class);
  39. private final ClustersStorage clustersStorage = mock(ClustersStorage.class);
  40. private final ClusterMapper clusterMapper = new ClusterMapperImpl();
  41. private final AccessControlService accessControlService = new AccessControlServiceMock().getMock();
  42. private final TopicsController topicsController = new TopicsController(
  43. topicsService, mock(TopicAnalysisService.class), clusterMapper, accessControlService, mock(AuditService.class));
  44. private void init(Map<String, InternalTopic> topicsInCache) {
  45. when(clustersStorage.getClusterByName(isA(String.class)))
  46. .thenReturn(Optional.of(buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME)));
  47. when(topicsService.getTopicsForPagination(isA(KafkaCluster.class)))
  48. .thenReturn(Mono.just(new ArrayList<>(topicsInCache.values())));
  49. when(topicsService.loadTopics(isA(KafkaCluster.class), anyList()))
  50. .thenAnswer(a -> {
  51. List<String> lst = a.getArgument(1);
  52. return Mono.just(lst.stream().map(topicsInCache::get).collect(Collectors.toList()));
  53. });
  54. topicsController.setClustersStorage(clustersStorage);
  55. }
  56. @Test
  57. public void shouldListFirst25Topics() {
  58. init(
  59. IntStream.rangeClosed(1, 100).boxed()
  60. .map(Objects::toString)
  61. .map(name -> new TopicDescription(name, false, List.of()))
  62. .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
  63. Metrics.empty(), InternalLogDirStats.empty(), "_"))
  64. .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
  65. );
  66. var topics = topicsController
  67. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, null,
  68. null, null, null).block();
  69. assertThat(topics.getBody().getPageCount()).isEqualTo(4);
  70. assertThat(topics.getBody().getTopics()).hasSize(25);
  71. assertThat(topics.getBody().getTopics())
  72. .isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
  73. }
  74. private KafkaCluster buildKafkaCluster(String clusterName) {
  75. return KafkaCluster.builder()
  76. .name(clusterName)
  77. .build();
  78. }
  79. @Test
  80. public void shouldListFirst25TopicsSortedByNameDescendingOrder() {
  81. var internalTopics = IntStream.rangeClosed(1, 100).boxed()
  82. .map(Objects::toString)
  83. .map(name -> new TopicDescription(name, false, List.of()))
  84. .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
  85. Metrics.empty(), InternalLogDirStats.empty(), "_"))
  86. .collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
  87. init(internalTopics);
  88. var topics = topicsController
  89. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, null,
  90. TopicColumnsToSortDTO.NAME, SortOrderDTO.DESC, null).block();
  91. assertThat(topics.getBody().getPageCount()).isEqualTo(4);
  92. assertThat(topics.getBody().getTopics()).hasSize(25);
  93. assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName).reversed());
  94. assertThat(topics.getBody().getTopics()).containsExactlyElementsOf(
  95. internalTopics.values().stream()
  96. .map(clusterMapper::toTopic)
  97. .sorted(Comparator.comparing(TopicDTO::getName).reversed())
  98. .limit(25)
  99. .collect(Collectors.toList())
  100. );
  101. }
  102. @Test
  103. public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
  104. init(
  105. IntStream.rangeClosed(1, 100).boxed()
  106. .map(Objects::toString)
  107. .map(name -> new TopicDescription(name, false, List.of()))
  108. .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
  109. Metrics.empty(), InternalLogDirStats.empty(), "_"))
  110. .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
  111. );
  112. var topics = topicsController
  113. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, 4, 33, null, null, null, null, null).block();
  114. assertThat(topics.getBody().getPageCount()).isEqualTo(4);
  115. assertThat(topics.getBody().getTopics()).hasSize(1);
  116. assertThat(topics.getBody().getTopics().get(0).getName()).isEqualTo("99");
  117. }
  118. @Test
  119. public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
  120. init(
  121. IntStream.rangeClosed(1, 100).boxed()
  122. .map(Objects::toString)
  123. .map(name -> new TopicDescription(name, false, List.of()))
  124. .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
  125. Metrics.empty(), InternalLogDirStats.empty(), "_"))
  126. .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
  127. );
  128. var topics = topicsController
  129. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, 0, -1, null, null, null, null, null).block();
  130. assertThat(topics.getBody().getPageCount()).isEqualTo(4);
  131. assertThat(topics.getBody().getTopics()).hasSize(25);
  132. assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
  133. }
  134. @Test
  135. public void shouldListBotInternalAndNonInternalTopics() {
  136. init(
  137. IntStream.rangeClosed(1, 100).boxed()
  138. .map(Objects::toString)
  139. .map(name -> new TopicDescription(name, Integer.parseInt(name) % 10 == 0, List.of()))
  140. .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
  141. Metrics.empty(), InternalLogDirStats.empty(), "_"))
  142. .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
  143. );
  144. var topics = topicsController
  145. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, 0, -1, true, null,
  146. null, null, null).block();
  147. assertThat(topics.getBody().getPageCount()).isEqualTo(4);
  148. assertThat(topics.getBody().getTopics()).hasSize(25);
  149. assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
  150. }
  151. @Test
  152. public void shouldListOnlyNonInternalTopics() {
  153. init(
  154. IntStream.rangeClosed(1, 100).boxed()
  155. .map(Objects::toString)
  156. .map(name -> new TopicDescription(name, Integer.parseInt(name) % 5 == 0, List.of()))
  157. .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
  158. Metrics.empty(), InternalLogDirStats.empty(), "_"))
  159. .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
  160. );
  161. var topics = topicsController
  162. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, 4, -1, false, null,
  163. null, null, null).block();
  164. assertThat(topics.getBody().getPageCount()).isEqualTo(4);
  165. assertThat(topics.getBody().getTopics()).hasSize(5);
  166. assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
  167. }
  168. @Test
  169. public void shouldListOnlyTopicsContainingOne() {
  170. init(
  171. IntStream.rangeClosed(1, 100).boxed()
  172. .map(Objects::toString)
  173. .map(name -> new TopicDescription(name, false, List.of()))
  174. .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
  175. Metrics.empty(), InternalLogDirStats.empty(), "_"))
  176. .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
  177. );
  178. var topics = topicsController
  179. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, "1",
  180. null, null, null).block();
  181. assertThat(topics.getBody().getPageCount()).isEqualTo(1);
  182. assertThat(topics.getBody().getTopics()).hasSize(20);
  183. assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
  184. }
  185. @Test
  186. public void shouldListTopicsOrderedByPartitionsCount() {
  187. Map<String, InternalTopic> internalTopics = IntStream.rangeClosed(1, 100).boxed()
  188. .map(i -> new TopicDescription(UUID.randomUUID().toString(), false,
  189. IntStream.range(0, i)
  190. .mapToObj(p ->
  191. new TopicPartitionInfo(p, null, List.of(), List.of()))
  192. .collect(Collectors.toList())))
  193. .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), InternalPartitionsOffsets.empty(),
  194. Metrics.empty(), InternalLogDirStats.empty(), "_"))
  195. .collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
  196. init(internalTopics);
  197. var topicsSortedAsc = topicsController
  198. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null,
  199. null, TopicColumnsToSortDTO.TOTAL_PARTITIONS, null, null).block();
  200. assertThat(topicsSortedAsc.getBody().getPageCount()).isEqualTo(4);
  201. assertThat(topicsSortedAsc.getBody().getTopics()).hasSize(25);
  202. assertThat(topicsSortedAsc.getBody().getTopics()).containsExactlyElementsOf(
  203. internalTopics.values().stream()
  204. .map(clusterMapper::toTopic)
  205. .sorted(Comparator.comparing(TopicDTO::getPartitionCount))
  206. .limit(25)
  207. .collect(Collectors.toList())
  208. );
  209. var topicsSortedDesc = topicsController
  210. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null,
  211. null, TopicColumnsToSortDTO.TOTAL_PARTITIONS, SortOrderDTO.DESC, null).block();
  212. assertThat(topicsSortedDesc.getBody().getPageCount()).isEqualTo(4);
  213. assertThat(topicsSortedDesc.getBody().getTopics()).hasSize(25);
  214. assertThat(topicsSortedDesc.getBody().getTopics()).containsExactlyElementsOf(
  215. internalTopics.values().stream()
  216. .map(clusterMapper::toTopic)
  217. .sorted(Comparator.comparing(TopicDTO::getPartitionCount).reversed())
  218. .limit(25)
  219. .collect(Collectors.toList())
  220. );
  221. }
  222. }