瀏覽代碼

#188 added pagination for get topics api (#249)

* added pagination for get topics api

* frontend fix

* - fixed merge conflicts
- renamed pageSize to perPage
Ramazan Yapparov 4 年之前
父節點
當前提交
a8ed4ff37f

+ 19 - 0
kafka-ui-api/pom.xml

@@ -147,6 +147,25 @@
             <version>${junit-jupiter-engine.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>

+ 6 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

@@ -1,12 +1,8 @@
 package com.provectus.kafka.ui.controller;
 
 import com.provectus.kafka.ui.api.TopicsApi;
+import com.provectus.kafka.ui.model.*;
 import com.provectus.kafka.ui.service.ClusterService;
-import com.provectus.kafka.ui.model.Topic;
-import com.provectus.kafka.ui.model.TopicConfig;
-import com.provectus.kafka.ui.model.TopicDetails;
-import com.provectus.kafka.ui.model.TopicFormData;
-import javax.validation.Valid;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.springframework.http.HttpStatus;
@@ -16,6 +12,9 @@ import org.springframework.web.server.ServerWebExchange;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import javax.validation.Valid;
+import java.util.Optional;
+
 @RestController
 @RequiredArgsConstructor
 @Log4j2
@@ -59,8 +58,8 @@ public class TopicsController implements TopicsApi {
   }
 
   @Override
-  public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterName, ServerWebExchange exchange) {
-    return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName))));
+  public Mono<ResponseEntity<TopicsResponse>> getTopics(String clusterName, @Valid Integer page, @Valid Integer perPage, ServerWebExchange exchange) {
+    return Mono.just(ResponseEntity.ok(clusterService.getTopics(clusterName, Optional.ofNullable(page), Optional.ofNullable(perPage))));
   }
 
   @Override

+ 16 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java

@@ -19,12 +19,14 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.util.*;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 @Service
 @RequiredArgsConstructor
 public class ClusterService {
+    private static final Integer DEFAULT_PAGE_SIZE = 20;
 
     private final ClustersStorage clustersStorage;
     private final ClusterMapper clusterMapper;
@@ -62,14 +64,22 @@ public class ClusterService {
     }
 
 
-    public List<Topic> getTopics(String name) {
-        return clustersStorage.getClusterByName(name)
-                .map(c ->
-                        c.getTopics().values().stream()
+    public TopicsResponse getTopics(String name, Optional<Integer> page, Optional<Integer> nullablePerPage) {
+        Predicate<Integer> positiveInt = i -> i > 0;
+        int perPage = nullablePerPage.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
+        var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * perPage;
+        var cluster = clustersStorage.getClusterByName(name).orElseThrow(() -> new NotFoundException("No such cluster"));
+        var totalPages = (cluster.getTopics().size() / perPage) + (cluster.getTopics().size() % perPage == 0 ? 0 : 1);
+        return new TopicsResponse()
+                .pageCount(totalPages)
+                .topics(
+                        cluster.getTopics().values().stream()
+                                .sorted(Comparator.comparing(InternalTopic::getName))
+                                .skip(topicsToSkip)
+                                .limit(perPage)
                                 .map(clusterMapper::toTopic)
-                                .sorted(Comparator.comparing(Topic::getName))
                                 .collect(Collectors.toList())
-                ).orElse(Collections.emptyList());
+                );
     }
 
     public Optional<TopicDetails> getTopicDetails(String name, String topicName) {

+ 101 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ClusterServiceTest.java

@@ -0,0 +1,101 @@
+package com.provectus.kafka.ui.service;
+
+import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.model.InternalTopic;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.Topic;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mapstruct.factory.Mappers;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class ClusterServiceTest {
+    @InjectMocks
+    private ClusterService clusterService;
+
+    @Mock
+    private ClustersStorage clustersStorage;
+    @Spy
+    private final ClusterMapper clusterMapper = Mappers.getMapper(ClusterMapper.class);
+
+    @Test
+    public void shouldListFirst20Topics() {
+        var topicName = UUID.randomUUID().toString();
+
+        when(clustersStorage.getClusterByName(topicName))
+                .thenReturn(Optional.of(KafkaCluster.builder()
+                        .topics(
+                                IntStream.rangeClosed(1, 100).boxed()
+                                        .map(Objects::toString)
+                                        .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
+                                                .partitions(Map.of())
+                                                .name(e)
+                                                .build()))
+                        )
+                        .build()));
+
+        var topics = clusterService.getTopics(topicName, Optional.empty(), Optional.empty());
+        assertThat(topics.getPageCount()).isEqualTo(5);
+        assertThat(topics.getTopics()).hasSize(20);
+        assertThat(topics.getTopics()).map(Topic::getName).isSorted();
+    }
+
+    @Test
+    public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
+        var topicName = UUID.randomUUID().toString();
+
+        when(clustersStorage.getClusterByName(topicName))
+                .thenReturn(Optional.of(KafkaCluster.builder()
+                        .topics(
+                                IntStream.rangeClosed(1, 100).boxed()
+                                        .map(Objects::toString)
+                                        .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
+                                                .partitions(Map.of())
+                                                .name(e)
+                                                .build()))
+                        )
+                        .build()));
+
+        var topics = clusterService.getTopics(topicName, Optional.of(4), Optional.of(33));
+        assertThat(topics.getPageCount()).isEqualTo(4);
+        assertThat(topics.getTopics()).hasSize(1)
+                .first().extracting(Topic::getName).isEqualTo("99");
+    }
+
+    @Test
+    public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
+        var topicName = UUID.randomUUID().toString();
+
+        when(clustersStorage.getClusterByName(topicName))
+                .thenReturn(Optional.of(KafkaCluster.builder()
+                        .topics(
+                                IntStream.rangeClosed(1, 100).boxed()
+                                        .map(Objects::toString)
+                                        .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
+                                                .partitions(Map.of())
+                                                .name(e)
+                                                .build()))
+                        )
+                        .build()));
+
+        var topics = clusterService.getTopics(topicName, Optional.of(0), Optional.of(-1));
+        assertThat(topics.getPageCount()).isEqualTo(5);
+        assertThat(topics.getTopics()).hasSize(20);
+        assertThat(topics.getTopics()).map(Topic::getName).isSorted();
+    }
+}

