|
@@ -1,6 +1,7 @@
|
|
package com.provectus.kafka.ui.service;
|
|
package com.provectus.kafka.ui.service;
|
|
|
|
|
|
import com.google.common.util.concurrent.RateLimiter;
|
|
import com.google.common.util.concurrent.RateLimiter;
|
|
|
|
+import com.provectus.kafka.ui.config.ClustersProperties;
|
|
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
|
|
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
|
|
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
|
|
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
|
|
import com.provectus.kafka.ui.emitter.MessageFilters;
|
|
import com.provectus.kafka.ui.emitter.MessageFilters;
|
|
@@ -20,13 +21,13 @@ import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
|
|
import com.provectus.kafka.ui.util.SslPropertiesUtil;
|
|
import com.provectus.kafka.ui.util.SslPropertiesUtil;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
+import java.util.Optional;
|
|
import java.util.Properties;
|
|
import java.util.Properties;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.function.Predicate;
|
|
import java.util.function.Predicate;
|
|
import java.util.function.UnaryOperator;
|
|
import java.util.function.UnaryOperator;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
import javax.annotation.Nullable;
|
|
import javax.annotation.Nullable;
|
|
-import lombok.RequiredArgsConstructor;
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.kafka.clients.admin.OffsetSpec;
|
|
import org.apache.kafka.clients.admin.OffsetSpec;
|
|
@@ -44,16 +45,35 @@ import reactor.core.publisher.Mono;
|
|
import reactor.core.scheduler.Schedulers;
|
|
import reactor.core.scheduler.Schedulers;
|
|
|
|
|
|
@Service
|
|
@Service
|
|
-@RequiredArgsConstructor
|
|
|
|
@Slf4j
|
|
@Slf4j
|
|
public class MessagesService {
|
|
public class MessagesService {
|
|
|
|
|
|
|
|
+ private static final int DEFAULT_MAX_PAGE_SIZE = 500;
|
|
|
|
+ private static final int DEFAULT_PAGE_SIZE = 100;
|
|
// limiting UI messages rate to 20/sec in tailing mode
|
|
// limiting UI messages rate to 20/sec in tailing mode
|
|
- public static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
|
|
|
|
|
|
+ private static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
|
|
|
|
|
|
private final AdminClientService adminClientService;
|
|
private final AdminClientService adminClientService;
|
|
private final DeserializationService deserializationService;
|
|
private final DeserializationService deserializationService;
|
|
private final ConsumerGroupService consumerGroupService;
|
|
private final ConsumerGroupService consumerGroupService;
|
|
|
|
+ private final int maxPageSize;
|
|
|
|
+ private final int defaultPageSize;
|
|
|
|
+
|
|
|
|
+ public MessagesService(AdminClientService adminClientService,
|
|
|
|
+ DeserializationService deserializationService,
|
|
|
|
+ ConsumerGroupService consumerGroupService,
|
|
|
|
+ ClustersProperties properties) {
|
|
|
|
+ this.adminClientService = adminClientService;
|
|
|
|
+ this.deserializationService = deserializationService;
|
|
|
|
+ this.consumerGroupService = consumerGroupService;
|
|
|
|
+
|
|
|
|
+ var pollingProps = Optional.ofNullable(properties.getPolling())
|
|
|
|
+ .orElseGet(ClustersProperties.PollingProperties::new);
|
|
|
|
+ this.maxPageSize = Optional.ofNullable(pollingProps.getMaxPageSize())
|
|
|
|
+ .orElse(DEFAULT_MAX_PAGE_SIZE);
|
|
|
|
+ this.defaultPageSize = Optional.ofNullable(pollingProps.getDefaultPageSize())
|
|
|
|
+ .orElse(DEFAULT_PAGE_SIZE);
|
|
|
|
+ }
|
|
|
|
|
|
private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) {
|
|
private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) {
|
|
return adminClientService.get(cluster)
|
|
return adminClientService.get(cluster)
|
|
@@ -139,7 +159,7 @@ public class MessagesService {
|
|
ConsumerPosition consumerPosition,
|
|
ConsumerPosition consumerPosition,
|
|
@Nullable String query,
|
|
@Nullable String query,
|
|
MessageFilterTypeDTO filterQueryType,
|
|
MessageFilterTypeDTO filterQueryType,
|
|
- int limit,
|
|
|
|
|
|
+ @Nullable Integer pageSize,
|
|
SeekDirectionDTO seekDirection,
|
|
SeekDirectionDTO seekDirection,
|
|
@Nullable String keySerde,
|
|
@Nullable String keySerde,
|
|
@Nullable String valueSerde) {
|
|
@Nullable String valueSerde) {
|
|
@@ -147,7 +167,13 @@ public class MessagesService {
|
|
.flux()
|
|
.flux()
|
|
.publishOn(Schedulers.boundedElastic())
|
|
.publishOn(Schedulers.boundedElastic())
|
|
.flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
|
|
.flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
|
|
- filterQueryType, limit, seekDirection, keySerde, valueSerde));
|
|
|
|
|
|
+ filterQueryType, fixPageSize(pageSize), seekDirection, keySerde, valueSerde));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private int fixPageSize(@Nullable Integer pageSize) {
|
|
|
|
+ return Optional.ofNullable(pageSize)
|
|
|
|
+ .filter(ps -> ps > 0 && ps <= maxPageSize)
|
|
|
|
+ .orElse(defaultPageSize);
|
|
}
|
|
}
|
|
|
|
|
|
private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
|
|
private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
|