
Implement per-cluster polling rate limits (#2981)

* Polling throttling:
1. PollingThrottler class for per-cluster throttle management
2. UI-publishing throttle for TailingEmitter
3. Elastic scheduler assigned to code paths where Serde interfaces are used

* int cast fix

* compilation fix

* test threshold fix

Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Ilya Kuramshin, 2 years ago
parent
commit
ed61852e40
19 changed files with 233 additions and 45 deletions
  1. README.md (+1 -0)
  2. kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java (+1 -0)
  3. kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java (+3 -1)
  4. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java (+9 -5)
  5. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java (+5 -2)
  6. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java (+8 -11)
  7. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java (+7 -4)
  8. kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java (+5 -3)
  9. kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java (+8 -0)
  10. kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java (+3 -0)
  11. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/DeserializationService.java (+5 -0)
  12. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java (+27 -5)
  13. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java (+0 -1)
  14. kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java (+7 -2)
  15. kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ConsumerRecordsUtil.java (+29 -0)
  16. kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/PollingThrottler.java (+54 -0)
  17. kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java (+1 -1)
  18. kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java (+21 -10)
  19. kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/PollingThrottlerTest.java (+39 -0)

+ 1 - 0
README.md

@@ -209,5 +209,6 @@ For example, if you want to use an environment variable to set the `name` parame
 |`KAFKA_CLUSTERS_0_METRICS_SSL`          |Enable SSL for Metrics? `true` or `false`. For advanced setup, see `kafka-ui-jmx-secured.yml`
 |`KAFKA_CLUSTERS_0_METRICS_USERNAME` |Username for Metrics authentication
 |`KAFKA_CLUSTERS_0_METRICS_PASSWORD` |Password for Metrics authentication
+|`KAFKA_CLUSTERS_0_POLLING_THROTTLE_RATE` |Max traffic rate (bytes/sec) that kafka-ui is allowed to reach when polling messages from the cluster. Default: 0 (no limit)
 |`TOPIC_RECREATE_DELAY_SECONDS` |Time delay between topic deletion and topic creation attempts for topic recreate functionality. Default: 1
 |`TOPIC_RECREATE_MAXRETRIES`  |Number of attempts of topic creation after topic deletion for topic recreate functionality. Default: 15
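
A minimal config sketch for this setting (values illustrative; the YAML property corresponds to the new `pollingThrottleRate` field in `ClustersProperties.Cluster`):

```yaml
kafka:
  clusters:
    - name: local                       # example cluster name
      bootstrapServers: localhost:9092  # assumed for illustration
      pollingThrottleRate: 10485760     # ~10 MB/sec polling budget for this cluster; 0 disables throttling
```

Equivalently, via the environment variable above: `KAFKA_CLUSTERS_0_POLLING_THROTTLE_RATE=10485760`.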

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -40,6 +40,7 @@ public class ClustersProperties {
     String defaultKeySerde;
     String defaultValueSerde;
     List<Masking> masking = new ArrayList<>();
+    long pollingThrottleRate = 0;
   }
 
   @Data

+ 3 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -30,6 +30,7 @@ import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 @RestController
 @RequiredArgsConstructor
@@ -135,6 +136,7 @@ public class MessagesController extends AbstractController implements MessagesAp
             .value(use == SerdeUsageDTO.SERIALIZE
                 ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
                 : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE))
-    ).map(ResponseEntity::ok);
+    ).subscribeOn(Schedulers.boundedElastic())
+        .map(ResponseEntity::ok);
   }
 }

+ 9 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -4,6 +4,7 @@ import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import com.provectus.kafka.ui.util.PollingThrottler;
 import java.time.Duration;
 import java.time.Instant;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -24,9 +25,11 @@ public abstract class AbstractEmitter {
 
   private final ConsumerRecordDeserializer recordDeserializer;
   private final ConsumingStats consumingStats = new ConsumingStats();
+  private final PollingThrottler throttler;
 
-  protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer) {
+  protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer, PollingThrottler throttler) {
     this.recordDeserializer = recordDeserializer;
+    this.throttler = throttler;
   }
 
   protected ConsumerRecords<Bytes, Bytes> poll(
@@ -39,7 +42,8 @@ public abstract class AbstractEmitter {
     Instant start = Instant.now();
     ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
     Instant finish = Instant.now();
-    sendConsuming(sink, records, Duration.between(start, finish).toMillis());
+    int polledBytes = sendConsuming(sink, records, Duration.between(start, finish).toMillis());
+    throttler.throttleAfterPoll(polledBytes);
     return records;
   }
 
@@ -61,10 +65,10 @@ public abstract class AbstractEmitter {
     );
   }
 
-  protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink,
+  protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
                                ConsumerRecords<Bytes, Bytes> records,
                                long elapsed) {
-    consumingStats.sendConsumingEvt(sink, records, elapsed, getFilterApplyErrors(sink));
+    return consumingStats.sendConsumingEvt(sink, records, elapsed, getFilterApplyErrors(sink));
   }
 
   protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
@@ -78,4 +82,4 @@ public abstract class AbstractEmitter {
         .<Number>map(MessageFilterStats::getFilterApplyErrors)
         .orElse(0);
   }
-}
+}

+ 5 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.emitter;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import com.provectus.kafka.ui.util.PollingThrottler;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -34,8 +35,9 @@ public class BackwardRecordEmitter
       Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
       ConsumerPosition consumerPosition,
       int messagesPerPage,
-      ConsumerRecordDeserializer recordDeserializer) {
-    super(recordDeserializer);
+      ConsumerRecordDeserializer recordDeserializer,
+      PollingThrottler throttler) {
+    super(recordDeserializer, throttler);
     this.consumerPosition = consumerPosition;
     this.messagesPerPage = messagesPerPage;
     this.consumerSupplier = consumerSupplier;
@@ -43,6 +45,7 @@ public class BackwardRecordEmitter
 
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
+    log.debug("Starting backward polling for {}", consumerPosition);
     try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
       sendPhase(sink, "Created consumer");
 

+ 8 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java

@@ -2,9 +2,8 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
@@ -14,18 +13,15 @@ class ConsumingStats {
   private int records = 0;
   private long elapsed = 0;
 
-  void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
+  /**
+   * Returns the number of bytes polled.
+   */
+  int sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
                         ConsumerRecords<Bytes, Bytes> polledRecords,
                         long elapsed,
                         Number filterApplyErrors) {
-    for (ConsumerRecord<Bytes, Bytes> rec : polledRecords) {
-      for (Header header : rec.headers()) {
-        bytes +=
-            (header.key() != null ? header.key().getBytes().length : 0L)
-                + (header.value() != null ? header.value().length : 0L);
-      }
-      bytes += rec.serializedKeySize() + rec.serializedValueSize();
-    }
+    int polledBytes = ConsumerRecordsUtil.calculatePolledSize(polledRecords);
+    bytes += polledBytes;
     this.records += polledRecords.count();
     this.elapsed += elapsed;
     sink.next(
@@ -33,6 +29,7 @@ class ConsumingStats {
             .type(TopicMessageEventDTO.TypeEnum.CONSUMING)
             .consuming(createConsumingStats(sink, filterApplyErrors))
     );
+    return polledBytes;
   }
 
   void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, Number filterApplyErrors) {

+ 7 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.emitter;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import com.provectus.kafka.ui.util.PollingThrottler;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -22,14 +23,16 @@ public class ForwardRecordEmitter
   public ForwardRecordEmitter(
       Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
       ConsumerPosition position,
-      ConsumerRecordDeserializer recordDeserializer) {
-    super(recordDeserializer);
+      ConsumerRecordDeserializer recordDeserializer,
+      PollingThrottler throttler) {
+    super(recordDeserializer, throttler);
     this.position = position;
     this.consumerSupplier = consumerSupplier;
   }
 
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
+    log.debug("Starting forward polling for {}", position);
     try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
       sendPhase(sink, "Assigning partitions");
       var seekOperations = SeekOperations.create(consumer, position);
@@ -43,7 +46,7 @@ public class ForwardRecordEmitter
 
         sendPhase(sink, "Polling");
         ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
-        log.info("{} records polled", records.count());
+        log.debug("{} records polled", records.count());
         emptyPolls = records.isEmpty() ? emptyPolls + 1 : 0;
 
         for (ConsumerRecord<Bytes, Bytes> msg : records) {
@@ -55,7 +58,7 @@ public class ForwardRecordEmitter
         }
       }
       sendFinishStatsAndCompleteSink(sink);
-      log.info("Polling finished");
+      log.debug("Polling finished");
     } catch (Exception e) {
       log.error("Error occurred while consuming records", e);
       sink.error(e);

+ 5 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.emitter;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import com.provectus.kafka.ui.util.PollingThrottler;
 import java.util.HashMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
@@ -20,16 +21,17 @@ public class TailingEmitter extends AbstractEmitter
 
   public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
                         ConsumerPosition consumerPosition,
-                        ConsumerRecordDeserializer recordDeserializer) {
-    super(recordDeserializer);
+                        ConsumerRecordDeserializer recordDeserializer,
+                        PollingThrottler throttler) {
+    super(recordDeserializer, throttler);
     this.consumerSupplier = consumerSupplier;
     this.consumerPosition = consumerPosition;
   }
 
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
+    log.debug("Starting tailing polling for {}", consumerPosition);
     try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
-      log.debug("Starting topic tailing");
       assignAndSeek(consumer);
       while (!sink.isCancelled()) {
         sendPhase(sink, "Polling");

+ 8 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -36,11 +36,13 @@ import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
 import com.provectus.kafka.ui.service.masking.DataMasking;
 import com.provectus.kafka.ui.service.metrics.RawMetric;
+import com.provectus.kafka.ui.util.PollingThrottler;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.mapstruct.Mapper;
@@ -56,6 +58,7 @@ public interface ClusterMapper {
   @Mapping(target = "schemaRegistry", source = ".", qualifiedByName = "setSchemaRegistry")
   @Mapping(target = "ksqldbServer", source = ".", qualifiedByName = "setKsqldbServer")
   @Mapping(target = "metricsConfig", source = "metrics")
+  @Mapping(target = "throttler", source = ".", qualifiedByName = "createClusterThrottler")
   KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
 
   ClusterStatsDTO toClusterStats(InternalClusterState clusterState);
@@ -156,6 +159,11 @@ public interface ClusterMapper {
     return internalKsqlServerBuilder.build();
   }
 
+  @Named("createClusterThrottler")
+  default Supplier<PollingThrottler> createClusterThrottler(ClustersProperties.Cluster cluster) {
+    return PollingThrottler.throttlerSupplier(cluster);
+  }
+
   TopicDetailsDTO toTopicDetails(InternalTopic topic);
 
   @Mapping(target = "isReadOnly", source = "readOnly")

+ 3 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -1,8 +1,10 @@
 package com.provectus.kafka.ui.model;
 
 import com.provectus.kafka.ui.service.masking.DataMasking;
+import com.provectus.kafka.ui.util.PollingThrottler;
 import java.util.List;
 import java.util.Properties;
+import java.util.function.Supplier;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -23,4 +25,5 @@ public class KafkaCluster {
   private final boolean disableLogDirsCollection;
   private final MetricsConfig metricsConfig;
   private final DataMasking masking;
+  private final Supplier<PollingThrottler> throttler;
 }

+ 5 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/DeserializationService.java

@@ -20,6 +20,11 @@ import javax.validation.ValidationException;
 import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
 
+/**
+ * Class is responsible for managing serdes for Kafka clusters.
+ * NOTE: Since the Serde interface is designed to be blocking, DeserializationService
+ * (and all Serde-related code) must be invoked on a dedicated thread pool (boundedElastic).
+ */
 @Component
 public class DeserializationService implements Closeable {
 
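Given that note, a minimal sketch of the expected call pattern (hypothetical call site, not code from the commit; it mirrors the `subscribeOn` added to MessagesController above):

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Sketch only: `deserializationService`, `cluster`, `topicName` and VALUE are assumed
// to come from the surrounding controller context.
Mono<?> serdesForDeserialize() {
  return Mono
      .fromCallable(() -> deserializationService.getSerdesForDeserialize(cluster, topicName, VALUE))
      .subscribeOn(Schedulers.boundedElastic()); // Serde calls may block; keep them off the event loop
}
```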

+ 27 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.service;
 
+import com.google.common.util.concurrent.RateLimiter;
 import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.MessageFilterStats;
@@ -46,6 +47,10 @@ import reactor.core.scheduler.Schedulers;
 @RequiredArgsConstructor
 @Slf4j
 public class MessagesService {
+
+  // limiting UI message publish rate to 20/sec in tailing mode
+  public static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
+
   private final AdminClientService adminClientService;
   private final DeserializationService deserializationService;
   private final ConsumerGroupService consumerGroupService;
@@ -83,6 +88,7 @@ public class MessagesService {
   public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic,
                                           CreateTopicMessageDTO msg) {
     return withExistingTopic(cluster, topic)
+        .publishOn(Schedulers.boundedElastic())
         .flatMap(desc -> sendMessageImpl(cluster, desc, msg));
   }
 
@@ -138,6 +144,7 @@ public class MessagesService {
                                                  @Nullable String valueSerde) {
     return withExistingTopic(cluster, topic)
         .flux()
+        .publishOn(Schedulers.boundedElastic())
         .flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
             filterQueryType, limit, seekDirection, keySerde, valueSerde));
   }
@@ -159,20 +166,23 @@ public class MessagesService {
       emitter = new ForwardRecordEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
-          recordDeserializer
+          recordDeserializer,
+          cluster.getThrottler().get()
       );
     } else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
       emitter = new BackwardRecordEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
           limit,
-          recordDeserializer
+          recordDeserializer,
+          cluster.getThrottler().get()
       );
     } else {
       emitter = new TailingEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
-          recordDeserializer
+          recordDeserializer,
+          cluster.getThrottler().get()
       );
     }
     MessageFilterStats filterStats = new MessageFilterStats();
@@ -181,8 +191,7 @@ public class MessagesService {
         .filter(getMsgFilter(query, filterQueryType, filterStats))
         .map(getDataMasker(cluster, topic))
         .takeWhile(createTakeWhilePredicate(seekDirection, limit))
-        .subscribeOn(Schedulers.boundedElastic())
-        .share();
+        .map(throttleUiPublish(seekDirection));
   }
 
   private Predicate<TopicMessageEventDTO> createTakeWhilePredicate(
@@ -228,4 +237,17 @@ public class MessagesService {
     };
   }
 
+  private <T> UnaryOperator<T> throttleUiPublish(SeekDirectionDTO seekDirection) {
+    if (seekDirection == SeekDirectionDTO.TAILING) {
+      RateLimiter rateLimiter = RateLimiter.create(TAILING_UI_MESSAGE_THROTTLE_RATE);
+      return m -> {
+        rateLimiter.acquire(1);
+        return m;
+      };
+    }
+    // no need to throttle the UI publish rate for non-tailing modes, since the max number of produced
+    // messages is already limited for them (by page size)
+    return UnaryOperator.identity();
+  }
+
 }
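
As a standalone illustration of the pattern above (not part of the commit), Guava's `RateLimiter` paces a Reactor pipeline like this; the rate matches `TAILING_UI_MESSAGE_THROTTLE_RATE`:

```java
import com.google.common.util.concurrent.RateLimiter;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class UiThrottleDemo {
  public static void main(String[] args) {
    RateLimiter limiter = RateLimiter.create(20); // 20 permits/sec, as in tailing mode
    Flux.range(1, 100)
        .map(i -> {
          limiter.acquire(1); // blocks until a permit is available, so run on boundedElastic
          return i;
        })
        .subscribeOn(Schedulers.boundedElastic())
        .blockLast(); // completes in roughly 5 seconds: 100 events at 20/sec
  }
}
```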

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

@@ -51,7 +51,6 @@ import reactor.util.retry.Retry;
 public class TopicsService {
 
   private final AdminClientService adminClientService;
-  private final DeserializationService deserializationService;
   private final StatisticsCache statisticsCache;
   @Value("${topic.recreate.maxRetries:15}")
   private int recreateMaxRetries;

+ 7 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java

@@ -8,6 +8,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.TopicAnalysisDTO;
 import com.provectus.kafka.ui.service.ConsumerGroupService;
 import com.provectus.kafka.ui.service.TopicsService;
+import com.provectus.kafka.ui.util.PollingThrottler;
 import java.io.Closeable;
 import java.time.Duration;
 import java.time.Instant;
@@ -62,7 +63,7 @@ public class TopicAnalysisService {
     if (analysisTasksStore.isAnalysisInProgress(topicId)) {
       throw new TopicAnalysisException("Topic is already analyzing");
     }
-    var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs);
+    var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs, cluster.getThrottler().get());
     analysisTasksStore.registerNewTask(topicId, task);
     Schedulers.boundedElastic().schedule(task);
   }
@@ -82,13 +83,15 @@ public class TopicAnalysisService {
     private final TopicIdentity topicId;
     private final int partitionsCnt;
     private final long approxNumberOfMsgs;
+    private final PollingThrottler throttler;
 
     private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
     private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();
 
     private final KafkaConsumer<Bytes, Bytes> consumer;
 
-    AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt, long approxNumberOfMsgs) {
+    AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt,
+                 long approxNumberOfMsgs, PollingThrottler throttler) {
       this.topicId = topicId;
       this.approxNumberOfMsgs = approxNumberOfMsgs;
       this.partitionsCnt = partitionsCnt;
@@ -100,6 +103,7 @@ public class TopicAnalysisService {
               ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
           )
       );
+      this.throttler = throttler;
     }
 
     @Override
@@ -123,6 +127,7 @@ public class TopicAnalysisService {
         for (int emptyPolls = 0; !offsetsInfo.assignedPartitionsFullyPolled()
             && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT;) {
           var polled = consumer.poll(Duration.ofSeconds(3));
+          throttler.throttleAfterPoll(polled);
           emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
           polled.forEach(r -> {
             totalStats.apply(r);

+ 29 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ConsumerRecordsUtil.java

@@ -0,0 +1,29 @@
+package com.provectus.kafka.ui.util;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.utils.Bytes;
+
+public class ConsumerRecordsUtil {
+
+  public static int calculatePolledRecSize(ConsumerRecord<Bytes, Bytes> rec) {
+    int polledBytes = 0;
+    for (Header header : rec.headers()) {
+      polledBytes +=
+          (header.key() != null ? header.key().getBytes().length : 0)
+              + (header.value() != null ? header.value().length : 0);
+    }
+    polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
+    polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
+    return polledBytes;
+  }
+
+  public static int calculatePolledSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
+    int polledBytes = 0;
+    for (ConsumerRecord<Bytes, Bytes> rec : recs) {
+      polledBytes += calculatePolledRecSize(rec);
+    }
+    return polledBytes;
+  }
+
+}

+ 54 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/PollingThrottler.java

@@ -0,0 +1,54 @@
+package com.provectus.kafka.ui.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.RateLimiter;
+import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import java.util.Optional;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.utils.Bytes;
+
+@Slf4j
+public class PollingThrottler {
+
+  public static Supplier<PollingThrottler> throttlerSupplier(ClustersProperties.Cluster cluster) {
+    long rate = cluster.getPollingThrottleRate();
+    if (rate <= 0) {
+      return PollingThrottler::noop;
+    }
+    // RateLimiter instance should be shared across all created throttlers
+    var rateLimiter = RateLimiter.create(rate);
+    return () -> new PollingThrottler(cluster.getName(), rateLimiter);
+  }
+
+  private final String clusterName;
+  private final RateLimiter rateLimiter;
+  private boolean throttled;
+
+  @VisibleForTesting
+  public PollingThrottler(String clusterName, RateLimiter rateLimiter) {
+    this.clusterName = clusterName;
+    this.rateLimiter = rateLimiter;
+  }
+
+  public static PollingThrottler noop() {
+    return new PollingThrottler("noop", RateLimiter.create(Long.MAX_VALUE));
+  }
+
+  public void throttleAfterPoll(int polledBytes) {
+    if (polledBytes > 0) {
+      double sleptSeconds = rateLimiter.acquire(polledBytes);
+      if (!throttled && sleptSeconds > 0.0) {
+        throttled = true;
+        log.debug("Polling throttling enabled for cluster {} at rate {} bytes/sec", clusterName, rateLimiter.getRate());
+      }
+    }
+  }
+
+  public void throttleAfterPoll(ConsumerRecords<Bytes, Bytes> polled) {
+    throttleAfterPoll(ConsumerRecordsUtil.calculatePolledSize(polled));
+  }
+
+}
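
A short usage sketch (illustrative values; the setters are assumed from Lombok's `@Data` on `ClustersProperties.Cluster`) showing why `throttlerSupplier` shares a single `RateLimiter`: all throttlers created for a cluster draw from one bytes/sec budget:

```java
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.util.PollingThrottler;
import java.util.function.Supplier;

class SharedThrottlerDemo {
  public static void main(String[] args) {
    var clusterProps = new ClustersProperties.Cluster();
    clusterProps.setName("local");                  // setters assumed via Lombok @Data
    clusterProps.setPollingThrottleRate(1_000_000); // 1 MB/sec budget for the whole cluster

    Supplier<PollingThrottler> supplier = PollingThrottler.throttlerSupplier(clusterProps);
    PollingThrottler emitterThrottler = supplier.get();  // e.g. held by a record emitter
    PollingThrottler analysisThrottler = supplier.get(); // e.g. held by a topic-analysis task

    emitterThrottler.throttleAfterPoll(600_000);  // first acquire returns immediately
    analysisThrottler.throttleAfterPoll(600_000); // sleeps ~0.6s: the shared 1 MB/sec budget is spent
  }
}
```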

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java

@@ -143,4 +143,4 @@ class TailingEmitterTest extends AbstractIntegrationTest {
             .anyMatch(msg -> msg.getType() == TopicMessageEventDTO.TypeEnum.CONSUMING));
   }
 
-}
+}

+ 21 - 10
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -16,6 +16,7 @@ import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
+import com.provectus.kafka.ui.util.PollingThrottler;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -110,14 +111,16 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
         100,
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     StepVerifier.create(Flux.create(forwardEmitter))
@@ -138,14 +141,16 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(BEGINNING, TOPIC, null),
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(LATEST, TOPIC, null),
         PARTITIONS * MSGS_PER_PARTITION,
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     List<String> expectedValues = SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList());
@@ -165,14 +170,16 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
         PARTITIONS * MSGS_PER_PARTITION,
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     var expectedValues = SENT_RECORDS.stream()
@@ -208,14 +215,16 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
         PARTITIONS * MSGS_PER_PARTITION,
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     var expectedValues = SENT_RECORDS.stream()
@@ -245,7 +254,8 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
         numMessages,
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     var expectedValues = SENT_RECORDS.stream()
@@ -270,7 +280,8 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, offsets),
         100,
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     expectEmitter(backwardEmitter,

+ 39 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/PollingThrottlerTest.java

@@ -0,0 +1,39 @@
+package com.provectus.kafka.ui.util;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.data.Percentage.withPercentage;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Test;
+
+class PollingThrottlerTest {
+
+  @Test
+  void testTrafficThrottled() {
+    var throttler = new PollingThrottler("test", RateLimiter.create(1000));
+    long polledBytes = 0;
+    var stopwatch = Stopwatch.createStarted();
+    while (stopwatch.elapsed(TimeUnit.SECONDS) < 1) {
+      int newPolled = ThreadLocalRandom.current().nextInt(10);
+      throttler.throttleAfterPoll(newPolled);
+      polledBytes += newPolled;
+    }
+    assertThat(polledBytes).isCloseTo(1000, withPercentage(3.0));
+  }
+
+  @Test
+  void noopThrottlerDoNotLimitPolling() {
+    var noopThrottler = PollingThrottler.noop();
+    var stopwatch = Stopwatch.createStarted();
+    // emulating that we polled 1GB
+    for (int i = 0; i < 1024; i++) {
+      noopThrottler.throttleAfterPoll(1024 * 1024);
+    }
+    // checking that we are able to "poll" 1GB in less than a second
+    assertThat(stopwatch.elapsed().getSeconds()).isLessThan(1);
+  }
+
+}