Merge branch 'master' of github.com:provectus/kafka-ui into issues/2764

This commit is contained in:
tom.kaszuba 2023-04-21 19:25:38 +02:00
commit ae75e4bad5
6 changed files with 41 additions and 14 deletions

View file

@@ -58,6 +58,8 @@ public class ClustersProperties {
Integer pollTimeoutMs; Integer pollTimeoutMs;
Integer partitionPollTimeout; Integer partitionPollTimeout;
Integer noDataEmptyPolls; Integer noDataEmptyPolls;
Integer maxPageSize;
Integer defaultPageSize;
} }
@Data @Data

View file

@@ -43,9 +43,6 @@ import reactor.core.scheduler.Schedulers;
@Slf4j @Slf4j
public class MessagesController extends AbstractController implements MessagesApi { public class MessagesController extends AbstractController implements MessagesApi {
private static final int MAX_LOAD_RECORD_LIMIT = 100;
private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;
private final MessagesService messagesService; private final MessagesService messagesService;
private final DeserializationService deserializationService; private final DeserializationService deserializationService;
private final AccessControlService accessControlService; private final AccessControlService accessControlService;
@@ -91,8 +88,6 @@ public class MessagesController extends AbstractController implements MessagesApi {
seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING; seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD; seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS; filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
int recordsLimit =
Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
var positions = new ConsumerPosition( var positions = new ConsumerPosition(
seekType, seekType,
@@ -103,7 +98,7 @@ public class MessagesController extends AbstractController implements MessagesApi {
ResponseEntity.ok( ResponseEntity.ok(
messagesService.loadMessages( messagesService.loadMessages(
getCluster(clusterName), topicName, positions, q, filterQueryType, getCluster(clusterName), topicName, positions, q, filterQueryType,
recordsLimit, seekDirection, keySerde, valueSerde) limit, seekDirection, keySerde, valueSerde)
) )
); );

View file

@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.service; package com.provectus.kafka.ui.service;
import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.RateLimiter;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter; import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter; import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.emitter.MessageFilters; import com.provectus.kafka.ui.emitter.MessageFilters;
@@ -20,13 +21,13 @@ import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
import com.provectus.kafka.ui.util.SslPropertiesUtil; import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.function.UnaryOperator; import java.util.function.UnaryOperator;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.OffsetSpec;
@@ -44,16 +45,35 @@ import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
@Service @Service
@RequiredArgsConstructor
@Slf4j @Slf4j
public class MessagesService { public class MessagesService {
private static final int DEFAULT_MAX_PAGE_SIZE = 500;
private static final int DEFAULT_PAGE_SIZE = 100;
// limiting UI messages rate to 20/sec in tailing mode // limiting UI messages rate to 20/sec in tailing mode
public static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20; private static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
private final AdminClientService adminClientService; private final AdminClientService adminClientService;
private final DeserializationService deserializationService; private final DeserializationService deserializationService;
private final ConsumerGroupService consumerGroupService; private final ConsumerGroupService consumerGroupService;
private final int maxPageSize;
private final int defaultPageSize;
public MessagesService(AdminClientService adminClientService,
DeserializationService deserializationService,
ConsumerGroupService consumerGroupService,
ClustersProperties properties) {
this.adminClientService = adminClientService;
this.deserializationService = deserializationService;
this.consumerGroupService = consumerGroupService;
var pollingProps = Optional.ofNullable(properties.getPolling())
.orElseGet(ClustersProperties.PollingProperties::new);
this.maxPageSize = Optional.ofNullable(pollingProps.getMaxPageSize())
.orElse(DEFAULT_MAX_PAGE_SIZE);
this.defaultPageSize = Optional.ofNullable(pollingProps.getDefaultPageSize())
.orElse(DEFAULT_PAGE_SIZE);
}
private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) { private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) {
return adminClientService.get(cluster) return adminClientService.get(cluster)
@@ -139,7 +159,7 @@ public class MessagesService {
ConsumerPosition consumerPosition, ConsumerPosition consumerPosition,
@Nullable String query, @Nullable String query,
MessageFilterTypeDTO filterQueryType, MessageFilterTypeDTO filterQueryType,
int limit, @Nullable Integer pageSize,
SeekDirectionDTO seekDirection, SeekDirectionDTO seekDirection,
@Nullable String keySerde, @Nullable String keySerde,
@Nullable String valueSerde) { @Nullable String valueSerde) {
@@ -147,7 +167,13 @@ public class MessagesService {
.flux() .flux()
.publishOn(Schedulers.boundedElastic()) .publishOn(Schedulers.boundedElastic())
.flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query, .flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
filterQueryType, limit, seekDirection, keySerde, valueSerde)); filterQueryType, fixPageSize(pageSize), seekDirection, keySerde, valueSerde));
}
private int fixPageSize(@Nullable Integer pageSize) {
return Optional.ofNullable(pageSize)
.filter(ps -> ps > 0 && ps <= maxPageSize)
.orElse(defaultPageSize);
} }
private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster, private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,

View file

@@ -3486,6 +3486,10 @@ components:
type: integer type: integer
noDataEmptyPolls: noDataEmptyPolls:
type: integer type: integer
maxPageSize:
type: integer
defaultPageSize:
type: integer
adminClientTimeout: adminClientTimeout:
type: integer type: integer
internalTopicPrefix: internalTopicPrefix:

View file

@@ -27,7 +27,7 @@ export interface AddEditFilterContainerProps {
inputDisplayNameDefaultValue?: string; inputDisplayNameDefaultValue?: string;
inputCodeDefaultValue?: string; inputCodeDefaultValue?: string;
isAdd?: boolean; isAdd?: boolean;
submitCallback?: (values: AddMessageFilters) => Promise<void>; submitCallback?: (values: AddMessageFilters) => void;
} }
const AddEditFilterContainer: React.FC<AddEditFilterContainerProps> = ({ const AddEditFilterContainer: React.FC<AddEditFilterContainerProps> = ({

View file

@@ -26,7 +26,7 @@
<assertj.version>3.19.0</assertj.version> <assertj.version>3.19.0</assertj.version>
<avro.version>1.11.1</avro.version> <avro.version>1.11.1</avro.version>
<byte-buddy.version>1.12.19</byte-buddy.version> <byte-buddy.version>1.12.19</byte-buddy.version>
<confluent.version>7.3.0</confluent.version> <confluent.version>7.3.3</confluent.version>
<datasketches-java.version>3.1.0</datasketches-java.version> <datasketches-java.version>3.1.0</datasketches-java.version>
<groovy.version>3.0.13</groovy.version> <groovy.version>3.0.13</groovy.version>
<jackson.version>2.14.0</jackson.version> <jackson.version>2.14.0</jackson.version>
@@ -43,7 +43,7 @@
<!-- Test dependency versions --> <!-- Test dependency versions -->
<junit.version>5.9.1</junit.version> <junit.version>5.9.1</junit.version>
<mockito.version>5.1.1</mockito.version> <mockito.version>5.3.0</mockito.version>
<okhttp3.mockwebserver.version>4.10.0</okhttp3.mockwebserver.version> <okhttp3.mockwebserver.version>4.10.0</okhttp3.mockwebserver.version>
<testcontainers.version>1.17.5</testcontainers.version> <testcontainers.version>1.17.5</testcontainers.version>