diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index 3f8d23a809..443ce32ca2 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -147,6 +147,25 @@
${junit-jupiter-engine.version}
test
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
+
+ org.mockito
+ mockito-junit-jupiter
+ ${mockito.version}
+ test
+
+
+ org.assertj
+ assertj-core
+ ${assertj.version}
+ test
+
+
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
index f4a05fa1db..034ddd9011 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
@@ -1,12 +1,8 @@
package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.TopicsApi;
+import com.provectus.kafka.ui.model.*;
import com.provectus.kafka.ui.service.ClusterService;
-import com.provectus.kafka.ui.model.Topic;
-import com.provectus.kafka.ui.model.TopicConfig;
-import com.provectus.kafka.ui.model.TopicDetails;
-import com.provectus.kafka.ui.model.TopicFormData;
-import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.http.HttpStatus;
@@ -16,6 +12,9 @@ import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import javax.validation.Valid;
+import java.util.Optional;
+
@RestController
@RequiredArgsConstructor
@Log4j2
@@ -59,8 +58,8 @@ public class TopicsController implements TopicsApi {
}
@Override
- public Mono>> getTopics(String clusterName, ServerWebExchange exchange) {
- return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName))));
+ public Mono> getTopics(String clusterName, @Valid Integer page, @Valid Integer perPage, ServerWebExchange exchange) {
+ return Mono.just(ResponseEntity.ok(clusterService.getTopics(clusterName, Optional.ofNullable(page), Optional.ofNullable(perPage))));
}
@Override
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java
index 17da7d271f..cc49a6d2bc 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java
@@ -19,12 +19,14 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.*;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Service
@RequiredArgsConstructor
public class ClusterService {
+ private static final Integer DEFAULT_PAGE_SIZE = 20;
private final ClustersStorage clustersStorage;
private final ClusterMapper clusterMapper;
@@ -62,14 +64,22 @@ public class ClusterService {
}
- public List getTopics(String name) {
- return clustersStorage.getClusterByName(name)
- .map(c ->
- c.getTopics().values().stream()
+ public TopicsResponse getTopics(String name, Optional page, Optional nullablePerPage) {
+ Predicate positiveInt = i -> i > 0;
+ int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
+ var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage;
+ var cluster = clustersStorage.getClusterByName(name).orElseThrow(() -> new NotFoundException("No such cluster"));
+ var totalPages = (cluster.getTopics().size() / perPage) + (cluster.getTopics().size() % perPage == 0 ? 0 : 1);
+ return new TopicsResponse()
+ .pageCount(totalPages)
+ .topics(
+ cluster.getTopics().values().stream()
+ .sorted(Comparator.comparing(InternalTopic::getName))
+ .skip(topicsToSkip)
+ .limit(perPage)
.map(clusterMapper::toTopic)
- .sorted(Comparator.comparing(Topic::getName))
.collect(Collectors.toList())
- ).orElse(Collections.emptyList());
+ );
}
public Optional getTopicDetails(String name, String topicName) {
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java
new file mode 100644
index 0000000000..23643b4064
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java
@@ -0,0 +1,101 @@
+package com.provectus.kafka.ui.service;
+
+import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.model.InternalTopic;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Topic;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mapstruct.factory.Mappers;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class ClusterServiceTest {
+ @InjectMocks
+ private ClusterService clusterService;
+
+ @Mock
+ private ClustersStorage clustersStorage;
+ @Spy
+ private final ClusterMapper clusterMapper = Mappers.getMapper(ClusterMapper.class);
+
+ @Test
+ public void shouldListFirst20Topics() {
+ var topicName = UUID.randomUUID().toString();
+
+ when(clustersStorage.getClusterByName(topicName))
+ .thenReturn(Optional.of(KafkaCluster.builder()
+ .topics(
+ IntStream.rangeClosed(1, 100).boxed()
+ .map(Objects::toString)
+ .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
+ .partitions(Map.of())
+ .name(e)
+ .build()))
+ )
+ .build()));
+
+ var topics = clusterService.getTopics(topicName, Optional.empty(), Optional.empty());
+ assertThat(topics.getPageCount()).isEqualTo(5);
+ assertThat(topics.getTopics()).hasSize(20);
+ assertThat(topics.getTopics()).map(Topic::getName).isSorted();
+ }
+
+ @Test
+ public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
+ var topicName = UUID.randomUUID().toString();
+
+ when(clustersStorage.getClusterByName(topicName))
+ .thenReturn(Optional.of(KafkaCluster.builder()
+ .topics(
+ IntStream.rangeClosed(1, 100).boxed()
+ .map(Objects::toString)
+ .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
+ .partitions(Map.of())
+ .name(e)
+ .build()))
+ )
+ .build()));
+
+ var topics = clusterService.getTopics(topicName, Optional.of(4), Optional.of(33));
+ assertThat(topics.getPageCount()).isEqualTo(4);
+ assertThat(topics.getTopics()).hasSize(1)
+ .first().extracting(Topic::getName).isEqualTo("99");
+ }
+
+ @Test
+ public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
+ var topicName = UUID.randomUUID().toString();
+
+ when(clustersStorage.getClusterByName(topicName))
+ .thenReturn(Optional.of(KafkaCluster.builder()
+ .topics(
+ IntStream.rangeClosed(1, 100).boxed()
+ .map(Objects::toString)
+ .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
+ .partitions(Map.of())
+ .name(e)
+ .build()))
+ )
+ .build()));
+
+ var topics = clusterService.getTopics(topicName, Optional.of(0), Optional.of(-1));
+ assertThat(topics.getPageCount()).isEqualTo(5);
+ assertThat(topics.getTopics()).hasSize(20);
+ assertThat(topics.getTopics()).map(Topic::getName).isSorted();
+ }
+}
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index e20fb81d90..80c34ce4c4 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -130,15 +130,23 @@ paths:
required: true
schema:
type: string
+ - name: page
+ in: query
+ required: false
+ schema:
+ type: integer
+ - name: perPage
+ in: query
+ required: false
+ schema:
+ type: integer
responses:
200:
description: OK
content:
application/json:
schema:
- type: array
- items:
- $ref: '#/components/schemas/Topic'
+ $ref: '#/components/schemas/TopicsResponse'
post:
tags:
- Topics
@@ -1140,6 +1148,16 @@ components:
items:
$ref: '#/components/schemas/Metric'
+ TopicsResponse:
+ type: object
+ properties:
+ pageCount:
+ type: integer
+ topics:
+ type: array
+ items:
+ $ref: '#/components/schemas/Topic'
+
Topic:
type: object
properties:
diff --git a/kafka-ui-react-app/src/redux/actions/thunks.ts b/kafka-ui-react-app/src/redux/actions/thunks.ts
index 23171cc1d4..1c49beeb04 100644
--- a/kafka-ui-react-app/src/redux/actions/thunks.ts
+++ b/kafka-ui-react-app/src/redux/actions/thunks.ts
@@ -104,7 +104,7 @@ export const fetchTopicsList = (
dispatch(actions.fetchTopicsListAction.request());
try {
const topics = await topicsApiClient.getTopics({ clusterName });
- dispatch(actions.fetchTopicsListAction.success(topics));
+ dispatch(actions.fetchTopicsListAction.success(topics.topics || []));
} catch (e) {
dispatch(actions.fetchTopicsListAction.failure());
}
diff --git a/pom.xml b/pom.xml
index 62106bc7a2..c0741d2007 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,6 +36,8 @@
2.2
1.15.1
5.4.0
+ 2.21.0
+ 3.19.0
..//kafka-ui-react-app/src/generated-sources