Browse Source

added pagination for get topics api

Ramazan Yapparov 4 năm trước cách đây
mục cha
commit
20e4e94b5f

+ 19 - 0
kafka-ui-api/pom.xml

@@ -147,6 +147,25 @@
             <version>${junit-jupiter-engine.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>

+ 16 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

@@ -21,12 +21,14 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.util.*;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 @Service
 @RequiredArgsConstructor
 public class ClusterService {
+    private static final Integer DEFAULT_PAGE_SIZE = 20;
 
     private final ClustersStorage clustersStorage;
     private final ClusterMapper clusterMapper;
@@ -64,14 +66,22 @@ public class ClusterService {
     }
 
 
-    public List<Topic> getTopics(String name) {
-        return clustersStorage.getClusterByName(name)
-                .map(c ->
-                        c.getTopics().values().stream()
+    public TopicsResponse getTopics(String name, Optional<Integer> page, Optional<Integer> nullablePageSize) {
+        Predicate<Integer> positiveInt = i -> i > 0;
+        int pageSize = nullablePageSize.filter(positiveInt).orElse(DEFAULT_PAGE_SIZE);
+        var topicsToSkip = (page.filter(positiveInt).orElse(1) - 1) * pageSize;
+        var cluster = clustersStorage.getClusterByName(name).orElseThrow(() -> new NotFoundException("No such cluster"));
+        var totalPages = (cluster.getTopics().size() / pageSize) + (cluster.getTopics().size() % pageSize == 0 ? 0 : 1);
+        return new TopicsResponse()
+                .pageCount(totalPages)
+                .topics(
+                        cluster.getTopics().values().stream()
+                                .sorted(Comparator.comparing(InternalTopic::getName))
+                                .skip(topicsToSkip)
+                                .limit(pageSize)
                                 .map(clusterMapper::toTopic)
-                                .sorted(Comparator.comparing(Topic::getName))
                                 .collect(Collectors.toList())
-                ).orElse(Collections.emptyList());
+                );
     }
 
     public Optional<TopicDetails> getTopicDetails(String name, String topicName) {

+ 3 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

@@ -18,6 +18,7 @@ import reactor.core.publisher.Mono;
 import javax.validation.Valid;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Function;
 
 @RestController
@@ -55,8 +56,8 @@ public class MetricsRestController implements ApiClustersApi {
     }
 
     @Override
-    public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterName, ServerWebExchange exchange) {
-        return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName))));
+    public Mono<ResponseEntity<TopicsResponse>> getTopics(String clusterName, @Valid Integer page, @Valid Integer pageSize, ServerWebExchange exchange) {
+        return Mono.just(ResponseEntity.ok(clusterService.getTopics(clusterName, Optional.ofNullable(page), Optional.ofNullable(pageSize))));
     }
 
     @Override

+ 107 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/cluster/service/ClusterServiceTest.java