+ 21 - 3
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -130,15 +130,23 @@ paths:
           required: true
           schema:
             type: string
+        - name: page
+          in: query
+          required: false
+          schema:
+            type: integer
+        - name: perPage
+          in: query
+          required: false
+          schema:
+            type: integer
       responses:
         200:
           description: OK
           content:
             application/json:
               schema:
-                type: array
-                items:
-                  $ref: '#/components/schemas/Topic'
+                $ref: '#/components/schemas/TopicsResponse'
     post:
       tags:
         - Topics
@@ -1140,6 +1148,16 @@ components:
           items:
             $ref: '#/components/schemas/Metric'
 
+    TopicsResponse:
+      type: object
+      properties:
+        pageCount:
+          type: integer
+        topics:
+          type: array
+          items:
+            $ref: '#/components/schemas/Topic'
+
     Topic:
       type: object
       properties:

+ 1 - 1
kafka-ui-react-app/src/redux/actions/thunks.ts

@@ -104,7 +104,7 @@ export const fetchTopicsList = (
   dispatch(actions.fetchTopicsListAction.request());
   try {
     const topics = await topicsApiClient.getTopics({ clusterName });
-    dispatch(actions.fetchTopicsListAction.success(topics));
+    dispatch(actions.fetchTopicsListAction.success(topics.topics || []));
   } catch (e) {
     dispatch(actions.fetchTopicsListAction.failure());
   }

+ 2 - 0
pom.xml

@@ -36,6 +36,8 @@
 		<apache.commons.version>2.2</apache.commons.version>
 		<test.containers.version>1.15.1</test.containers.version>
 		<junit-jupiter-engine.version>5.4.0</junit-jupiter-engine.version>
+		<mockito.version>2.21.0</mockito.version>
+		<assertj.version>3.19.0</assertj.version>
 
 		<frontend-generated-sources-directory>..//kafka-ui-react-app/src/generated-sources</frontend-generated-sources-directory>
 	</properties>