From 7365cfe3948279079f6ca855175f7b596bbd910e Mon Sep 17 00:00:00 2001 From: Ilya Kuramshin Date: Fri, 21 Apr 2023 13:19:38 +0400 Subject: [PATCH 1/4] BE: Make paging sizes configurable (#3685) Co-authored-by: iliax Co-authored-by: Roman Zabaluev --- .../kafka/ui/config/ClustersProperties.java | 2 ++ .../ui/controller/MessagesController.java | 7 +--- .../kafka/ui/service/MessagesService.java | 36 ++++++++++++++++--- .../main/resources/swagger/kafka-ui-api.yaml | 4 +++ 4 files changed, 38 insertions(+), 11 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index 24b60b5711..1d5cc5393c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -58,6 +58,8 @@ public class ClustersProperties { Integer pollTimeoutMs; Integer partitionPollTimeout; Integer noDataEmptyPolls; + Integer maxPageSize; + Integer defaultPageSize; } @Data diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java index 1ba511ab07..aa9d7d5315 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java @@ -43,9 +43,6 @@ import reactor.core.scheduler.Schedulers; @Slf4j public class MessagesController extends AbstractController implements MessagesApi { - private static final int MAX_LOAD_RECORD_LIMIT = 100; - private static final int DEFAULT_LOAD_RECORD_LIMIT = 20; - private final MessagesService messagesService; private final DeserializationService deserializationService; private final AccessControlService accessControlService; @@ -91,8 +88,6 @@ public class MessagesController extends AbstractController implements MessagesAp seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING; seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD; filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS; - int recordsLimit = - Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT); var positions = new ConsumerPosition( seekType, @@ -103,7 +98,7 @@ public class MessagesController extends AbstractController implements MessagesAp ResponseEntity.ok( messagesService.loadMessages( getCluster(clusterName), topicName, positions, q, filterQueryType, - recordsLimit, seekDirection, keySerde, valueSerde) + limit, seekDirection, keySerde, valueSerde) ) ); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java index 4f9f0f59f4..f6ad42c110 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.service; import com.google.common.util.concurrent.RateLimiter; +import com.provectus.kafka.ui.config.ClustersProperties; import com.provectus.kafka.ui.emitter.BackwardRecordEmitter; import com.provectus.kafka.ui.emitter.ForwardRecordEmitter; import com.provectus.kafka.ui.emitter.MessageFilters; @@ -20,13 +21,13 @@ import com.provectus.kafka.ui.serdes.ProducerRecordCreator; import com.provectus.kafka.ui.util.SslPropertiesUtil; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.function.Predicate; import java.util.function.UnaryOperator; import java.util.stream.Collectors; import javax.annotation.Nullable; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.OffsetSpec; @@ -44,16 +45,35 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @Service -@RequiredArgsConstructor @Slf4j public class MessagesService { + private static final int DEFAULT_MAX_PAGE_SIZE = 500; + private static final int DEFAULT_PAGE_SIZE = 100; // limiting UI messages rate to 20/sec in tailing mode - public static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20; + private static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20; private final AdminClientService adminClientService; private final DeserializationService deserializationService; private final ConsumerGroupService consumerGroupService; + private final int maxPageSize; + private final int defaultPageSize; + + public MessagesService(AdminClientService adminClientService, + DeserializationService deserializationService, + ConsumerGroupService consumerGroupService, + ClustersProperties properties) { + this.adminClientService = adminClientService; + this.deserializationService = deserializationService; + this.consumerGroupService = consumerGroupService; + + var pollingProps = Optional.ofNullable(properties.getPolling()) + .orElseGet(ClustersProperties.PollingProperties::new); + this.maxPageSize = Optional.ofNullable(pollingProps.getMaxPageSize()) + .orElse(DEFAULT_MAX_PAGE_SIZE); + this.defaultPageSize = Optional.ofNullable(pollingProps.getDefaultPageSize()) + .orElse(DEFAULT_PAGE_SIZE); + } private Mono withExistingTopic(KafkaCluster cluster, String topicName) { return adminClientService.get(cluster) @@ -139,7 +159,7 @@ public class MessagesService { ConsumerPosition consumerPosition, @Nullable String query, MessageFilterTypeDTO filterQueryType, - int limit, + @Nullable Integer pageSize, SeekDirectionDTO seekDirection, @Nullable String keySerde, @Nullable String valueSerde) { @@ -147,7 +167,13 @@ public class MessagesService { .flux() .publishOn(Schedulers.boundedElastic()) .flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query, - filterQueryType, limit, seekDirection, keySerde, valueSerde)); + filterQueryType, fixPageSize(pageSize), seekDirection, keySerde, valueSerde)); + } + + private int fixPageSize(@Nullable Integer pageSize) { + return Optional.ofNullable(pageSize) + .filter(ps -> ps > 0 && ps <= maxPageSize) + .orElse(defaultPageSize); } private Flux loadMessagesImpl(KafkaCluster cluster, diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 0d54fa7e79..4bd3d2207c 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -3486,6 +3486,10 @@ components: type: integer noDataEmptyPolls: type: integer + maxPageSize: + type: integer + defaultPageSize: + type: integer adminClientTimeout: type: integer internalTopicPrefix: From 0e1f4ddfcf83d27a3caedc37fb2f4a5e110c36bd Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 21 Apr 2023 12:01:29 +0000 Subject: [PATCH 2/4] Bump mockito.version from 5.1.1 to 5.3.0 (#3694) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 42ebd9addc..b5fb102355 100644 --- a/pom.xml +++ b/pom.xml @@ -43,7 +43,7 @@ 5.9.1 - 5.1.1 + 5.3.0 4.10.0 1.17.5 From a33e7064ee3231dc729f5cb1d5a299626088202d Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 21 Apr 2023 12:28:14 +0000 Subject: [PATCH 3/4] Bump confluent.version from 7.3.0 to 7.3.3 (#3641) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index b5fb102355..beb5744e81 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ 3.19.0 1.11.1 1.12.19 - 7.3.0 + 7.3.3 3.1.0 3.0.13 2.14.0 From aed6c16496ddc1dbc83daceaf3c0efc296083a23 Mon Sep 17 00:00:00 2001 From: David Bejanyan <58771979+David-DB88@users.noreply.github.com> Date: Fri, 21 Apr 2023 17:58:11 +0400 Subject: [PATCH 4/4] FE: Chore: Fix TSC error on submit callback type (#3699) --- .../Topics/Topic/Messages/Filters/AddEditFilterContainer.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/AddEditFilterContainer.tsx b/kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/AddEditFilterContainer.tsx index 757b6e171d..557db159ba 100644 --- a/kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/AddEditFilterContainer.tsx +++ b/kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/AddEditFilterContainer.tsx @@ -27,7 +27,7 @@ export interface AddEditFilterContainerProps { inputDisplayNameDefaultValue?: string; inputCodeDefaultValue?: string; isAdd?: boolean; - submitCallback?: (values: AddMessageFilters) => Promise; + submitCallback?: (values: AddMessageFilters) => void; } const AddEditFilterContainer: React.FC = ({