@@ -0,0 +1,107 @@
+package com.provectus.kafka.ui.cluster.service;
+
+import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
+import com.provectus.kafka.ui.cluster.model.ClustersStorage;
+import com.provectus.kafka.ui.cluster.model.InternalTopic;
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
+import com.provectus.kafka.ui.kafka.KafkaService;
+import com.provectus.kafka.ui.model.Topic;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mapstruct.factory.Mappers;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class ClusterServiceTest {
+    @InjectMocks
+    private ClusterService clusterService;
+
+    @Mock
+    private ClustersStorage clustersStorage;
+    @Spy
+    private final ClusterMapper clusterMapper = Mappers.getMapper(ClusterMapper.class);
+    @Mock
+    private KafkaService kafkaService;
+    @Mock
+    private ConsumingService consumingService;
+
+    @Test
+    public void shouldListFirst20Topics() {
+        var topicName = UUID.randomUUID().toString();
+
+        when(clustersStorage.getClusterByName(topicName))
+                .thenReturn(Optional.of(KafkaCluster.builder()
+                        .topics(
+                                IntStream.rangeClosed(1, 100).boxed()
+                                        .map(Objects::toString)
+                                        .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
+                                                .partitions(Map.of())
+                                                .name(e)
+                                                .build()))
+                        )
+                        .build()));
+
+        var topics = clusterService.getTopics(topicName, Optional.empty(), Optional.empty());
+        assertThat(topics.getPageCount()).isEqualTo(5);
+        assertThat(topics.getTopics()).hasSize(20);
+        assertThat(topics.getTopics()).map(Topic::getName).isSorted();
+    }
+
+    @Test
+    public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() {
+        var topicName = UUID.randomUUID().toString();
+
+        when(clustersStorage.getClusterByName(topicName))
+                .thenReturn(Optional.of(KafkaCluster.builder()
+                        .topics(
+                                IntStream.rangeClosed(1, 100).boxed()
+                                        .map(Objects::toString)
+                                        .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
+                                                .partitions(Map.of())
+                                                .name(e)
+                                                .build()))
+                        )
+                        .build()));
+
+        var topics = clusterService.getTopics(topicName, Optional.of(4), Optional.of(33));
+        assertThat(topics.getPageCount()).isEqualTo(4);
+        assertThat(topics.getTopics()).hasSize(1)
+                .first().extracting(Topic::getName).isEqualTo("99");
+    }
+
+    @Test
+    public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() {
+        var topicName = UUID.randomUUID().toString();
+
+        when(clustersStorage.getClusterByName(topicName))
+                .thenReturn(Optional.of(KafkaCluster.builder()
+                        .topics(
+                                IntStream.rangeClosed(1, 100).boxed()
+                                        .map(Objects::toString)
+                                        .collect(Collectors.toMap(Function.identity(), e -> InternalTopic.builder()
+                                                .partitions(Map.of())
+                                                .name(e)
+                                                .build()))
+                        )
+                        .build()));
+
+        var topics = clusterService.getTopics(topicName, Optional.of(0), Optional.of(-1));
+        assertThat(topics.getPageCount()).isEqualTo(5);
+        assertThat(topics.getTopics()).hasSize(20);
+        assertThat(topics.getTopics()).map(Topic::getName).isSorted();
+    }
+}

+ 21 - 3
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -130,15 +130,23 @@ paths:
           required: true
           schema:
             type: string
+        - name: page
+          in: query
+          required: false
+          schema:
+            type: integer
+        - name: pageSize
+          in: query
+          required: false
+          schema:
+            type: integer
       responses:
         200:
           description: OK
           content:
             application/json:
               schema:
-                type: array
-                items:
-                  $ref: '#/components/schemas/Topic'
+                $ref: '#/components/schemas/TopicsResponse'
     post:
       tags:
         - /api/clusters
@@ -1112,6 +1120,16 @@ components:
           items:
             $ref: '#/components/schemas/Metric'
 
+    TopicsResponse:
+      type: object
+      properties:
+        pageCount:
+          type: integer
+        topics:
+          type: array
+          items:
+            $ref: '#/components/schemas/Topic'
+
     Topic:
       type: object
       properties:

+ 2 - 1
kafka-ui-react-app/src/redux/actions/thunks.ts

@@ -94,7 +94,8 @@ export const fetchTopicsList = (
   dispatch(actions.fetchTopicsListAction.request());
   try {
     const topics = await apiClient.getTopics({ clusterName });
-    dispatch(actions.fetchTopicsListAction.success(topics));
+    // todo: fix needed from FE person
+    // dispatch(actions.fetchTopicsListAction.success(topics));
   } catch (e) {
     dispatch(actions.fetchTopicsListAction.failure());
   }

+ 2 - 0
pom.xml

@@ -36,6 +36,8 @@
 		<apache.commons.version>2.2</apache.commons.version>
 		<test.containers.version>1.15.1</test.containers.version>
 		<junit-jupiter-engine.version>5.4.0</junit-jupiter-engine.version>
+		<mockito.version>2.21.0</mockito.version>
+		<assertj.version>3.19.0</assertj.version>
 
 		<frontend-generated-sources-directory>..//kafka-ui-react-app/src/generated-sources</frontend-generated-sources-directory>
 	</properties>