Преглед изворни кода

Polling timeouts made configurable (#3513)

1. Polling timeouts made configurable
2. polling-related classes moved to emitter package

---------

Co-authored-by: iliax <ikuramshin@provectus.com>
Ilya Kuramshin пре 2 година
родитељ
комит
bd6394cb14
17 измењених фајлова са 184 додато и 69 уклоњено
  1. 9 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
  2. 5 12
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java
  3. 10 14
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java
  4. 28 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java
  5. 6 7
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java
  6. 79 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingSettings.java
  7. 2 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingThrottler.java
  8. 1 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ResultSizeLimiter.java
  9. 2 3
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java
  10. 2 3
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
  11. 1 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java
  12. 4 3
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java
  13. 4 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
  14. 10 9
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java
  15. 11 11
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java
  16. 1 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/PollingThrottlerTest.java
  17. 9 0
      kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

+ 9 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -27,6 +27,8 @@ public class ClustersProperties {
 
 
   String internalTopicPrefix;
   String internalTopicPrefix;
 
 
+  PollingProperties polling = new PollingProperties();
+
   @Data
   @Data
   public static class Cluster {
   public static class Cluster {
     String name;
     String name;
@@ -49,6 +51,13 @@ public class ClustersProperties {
     TruststoreConfig ssl;
     TruststoreConfig ssl;
   }
   }
 
 
+  @Data
+  public static class PollingProperties {
+    Integer pollTimeoutMs;
+    Integer partitionPollTimeout;
+    Integer noDataEmptyPolls;
+  }
+
   @Data
   @Data
   @ToString(exclude = "password")
   @ToString(exclude = "password")
   public static class MetricsConfigData {
   public static class MetricsConfigData {

+ 5 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -4,7 +4,6 @@ import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
 import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
-import com.provectus.kafka.ui.util.PollingThrottler;
 import java.time.Duration;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.Instant;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -14,27 +13,21 @@ import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.FluxSink;
 
 
 public abstract class AbstractEmitter {
 public abstract class AbstractEmitter {
-  private static final Duration DEFAULT_POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
-
-  // In some situations it is hard to say whether records range (between two offsets) was fully polled.
-  // This happens when we have holes in records sequences that is usual case for compact topics or
-  // topics with transactional writes. In such cases if you want to poll all records between offsets X and Y
-  // there is no guarantee that you will ever see record with offset Y.
-  // To workaround this we can assume that after N consecutive empty polls all target messages were read.
-  public static final int NO_MORE_DATA_EMPTY_POLLS_COUNT = 3;
 
 
   private final ConsumerRecordDeserializer recordDeserializer;
   private final ConsumerRecordDeserializer recordDeserializer;
   private final ConsumingStats consumingStats = new ConsumingStats();
   private final ConsumingStats consumingStats = new ConsumingStats();
   private final PollingThrottler throttler;
   private final PollingThrottler throttler;
+  protected final PollingSettings pollingSettings;
 
 
-  protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer, PollingThrottler throttler) {
+  protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer, PollingSettings pollingSettings) {
     this.recordDeserializer = recordDeserializer;
     this.recordDeserializer = recordDeserializer;
-    this.throttler = throttler;
+    this.pollingSettings = pollingSettings;
+    this.throttler = pollingSettings.getPollingThrottler();
   }
   }
 
 
   protected ConsumerRecords<Bytes, Bytes> poll(
   protected ConsumerRecords<Bytes, Bytes> poll(
       FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
       FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
-    return poll(sink, consumer, DEFAULT_POLL_TIMEOUT_MS);
+    return poll(sink, consumer, pollingSettings.getPollTimeout());
   }
   }
 
 
   protected ConsumerRecords<Bytes, Bytes> poll(
   protected ConsumerRecords<Bytes, Bytes> poll(

+ 10 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

@@ -3,15 +3,12 @@ package com.provectus.kafka.ui.emitter;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
-import com.provectus.kafka.ui.util.PollingThrottler;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Comparator;
 import java.util.List;
 import java.util.List;
 import java.util.TreeMap;
 import java.util.TreeMap;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -26,8 +23,6 @@ public class BackwardRecordEmitter
     extends AbstractEmitter
     extends AbstractEmitter
     implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
     implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
 
-  private static final Duration POLL_TIMEOUT = Duration.ofMillis(200);
-
   private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
   private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
   private final ConsumerPosition consumerPosition;
   private final ConsumerPosition consumerPosition;
   private final int messagesPerPage;
   private final int messagesPerPage;
@@ -37,8 +32,8 @@ public class BackwardRecordEmitter
       ConsumerPosition consumerPosition,
       ConsumerPosition consumerPosition,
       int messagesPerPage,
       int messagesPerPage,
       ConsumerRecordDeserializer recordDeserializer,
       ConsumerRecordDeserializer recordDeserializer,
-      PollingThrottler throttler) {
-    super(recordDeserializer, throttler);
+      PollingSettings pollingSettings) {
+    super(recordDeserializer, pollingSettings);
     this.consumerPosition = consumerPosition;
     this.consumerPosition = consumerPosition;
     this.messagesPerPage = messagesPerPage;
     this.messagesPerPage = messagesPerPage;
     this.consumerSupplier = consumerSupplier;
     this.consumerSupplier = consumerSupplier;
@@ -109,17 +104,18 @@ public class BackwardRecordEmitter
 
 
     var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
     var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
 
 
-    // we use empty polls counting to verify that partition was fully read
-    for (int emptyPolls = 0; recordsToSend.size() < desiredMsgsToPoll && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT;) {
-      var polledRecords = poll(sink, consumer, POLL_TIMEOUT);
-      log.debug("{} records polled from {}", polledRecords.count(), tp);
+    EmptyPollsCounter emptyPolls  = pollingSettings.createEmptyPollsCounter();
+    while (!sink.isCancelled()
+        && recordsToSend.size() < desiredMsgsToPoll
+        && !emptyPolls.noDataEmptyPollsReached()) {
+      var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
+      emptyPolls.count(polledRecords);
 
 
-      // counting sequential empty polls
-      emptyPolls = polledRecords.isEmpty() ? emptyPolls + 1 : 0;
+      log.debug("{} records polled from {}", polledRecords.count(), tp);
 
 
       var filteredRecords = polledRecords.records(tp).stream()
       var filteredRecords = polledRecords.records(tp).stream()
           .filter(r -> r.offset() < toOffset)
           .filter(r -> r.offset() < toOffset)
-          .collect(Collectors.toList());
+          .toList();
 
 
       if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) {
       if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) {
         // we already read all messages in target offsets interval
         // we already read all messages in target offsets interval

+ 28 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java

@@ -0,0 +1,28 @@
+package com.provectus.kafka.ui.emitter;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+// In some situations it is hard to say whether records range (between two offsets) was fully polled.
+// This happens when we have holes in records sequences that is usual case for compact topics or
+// topics with transactional writes. In such cases if you want to poll all records between offsets X and Y
+// there is no guarantee that you will ever see record with offset Y.
+// To workaround this we can assume that after N consecutive empty polls all target messages were read.
+public class EmptyPollsCounter {
+
+  private final int maxEmptyPolls;
+
+  private int emptyPolls = 0;
+
+  EmptyPollsCounter(int maxEmptyPolls) {
+    this.maxEmptyPolls = maxEmptyPolls;
+  }
+
+  public void count(ConsumerRecords<?, ?> polled) {
+    emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
+  }
+
+  public boolean noDataEmptyPollsReached() {
+    return emptyPolls >= maxEmptyPolls;
+  }
+
+}

+ 6 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -3,7 +3,6 @@ package com.provectus.kafka.ui.emitter;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
-import com.provectus.kafka.ui.util.PollingThrottler;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -25,8 +24,8 @@ public class ForwardRecordEmitter
       Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
       Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
       ConsumerPosition position,
       ConsumerPosition position,
       ConsumerRecordDeserializer recordDeserializer,
       ConsumerRecordDeserializer recordDeserializer,
-      PollingThrottler throttler) {
-    super(recordDeserializer, throttler);
+      PollingSettings pollingSettings) {
+    super(recordDeserializer, pollingSettings);
     this.position = position;
     this.position = position;
     this.consumerSupplier = consumerSupplier;
     this.consumerSupplier = consumerSupplier;
   }
   }
@@ -39,16 +38,16 @@ public class ForwardRecordEmitter
       var seekOperations = SeekOperations.create(consumer, position);
       var seekOperations = SeekOperations.create(consumer, position);
       seekOperations.assignAndSeekNonEmptyPartitions();
       seekOperations.assignAndSeekNonEmptyPartitions();
 
 
-      // we use empty polls counting to verify that topic was fully read
-      int emptyPolls = 0;
+      EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
       while (!sink.isCancelled()
       while (!sink.isCancelled()
           && !seekOperations.assignedPartitionsFullyPolled()
           && !seekOperations.assignedPartitionsFullyPolled()
-          && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT) {
+          && !emptyPolls.noDataEmptyPollsReached()) {
 
 
         sendPhase(sink, "Polling");
         sendPhase(sink, "Polling");
         ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
         ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
+        emptyPolls.count(records);
+
         log.debug("{} records polled", records.count());
         log.debug("{} records polled", records.count());
-        emptyPolls = records.isEmpty() ? emptyPolls + 1 : 0;
 
 
         for (ConsumerRecord<Bytes, Bytes> msg : records) {
         for (ConsumerRecord<Bytes, Bytes> msg : records) {
           if (!sink.isCancelled()) {
           if (!sink.isCancelled()) {

+ 79 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingSettings.java

@@ -0,0 +1,79 @@
package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.config.ClustersProperties;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

/**
 * Immutable per-cluster polling configuration: poll timeouts, the empty-polls
 * threshold used to detect a fully-read range, and a factory for the
 * (possibly rate-limiting) {@link PollingThrottler}.
 */
public class PollingSettings {

  private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(1_000);
  private static final Duration DEFAULT_PARTITION_POLL_TIMEOUT = Duration.ofMillis(200);
  private static final int DEFAULT_NO_DATA_EMPTY_POLLS = 3;

  private final Duration pollTimeout;
  private final Duration partitionPollTimeout;
  // renamed from "notDataEmptyPolls" (typo) to match the config property name; see EmptyPollsCounter docs
  private final int noDataEmptyPolls;

  // supplier (not an instance) so each emitter gets its own throttler
  private final Supplier<PollingThrottler> throttlerSupplier;

  /**
   * Builds settings from application properties, falling back to defaults for
   * each value that is not configured. When partitionPollTimeout is unset it is
   * derived as one fifth of the (possibly configured) poll timeout.
   *
   * @param cluster            the cluster whose throttling config is used
   * @param clustersProperties global properties holding the shared polling section
   */
  public static PollingSettings create(ClustersProperties.Cluster cluster,
                                       ClustersProperties clustersProperties) {
    var pollingProps = Optional.ofNullable(clustersProperties.getPolling())
        .orElseGet(ClustersProperties.PollingProperties::new);

    var pollTimeout = pollingProps.getPollTimeoutMs() != null
        ? Duration.ofMillis(pollingProps.getPollTimeoutMs())
        : DEFAULT_POLL_TIMEOUT;

    var partitionPollTimeout = pollingProps.getPartitionPollTimeout() != null
        ? Duration.ofMillis(pollingProps.getPartitionPollTimeout())
        : Duration.ofMillis(pollTimeout.toMillis() / 5);

    int noDataEmptyPolls = pollingProps.getNoDataEmptyPolls() != null
        ? pollingProps.getNoDataEmptyPolls()
        : DEFAULT_NO_DATA_EMPTY_POLLS;

    return new PollingSettings(
        pollTimeout,
        partitionPollTimeout,
        noDataEmptyPolls,
        PollingThrottler.throttlerSupplier(cluster)
    );
  }

  /** Settings with all defaults and a no-op throttler (used in tests / fallbacks). */
  public static PollingSettings createDefault() {
    return new PollingSettings(
        DEFAULT_POLL_TIMEOUT,
        DEFAULT_PARTITION_POLL_TIMEOUT,
        DEFAULT_NO_DATA_EMPTY_POLLS,
        PollingThrottler::noop
    );
  }

  private PollingSettings(Duration pollTimeout,
                          Duration partitionPollTimeout,
                          int noDataEmptyPolls,
                          Supplier<PollingThrottler> throttlerSupplier) {
    this.pollTimeout = pollTimeout;
    this.partitionPollTimeout = partitionPollTimeout;
    this.noDataEmptyPolls = noDataEmptyPolls;
    this.throttlerSupplier = throttlerSupplier;
  }

  /** New counter seeded with this cluster's empty-polls threshold. */
  public EmptyPollsCounter createEmptyPollsCounter() {
    return new EmptyPollsCounter(noDataEmptyPolls);
  }

  public Duration getPollTimeout() {
    return pollTimeout;
  }

  public Duration getPartitionPollTimeout() {
    return partitionPollTimeout;
  }

  /** Creates a fresh throttler instance from the configured supplier. */
  public PollingThrottler getPollingThrottler() {
    return throttlerSupplier.get();
  }
}

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/PollingThrottler.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PollingThrottler.java

@@ -1,8 +1,9 @@
-package com.provectus.kafka.ui.util;
+package com.provectus.kafka.ui.emitter;
 
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.RateLimiter;
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.ConsumerRecords;

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ResultSizeLimiter.java → kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ResultSizeLimiter.java

@@ -1,4 +1,4 @@
-package com.provectus.kafka.ui.util;
+package com.provectus.kafka.ui.emitter;
 
 
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicInteger;

+ 2 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -3,7 +3,6 @@ package com.provectus.kafka.ui.emitter;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
-import com.provectus.kafka.ui.util.PollingThrottler;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
@@ -22,8 +21,8 @@ public class TailingEmitter extends AbstractEmitter
   public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
   public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
                         ConsumerPosition consumerPosition,
                         ConsumerPosition consumerPosition,
                         ConsumerRecordDeserializer recordDeserializer,
                         ConsumerRecordDeserializer recordDeserializer,
-                        PollingThrottler throttler) {
-    super(recordDeserializer, throttler);
+                        PollingSettings pollingSettings) {
+    super(recordDeserializer, pollingSettings);
     this.consumerSupplier = consumerSupplier;
     this.consumerSupplier = consumerSupplier;
     this.consumerPosition = consumerPosition;
     this.consumerPosition = consumerPosition;
   }
   }

+ 2 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -2,14 +2,13 @@ package com.provectus.kafka.ui.model;
 
 
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
 import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
+import com.provectus.kafka.ui.emitter.PollingSettings;
 import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
 import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
 import com.provectus.kafka.ui.service.masking.DataMasking;
 import com.provectus.kafka.ui.service.masking.DataMasking;
 import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
 import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
-import com.provectus.kafka.ui.util.PollingThrottler;
 import com.provectus.kafka.ui.util.ReactiveFailover;
 import com.provectus.kafka.ui.util.ReactiveFailover;
 import java.util.Map;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Properties;
-import java.util.function.Supplier;
 import lombok.AccessLevel;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Builder;
@@ -28,7 +27,7 @@ public class KafkaCluster {
   private final boolean readOnly;
   private final boolean readOnly;
   private final MetricsConfig metricsConfig;
   private final MetricsConfig metricsConfig;
   private final DataMasking masking;
   private final DataMasking masking;
-  private final Supplier<PollingThrottler> throttler;
+  private final PollingSettings pollingSettings;
   private final ReactiveFailover<KafkaSrClientApi> schemaRegistryClient;
   private final ReactiveFailover<KafkaSrClientApi> schemaRegistryClient;
   private final Map<String, ReactiveFailover<KafkaConnectClientApi>> connectsClients;
   private final Map<String, ReactiveFailover<KafkaConnectClientApi>> connectsClients;
   private final ReactiveFailover<KsqlApiClient> ksqlClient;
   private final ReactiveFailover<KsqlApiClient> ksqlClient;

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java

@@ -14,7 +14,7 @@ public class ClustersStorage {
 
 
   public ClustersStorage(ClustersProperties properties, KafkaClusterFactory factory) {
   public ClustersStorage(ClustersProperties properties, KafkaClusterFactory factory) {
     var builder = ImmutableMap.<String, KafkaCluster>builder();
     var builder = ImmutableMap.<String, KafkaCluster>builder();
-    properties.getClusters().forEach(c -> builder.put(c.getName(), factory.create(c)));
+    properties.getClusters().forEach(c -> builder.put(c.getName(), factory.create(properties, c)));
     this.kafkaClusters = builder.build();
     this.kafkaClusters = builder.build();
   }
   }
 
 

+ 4 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.service;
 import com.provectus.kafka.ui.client.RetryingKafkaConnectClient;
 import com.provectus.kafka.ui.client.RetryingKafkaConnectClient;
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
 import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
+import com.provectus.kafka.ui.emitter.PollingSettings;
 import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO;
 import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO;
 import com.provectus.kafka.ui.model.ClusterConfigValidationDTO;
 import com.provectus.kafka.ui.model.ClusterConfigValidationDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KafkaCluster;
@@ -12,7 +13,6 @@ import com.provectus.kafka.ui.service.masking.DataMasking;
 import com.provectus.kafka.ui.sr.ApiClient;
 import com.provectus.kafka.ui.sr.ApiClient;
 import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
 import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
 import com.provectus.kafka.ui.util.KafkaServicesValidation;
 import com.provectus.kafka.ui.util.KafkaServicesValidation;
-import com.provectus.kafka.ui.util.PollingThrottler;
 import com.provectus.kafka.ui.util.ReactiveFailover;
 import com.provectus.kafka.ui.util.ReactiveFailover;
 import com.provectus.kafka.ui.util.WebClientConfigurator;
 import com.provectus.kafka.ui.util.WebClientConfigurator;
 import java.util.HashMap;
 import java.util.HashMap;
@@ -41,7 +41,8 @@ public class KafkaClusterFactory {
   @Value("${webclient.max-in-memory-buffer-size:20MB}")
   @Value("${webclient.max-in-memory-buffer-size:20MB}")
   private DataSize maxBuffSize;
   private DataSize maxBuffSize;
 
 
-  public KafkaCluster create(ClustersProperties.Cluster clusterProperties) {
+  public KafkaCluster create(ClustersProperties properties,
+                             ClustersProperties.Cluster clusterProperties) {
     KafkaCluster.KafkaClusterBuilder builder = KafkaCluster.builder();
     KafkaCluster.KafkaClusterBuilder builder = KafkaCluster.builder();
 
 
     builder.name(clusterProperties.getName());
     builder.name(clusterProperties.getName());
@@ -49,7 +50,7 @@ public class KafkaClusterFactory {
     builder.properties(convertProperties(clusterProperties.getProperties()));
     builder.properties(convertProperties(clusterProperties.getProperties()));
     builder.readOnly(clusterProperties.isReadOnly());
     builder.readOnly(clusterProperties.isReadOnly());
     builder.masking(DataMasking.create(clusterProperties.getMasking()));
     builder.masking(DataMasking.create(clusterProperties.getMasking()));
-    builder.throttler(PollingThrottler.throttlerSupplier(clusterProperties));
+    builder.pollingSettings(PollingSettings.create(clusterProperties, properties));
 
 
     if (schemaRegistryConfigured(clusterProperties)) {
     if (schemaRegistryConfigured(clusterProperties)) {
       builder.schemaRegistryClient(schemaRegistryClient(clusterProperties));
       builder.schemaRegistryClient(schemaRegistryClient(clusterProperties));

+ 4 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -5,6 +5,7 @@ import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.MessageFilterStats;
 import com.provectus.kafka.ui.emitter.MessageFilterStats;
 import com.provectus.kafka.ui.emitter.MessageFilters;
 import com.provectus.kafka.ui.emitter.MessageFilters;
+import com.provectus.kafka.ui.emitter.ResultSizeLimiter;
 import com.provectus.kafka.ui.emitter.TailingEmitter;
 import com.provectus.kafka.ui.emitter.TailingEmitter;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.exception.ValidationException;
@@ -17,7 +18,6 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
 import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
-import com.provectus.kafka.ui.util.ResultSizeLimiter;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
@@ -169,7 +169,7 @@ public class MessagesService {
           () -> consumerGroupService.createConsumer(cluster),
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
           consumerPosition,
           recordDeserializer,
           recordDeserializer,
-          cluster.getThrottler().get()
+          cluster.getPollingSettings()
       );
       );
     } else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
     } else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
       emitter = new BackwardRecordEmitter(
       emitter = new BackwardRecordEmitter(
@@ -177,14 +177,14 @@ public class MessagesService {
           consumerPosition,
           consumerPosition,
           limit,
           limit,
           recordDeserializer,
           recordDeserializer,
-          cluster.getThrottler().get()
+          cluster.getPollingSettings()
       );
       );
     } else {
     } else {
       emitter = new TailingEmitter(
       emitter = new TailingEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
           consumerPosition,
           recordDeserializer,
           recordDeserializer,
-          cluster.getThrottler().get()
+          cluster.getPollingSettings()
       );
       );
     }
     }
     MessageFilterStats filterStats = new MessageFilterStats();
     MessageFilterStats filterStats = new MessageFilterStats();

+ 10 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java

@@ -1,14 +1,14 @@
 package com.provectus.kafka.ui.service.analyze;
 package com.provectus.kafka.ui.service.analyze;
 
 
-import static com.provectus.kafka.ui.emitter.AbstractEmitter.NO_MORE_DATA_EMPTY_POLLS_COUNT;
-
+import com.provectus.kafka.ui.emitter.EmptyPollsCounter;
 import com.provectus.kafka.ui.emitter.OffsetsInfo;
 import com.provectus.kafka.ui.emitter.OffsetsInfo;
+import com.provectus.kafka.ui.emitter.PollingSettings;
+import com.provectus.kafka.ui.emitter.PollingThrottler;
 import com.provectus.kafka.ui.exception.TopicAnalysisException;
 import com.provectus.kafka.ui.exception.TopicAnalysisException;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.TopicAnalysisDTO;
 import com.provectus.kafka.ui.model.TopicAnalysisDTO;
 import com.provectus.kafka.ui.service.ConsumerGroupService;
 import com.provectus.kafka.ui.service.ConsumerGroupService;
 import com.provectus.kafka.ui.service.TopicsService;
 import com.provectus.kafka.ui.service.TopicsService;
-import com.provectus.kafka.ui.util.PollingThrottler;
 import java.io.Closeable;
 import java.io.Closeable;
 import java.time.Duration;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.Instant;
@@ -63,7 +63,7 @@ public class TopicAnalysisService {
     if (analysisTasksStore.isAnalysisInProgress(topicId)) {
     if (analysisTasksStore.isAnalysisInProgress(topicId)) {
       throw new TopicAnalysisException("Topic is already analyzing");
       throw new TopicAnalysisException("Topic is already analyzing");
     }
     }
-    var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs, cluster.getThrottler().get());
+    var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs, cluster.getPollingSettings());
     analysisTasksStore.registerNewTask(topicId, task);
     analysisTasksStore.registerNewTask(topicId, task);
     Schedulers.boundedElastic().schedule(task);
     Schedulers.boundedElastic().schedule(task);
   }
   }
@@ -83,6 +83,7 @@ public class TopicAnalysisService {
     private final TopicIdentity topicId;
     private final TopicIdentity topicId;
     private final int partitionsCnt;
     private final int partitionsCnt;
     private final long approxNumberOfMsgs;
     private final long approxNumberOfMsgs;
+    private final EmptyPollsCounter emptyPollsCounter;
     private final PollingThrottler throttler;
     private final PollingThrottler throttler;
 
 
     private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
     private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
@@ -91,7 +92,7 @@ public class TopicAnalysisService {
     private final KafkaConsumer<Bytes, Bytes> consumer;
     private final KafkaConsumer<Bytes, Bytes> consumer;
 
 
     AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt,
     AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt,
-                 long approxNumberOfMsgs, PollingThrottler throttler) {
+                 long approxNumberOfMsgs, PollingSettings pollingSettings) {
       this.topicId = topicId;
       this.topicId = topicId;
       this.approxNumberOfMsgs = approxNumberOfMsgs;
       this.approxNumberOfMsgs = approxNumberOfMsgs;
       this.partitionsCnt = partitionsCnt;
       this.partitionsCnt = partitionsCnt;
@@ -103,7 +104,8 @@ public class TopicAnalysisService {
               ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
               ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
           )
           )
       );
       );
-      this.throttler = throttler;
+      this.throttler = pollingSettings.getPollingThrottler();
+      this.emptyPollsCounter = pollingSettings.createEmptyPollsCounter();
     }
     }
 
 
     @Override
     @Override
@@ -124,11 +126,10 @@ public class TopicAnalysisService {
         consumer.seekToBeginning(topicPartitions);
         consumer.seekToBeginning(topicPartitions);
 
 
         var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName);
         var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName);
-        for (int emptyPolls = 0; !offsetsInfo.assignedPartitionsFullyPolled()
-            && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT;) {
+        while (!offsetsInfo.assignedPartitionsFullyPolled() && !emptyPollsCounter.noDataEmptyPollsReached()) {
           var polled = consumer.poll(Duration.ofSeconds(3));
           var polled = consumer.poll(Duration.ofSeconds(3));
           throttler.throttleAfterPoll(polled);
           throttler.throttleAfterPoll(polled);
-          emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
+          emptyPollsCounter.count(polled);
           polled.forEach(r -> {
           polled.forEach(r -> {
             totalStats.apply(r);
             totalStats.apply(r);
             partitionStats.get(r.partition()).apply(r);
             partitionStats.get(r.partition()).apply(r);

+ 11 - 11
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -9,6 +9,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
+import com.provectus.kafka.ui.emitter.PollingSettings;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
@@ -16,7 +17,6 @@ import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
 import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
-import com.provectus.kafka.ui.util.PollingThrottler;
 import java.io.Serializable;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashMap;
@@ -112,7 +112,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         this::createConsumer,
         new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
         new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
         RECORD_DESERIALIZER,
         RECORD_DESERIALIZER,
-        PollingThrottler.noop()
+        PollingSettings.createDefault()
     );
     );
 
 
     var backwardEmitter = new BackwardRecordEmitter(
     var backwardEmitter = new BackwardRecordEmitter(
@@ -120,7 +120,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
         new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
         100,
         100,
         RECORD_DESERIALIZER,
         RECORD_DESERIALIZER,
-        PollingThrottler.noop()
+        PollingSettings.createDefault()
     );
     );
 
 
     StepVerifier.create(Flux.create(forwardEmitter))
     StepVerifier.create(Flux.create(forwardEmitter))
@@ -142,7 +142,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         this::createConsumer,
         new ConsumerPosition(BEGINNING, TOPIC, null),
         new ConsumerPosition(BEGINNING, TOPIC, null),
         RECORD_DESERIALIZER,
         RECORD_DESERIALIZER,
-        PollingThrottler.noop()
+        PollingSettings.createDefault()
     );
     );
 
 
     var backwardEmitter = new BackwardRecordEmitter(
     var backwardEmitter = new BackwardRecordEmitter(
@@ -150,7 +150,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         new ConsumerPosition(LATEST, TOPIC, null),
         new ConsumerPosition(LATEST, TOPIC, null),
         PARTITIONS * MSGS_PER_PARTITION,
         PARTITIONS * MSGS_PER_PARTITION,
         RECORD_DESERIALIZER,
         RECORD_DESERIALIZER,
-        PollingThrottler.noop()
+        PollingSettings.createDefault()
     );
     );
 
 
     List<String> expectedValues = SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList());
     List<String> expectedValues = SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList());
@@ -171,7 +171,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
         RECORD_DESERIALIZER,
         RECORD_DESERIALIZER,
-        PollingThrottler.noop()
+        PollingSettings.createDefault()
     );
     );
 
 
     var backwardEmitter = new BackwardRecordEmitter(
     var backwardEmitter = new BackwardRecordEmitter(
@@ -179,7 +179,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
         PARTITIONS * MSGS_PER_PARTITION,
         PARTITIONS * MSGS_PER_PARTITION,
         RECORD_DESERIALIZER,
         RECORD_DESERIALIZER,
-        PollingThrottler.noop()
+        PollingSettings.createDefault()
     );
     );
 
 
     var expectedValues = SENT_RECORDS.stream()
     var expectedValues = SENT_RECORDS.stream()
@@ -216,7 +216,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         this::createConsumer,
         new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
         new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
         RECORD_DESERIALIZER,
         RECORD_DESERIALIZER,
-        PollingThrottler.noop()
+        PollingSettings.createDefault()
     );
     );
 
 
     var backwardEmitter = new BackwardRecordEmitter(
     var backwardEmitter = new BackwardRecordEmitter(
@@ -224,7 +224,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
         new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
         PARTITIONS * MSGS_PER_PARTITION,
         PARTITIONS * MSGS_PER_PARTITION,
         RECORD_DESERIALIZER,
         RECORD_DESERIALIZER,
-        PollingThrottler.noop()
+        PollingSettings.createDefault()
     );
     );
 
 
     var expectedValues = SENT_RECORDS.stream()
     var expectedValues = SENT_RECORDS.stream()
@@ -255,7 +255,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
         numMessages,
         numMessages,
         RECORD_DESERIALIZER,
         RECORD_DESERIALIZER,
-        PollingThrottler.noop()
+        PollingSettings.createDefault()
     );
     );
 
 
     var expectedValues = SENT_RECORDS.stream()
     var expectedValues = SENT_RECORDS.stream()
@@ -281,7 +281,7 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         new ConsumerPosition(OFFSET, TOPIC, offsets),
         new ConsumerPosition(OFFSET, TOPIC, offsets),
         100,
         100,
         RECORD_DESERIALIZER,
         RECORD_DESERIALIZER,
-        PollingThrottler.noop()
+        PollingSettings.createDefault()
     );
     );
 
 
     expectEmitter(backwardEmitter,
     expectEmitter(backwardEmitter,

+ 1 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/PollingThrottlerTest.java

@@ -5,6 +5,7 @@ import static org.assertj.core.data.Percentage.withPercentage;
 
 
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.RateLimiter;
+import com.provectus.kafka.ui.emitter.PollingThrottler;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Test;

+ 9 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -3445,6 +3445,15 @@ components:
             kafka:
             kafka:
               type: object
               type: object
               properties:
               properties:
+                polling:
+                  type: object
+                  properties:
+                    pollTimeoutMs:
+                      type: integer
+                    partitionPollTimeout:
+                      type: integer
+                    noDataEmptyPolls:
+                      type: integer
                 clusters:
                 clusters:
                   type: array
                   type: array
                   items:
                   items: