// TopicsServicePaginationTest.java
  1. package com.provectus.kafka.ui.service;
  2. import static org.assertj.core.api.Assertions.assertThat;
  3. import static org.mockito.ArgumentMatchers.anyList;
  4. import static org.mockito.ArgumentMatchers.isA;
  5. import static org.mockito.Mockito.mock;
  6. import static org.mockito.Mockito.when;
  7. import com.provectus.kafka.ui.controller.TopicsController;
  8. import com.provectus.kafka.ui.mapper.ClusterMapper;
  9. import com.provectus.kafka.ui.mapper.ClusterMapperImpl;
  10. import com.provectus.kafka.ui.model.InternalLogDirStats;
  11. import com.provectus.kafka.ui.model.InternalPartitionsOffsets;
  12. import com.provectus.kafka.ui.model.InternalTopic;
  13. import com.provectus.kafka.ui.model.KafkaCluster;
  14. import com.provectus.kafka.ui.model.Metrics;
  15. import com.provectus.kafka.ui.model.SortOrderDTO;
  16. import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
  17. import com.provectus.kafka.ui.model.TopicDTO;
  18. import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
  19. import com.provectus.kafka.ui.service.audit.AuditService;
  20. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  21. import com.provectus.kafka.ui.util.AccessControlServiceMock;
  22. import java.util.ArrayList;
  23. import java.util.Comparator;
  24. import java.util.List;
  25. import java.util.Map;
  26. import java.util.Objects;
  27. import java.util.Optional;
  28. import java.util.UUID;
  29. import java.util.function.Function;
  30. import java.util.stream.Collectors;
  31. import java.util.stream.IntStream;
  32. import org.apache.kafka.clients.admin.TopicDescription;
  33. import org.apache.kafka.common.TopicPartitionInfo;
  34. import org.junit.jupiter.api.Test;
  35. import reactor.core.publisher.Mono;
  36. class TopicsServicePaginationTest {
  37. private static final String LOCAL_KAFKA_CLUSTER_NAME = "local";
  38. private final TopicsService topicsService = mock(TopicsService.class);
  39. private final ClustersStorage clustersStorage = mock(ClustersStorage.class);
  40. private final ClusterMapper clusterMapper = new ClusterMapperImpl();
  41. private final AccessControlService accessControlService = new AccessControlServiceMock().getMock();
  42. private final TopicsController topicsController =
  43. new TopicsController(topicsService, mock(TopicAnalysisService.class), clusterMapper);
  44. private void init(Map<String, InternalTopic> topicsInCache) {
  45. when(clustersStorage.getClusterByName(isA(String.class)))
  46. .thenReturn(Optional.of(buildKafkaCluster(LOCAL_KAFKA_CLUSTER_NAME)));
  47. when(topicsService.getTopicsForPagination(isA(KafkaCluster.class)))
  48. .thenReturn(Mono.just(new ArrayList<>(topicsInCache.values())));
  49. when(topicsService.loadTopics(isA(KafkaCluster.class), anyList()))
  50. .thenAnswer(a -> {
  51. List<String> lst = a.getArgument(1);
  52. return Mono.just(lst.stream().map(topicsInCache::get).collect(Collectors.toList()));
  53. });
  54. topicsController.setAccessControlService(accessControlService);
  55. topicsController.setAuditService(mock(AuditService.class));
  56. topicsController.setClustersStorage(clustersStorage);
  57. }
  58. @Test
  59. public void shouldListFirst25Topics() {
  60. init(
  61. IntStream.rangeClosed(1, 100).boxed()
  62. .map(Objects::toString)
  63. .map(name -> new TopicDescription(name, false, List.of()))
  64. .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
  65. Metrics.empty(), null, null, "_"))
  66. .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
  67. );
  68. var topics = topicsController
  69. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, null,
  70. null, null, null).block();
  71. assertThat(topics.getBody().getPageCount()).isEqualTo(4);
  72. assertThat(topics.getBody().getTopics()).hasSize(25);
  73. assertThat(topics.getBody().getTopics())
  74. .isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
  75. }
  76. private KafkaCluster buildKafkaCluster(String clusterName) {
  77. return KafkaCluster.builder()
  78. .name(clusterName)
  79. .build();
  80. }
  81. @Test
  82. public void shouldListFirst25TopicsSortedByNameDescendingOrder() {
  83. var internalTopics = IntStream.rangeClosed(1, 100).boxed()
  84. .map(Objects::toString)
  85. .map(name -> new TopicDescription(name, false, List.of()))
  86. .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
  87. Metrics.empty(), null, null, "_"))
  88. .collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
  89. init(internalTopics);
  90. var topics = topicsController
  91. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, null,
  92. TopicColumnsToSortDTO.NAME, SortOrderDTO.DESC, null).block();
  93. assertThat(topics.getBody().getPageCount()).isEqualTo(4);
  94. assertThat(topics.getBody().getTopics()).hasSize(25);
  95. assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName).reversed());
  96. assertThat(topics.getBody().getTopics()).containsExactlyElementsOf(
  97. internalTopics.values().stream()
  98. .map(clusterMapper::toTopic)
  99. .sorted(Comparator.comparing(TopicDTO::getName).reversed())
  100. .limit(25)
  101. .collect(Collectors.toList())
  102. );
  103. }
  104. @Test
  105. public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
  106. init(
  107. IntStream.rangeClosed(1, 100).boxed()
  108. .map(Objects::toString)
  109. .map(name -> new TopicDescription(name, false, List.of()))
  110. .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
  111. Metrics.empty(), null, null, "_"))
  112. .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
  113. );
  114. var topics = topicsController
  115. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, 4, 33, null, null, null, null, null).block();
  116. assertThat(topics.getBody().getPageCount()).isEqualTo(4);
  117. assertThat(topics.getBody().getTopics()).hasSize(1);
  118. assertThat(topics.getBody().getTopics().get(0).getName()).isEqualTo("99");
  119. }
  120. @Test
  121. public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
  122. init(
  123. IntStream.rangeClosed(1, 100).boxed()
  124. .map(Objects::toString)
  125. .map(name -> new TopicDescription(name, false, List.of()))
  126. .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
  127. Metrics.empty(), null, null, "_"))
  128. .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
  129. );
  130. var topics = topicsController
  131. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, 0, -1, null, null, null, null, null).block();
  132. assertThat(topics.getBody().getPageCount()).isEqualTo(4);
  133. assertThat(topics.getBody().getTopics()).hasSize(25);
  134. assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
  135. }
  136. @Test
  137. public void shouldListBotInternalAndNonInternalTopics() {
  138. init(
  139. IntStream.rangeClosed(1, 100).boxed()
  140. .map(Objects::toString)
  141. .map(name -> new TopicDescription(name, Integer.parseInt(name) % 10 == 0, List.of()))
  142. .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
  143. Metrics.empty(), null, null, "_"))
  144. .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
  145. );
  146. var topics = topicsController
  147. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, 0, -1, true, null,
  148. null, null, null).block();
  149. assertThat(topics.getBody().getPageCount()).isEqualTo(4);
  150. assertThat(topics.getBody().getTopics()).hasSize(25);
  151. assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
  152. }
  153. @Test
  154. public void shouldListOnlyNonInternalTopics() {
  155. init(
  156. IntStream.rangeClosed(1, 100).boxed()
  157. .map(Objects::toString)
  158. .map(name -> new TopicDescription(name, Integer.parseInt(name) % 5 == 0, List.of()))
  159. .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
  160. Metrics.empty(), null, null, "_"))
  161. .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
  162. );
  163. var topics = topicsController
  164. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, 4, -1, false, null,
  165. null, null, null).block();
  166. assertThat(topics.getBody().getPageCount()).isEqualTo(4);
  167. assertThat(topics.getBody().getTopics()).hasSize(5);
  168. assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
  169. }
  170. @Test
  171. public void shouldListOnlyTopicsContainingOne() {
  172. init(
  173. IntStream.rangeClosed(1, 100).boxed()
  174. .map(Objects::toString)
  175. .map(name -> new TopicDescription(name, false, List.of()))
  176. .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), null,
  177. Metrics.empty(), null, null, "_"))
  178. .collect(Collectors.toMap(InternalTopic::getName, Function.identity()))
  179. );
  180. var topics = topicsController
  181. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null, "1",
  182. null, null, null).block();
  183. assertThat(topics.getBody().getPageCount()).isEqualTo(1);
  184. assertThat(topics.getBody().getTopics()).hasSize(20);
  185. assertThat(topics.getBody().getTopics()).isSortedAccordingTo(Comparator.comparing(TopicDTO::getName));
  186. }
  187. @Test
  188. public void shouldListTopicsOrderedByPartitionsCount() {
  189. Map<String, InternalTopic> internalTopics = IntStream.rangeClosed(1, 100).boxed()
  190. .map(i -> new TopicDescription(UUID.randomUUID().toString(), false,
  191. IntStream.range(0, i)
  192. .mapToObj(p ->
  193. new TopicPartitionInfo(p, null, List.of(), List.of()))
  194. .collect(Collectors.toList())))
  195. .map(topicDescription -> InternalTopic.from(topicDescription, List.of(), InternalPartitionsOffsets.empty(),
  196. Metrics.empty(), null, null, "_"))
  197. .collect(Collectors.toMap(InternalTopic::getName, Function.identity()));
  198. init(internalTopics);
  199. var topicsSortedAsc = topicsController
  200. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null,
  201. null, TopicColumnsToSortDTO.TOTAL_PARTITIONS, null, null).block();
  202. assertThat(topicsSortedAsc.getBody().getPageCount()).isEqualTo(4);
  203. assertThat(topicsSortedAsc.getBody().getTopics()).hasSize(25);
  204. assertThat(topicsSortedAsc.getBody().getTopics()).containsExactlyElementsOf(
  205. internalTopics.values().stream()
  206. .map(clusterMapper::toTopic)
  207. .sorted(Comparator.comparing(TopicDTO::getPartitionCount))
  208. .limit(25)
  209. .collect(Collectors.toList())
  210. );
  211. var topicsSortedDesc = topicsController
  212. .getTopics(LOCAL_KAFKA_CLUSTER_NAME, null, null, null,
  213. null, TopicColumnsToSortDTO.TOTAL_PARTITIONS, SortOrderDTO.DESC, null).block();
  214. assertThat(topicsSortedDesc.getBody().getPageCount()).isEqualTo(4);
  215. assertThat(topicsSortedDesc.getBody().getTopics()).hasSize(25);
  216. assertThat(topicsSortedDesc.getBody().getTopics()).containsExactlyElementsOf(
  217. internalTopics.values().stream()
  218. .map(clusterMapper::toTopic)
  219. .sorted(Comparator.comparing(TopicDTO::getPartitionCount).reversed())
  220. .limit(25)
  221. .collect(Collectors.toList())
  222. );
  223. }
  224. }