Browse Source

Merge remote-tracking branch 'origin/master' into dev

Roman Zabaluev 2 years ago
parent
commit
e5d666e91f
33 changed files with 372 additions and 93 deletions
  1. 5 0
      README.md
  2. 36 0
      documentation/compose/kafka-ssl-components.yaml
  3. BIN
      documentation/compose/ssl/kafka.keystore.jks
  4. BIN
      documentation/compose/ssl/kafka.truststore.jks
  5. 1 1
      documentation/compose/ssl/san.cnf
  6. 2 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
  7. 3 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
  8. 9 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java
  9. 5 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java
  10. 8 11
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java
  11. 7 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java
  12. 5 3
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java
  13. 15 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
  14. 6 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalKsqlServer.java
  15. 3 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java
  16. 1 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java
  17. 1 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java
  18. 5 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/DeserializationService.java
  19. 27 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java
  20. 37 33
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
  21. 0 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
  22. 7 2
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java
  23. 19 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java
  24. 29 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ConsumerRecordsUtil.java
  25. 54 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/PollingThrottler.java
  26. 1 1
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java
  27. 8 1
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ReactiveAdminClientTest.java
  28. 21 10
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java
  29. 39 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/PollingThrottlerTest.java
  30. 1 1
      kafka-ui-react-app/src/components/Connect/New/New.tsx
  31. 15 1
      kafka-ui-react-app/src/components/KsqlDb/Query/QueryForm/QueryForm.tsx
  32. 1 1
      kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/Filters.tsx
  33. 1 1
      pom.xml

+ 5 - 0
README.md

@@ -181,6 +181,10 @@ For example, if you want to use an environment variable to set the `name` parame
 |`KAFKA_CLUSTERS_0_KSQLDBSERVER` 	| KSQL DB server address
 |`KAFKA_CLUSTERS_0_KSQLDBSERVERAUTH_USERNAME` 	| KSQL DB server's basic authentication username
 |`KAFKA_CLUSTERS_0_KSQLDBSERVERAUTH_PASSWORD` 	| KSQL DB server's basic authentication password
+|`KAFKA_CLUSTERS_0_KSQLDBSERVERSSL_KEYSTORELOCATION`   	|Path to the JKS keystore used to communicate with KSQL DB
+|`KAFKA_CLUSTERS_0_KSQLDBSERVERSSL_KEYSTOREPASSWORD`   	|Password of the JKS keystore for KSQL DB
+|`KAFKA_CLUSTERS_0_KSQLDBSERVERSSL_TRUSTSTORELOCATION`   	|Path to the JKS truststore used to communicate with KSQL DB
+|`KAFKA_CLUSTERS_0_KSQLDBSERVERSSL_TRUSTSTOREPASSWORD`   	|Password of the JKS truststore for KSQL DB
 |`KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL` 	|Security protocol to connect to the brokers. For SSL connection use "SSL", for plaintext connection don't set this environment variable
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRY`   	|SchemaRegistry's address
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME`   	|SchemaRegistry's basic authentication username
@@ -205,5 +209,6 @@ For example, if you want to use an environment variable to set the `name` parame
 |`KAFKA_CLUSTERS_0_METRICS_SSL`          |Enable SSL for Metrics? `true` or `false`. For advanced setup, see `kafka-ui-jmx-secured.yml`
 |`KAFKA_CLUSTERS_0_METRICS_USERNAME` |Username for Metrics authentication
 |`KAFKA_CLUSTERS_0_METRICS_PASSWORD` |Password for Metrics authentication
+|`KAFKA_CLUSTERS_0_POLLING_THROTTLE_RATE` |Max traffic rate (bytes/sec) that kafka-ui is allowed to reach when polling messages from the cluster. Default: 0 (unlimited)
 |`TOPIC_RECREATE_DELAY_SECONDS` |Time delay between topic deletion and topic creation attempts for topic recreate functionality. Default: 1
 |`TOPIC_RECREATE_MAXRETRIES`  |Number of attempts of topic creation after topic deletion for topic recreate functionality. Default: 15

+ 36 - 0
documentation/compose/kafka-ssl-components.yaml

@@ -10,6 +10,7 @@ services:
       - kafka0
       - schemaregistry0
       - kafka-connect0
+      - ksqldb0
     environment:
       KAFKA_CLUSTERS_0_NAME: local
       KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SSL
@@ -24,6 +25,11 @@ services:
       KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_KEYSTOREPASSWORD: "secret"
       KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_TRUSTSTORELOCATION: /kafka.truststore.jks
       KAFKA_CLUSTERS_0_SCHEMAREGISTRYSSL_TRUSTSTOREPASSWORD: "secret"
+      KAFKA_CLUSTERS_0_KSQLDBSERVER: https://ksqldb0:8088
+      KAFKA_CLUSTERS_0_KSQLDBSERVERSSL_KEYSTORELOCATION: /kafka.keystore.jks
+      KAFKA_CLUSTERS_0_KSQLDBSERVERSSL_KEYSTOREPASSWORD: "secret"
+      KAFKA_CLUSTERS_0_KSQLDBSERVERSSL_TRUSTSTORELOCATION: /kafka.truststore.jks
+      KAFKA_CLUSTERS_0_KSQLDBSERVERSSL_TRUSTSTOREPASSWORD: "secret"
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: local
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: https://kafka-connect0:8083
       KAFKA_CLUSTERS_0_KAFKACONNECT_0_KEYSTORELOCATION: /kafka.keystore.jks
@@ -142,3 +148,33 @@ services:
     volumes:
       - ./ssl/kafka.truststore.jks:/kafka.truststore.jks
       - ./ssl/kafka.keystore.jks:/kafka.keystore.jks
+
+  ksqldb0:
+    image: confluentinc/ksqldb-server:0.18.0
+    depends_on:
+      - kafka0
+      - kafka-connect0
+      - schemaregistry0
+    ports:
+      - 8088:8088
+    environment:
+      KSQL_CUB_KAFKA_TIMEOUT: 120
+      KSQL_LISTENERS: https://0.0.0.0:8088
+      KSQL_BOOTSTRAP_SERVERS: SSL://kafka0:29092
+      KSQL_SECURITY_PROTOCOL: SSL
+      KSQL_SSL_TRUSTSTORE_LOCATION: /kafka.truststore.jks
+      KSQL_SSL_TRUSTSTORE_PASSWORD: secret
+      KSQL_SSL_KEYSTORE_LOCATION: /kafka.keystore.jks
+      KSQL_SSL_KEYSTORE_PASSWORD: secret
+      KSQL_SSL_KEY_PASSWORD: secret
+      KSQL_SSL_CLIENT_AUTHENTICATION: REQUIRED
+      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
+      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
+      KSQL_KSQL_CONNECT_URL: https://kafka-connect0:8083
+      KSQL_KSQL_SCHEMA_REGISTRY_URL: https://schemaregistry0:8085
+      KSQL_KSQL_SERVICE_ID: my_ksql_1
+      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'
+      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
+    volumes:
+      - ./ssl/kafka.truststore.jks:/kafka.truststore.jks
+      - ./ssl/kafka.keystore.jks:/kafka.keystore.jks

BIN
documentation/compose/ssl/kafka.keystore.jks


BIN
documentation/compose/ssl/kafka.truststore.jks


+ 1 - 1
documentation/compose/ssl/san.cnf

@@ -1,2 +1,2 @@
 [kafka]
-subjectAltName = DNS:kafka0,DNS:schemaregistry0,DNS:kafka-connect0
+subjectAltName = DNS:kafka0,DNS:schemaregistry0,DNS:kafka-connect0,DNS:ksqldb0

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -30,6 +30,7 @@ public class ClustersProperties {
     WebClientSsl schemaRegistrySsl;
     String ksqldbServer;
     KsqldbServerAuth ksqldbServerAuth;
+    WebClientSsl ksqldbServerSsl;
     List<ConnectCluster> kafkaConnect;
     MetricsConfigData metrics;
     Properties properties;
@@ -39,6 +40,7 @@ public class ClustersProperties {
     String defaultKeySerde;
     String defaultValueSerde;
     List<Masking> masking = new ArrayList<>();
+    long pollingThrottleRate = 0;
   }
 
   @Data

+ 3 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -30,6 +30,7 @@ import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 @RestController
 @RequiredArgsConstructor
@@ -135,6 +136,7 @@ public class MessagesController extends AbstractController implements MessagesAp
             .value(use == SerdeUsageDTO.SERIALIZE
                 ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
                 : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE))
-    ).map(ResponseEntity::ok);
+    ).subscribeOn(Schedulers.boundedElastic())
+        .map(ResponseEntity::ok);
   }
 }
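
Note on the `subscribeOn(Schedulers.boundedElastic())` addition: serde resolution is blocking (see the DeserializationService javadoc below), so the subscription is shifted off the reactor-netty event loop. A minimal, self-contained sketch of the pattern, with an illustrative `blockingSerdeLookup` standing in for the real serde call:

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BoundedElasticSketch {

  // Hypothetical blocking call standing in for serde resolution.
  static String blockingSerdeLookup() {
    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return "serdes";
  }

  public static void main(String[] args) {
    String v = Mono.fromCallable(BoundedElasticSketch::blockingSerdeLookup)
        .subscribeOn(Schedulers.boundedElastic()) // callable runs on boundedElastic, not the caller's thread
        .block(); // block() is for this demo only, never in reactive production code
    System.out.println(v);
  }
}
```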

+ 9 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java

@@ -4,6 +4,7 @@ import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import com.provectus.kafka.ui.util.PollingThrottler;
 import java.time.Duration;
 import java.time.Instant;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -24,9 +25,11 @@ public abstract class AbstractEmitter {
 
   private final ConsumerRecordDeserializer recordDeserializer;
   private final ConsumingStats consumingStats = new ConsumingStats();
+  private final PollingThrottler throttler;
 
-  protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer) {
+  protected AbstractEmitter(ConsumerRecordDeserializer recordDeserializer, PollingThrottler throttler) {
     this.recordDeserializer = recordDeserializer;
+    this.throttler = throttler;
   }
 
   protected ConsumerRecords<Bytes, Bytes> poll(
@@ -39,7 +42,8 @@ public abstract class AbstractEmitter {
     Instant start = Instant.now();
     ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
     Instant finish = Instant.now();
-    sendConsuming(sink, records, Duration.between(start, finish).toMillis());
+    int polledBytes = sendConsuming(sink, records, Duration.between(start, finish).toMillis());
+    throttler.throttleAfterPoll(polledBytes);
     return records;
   }
 
@@ -61,10 +65,10 @@ public abstract class AbstractEmitter {
     );
   }
 
-  protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink,
+  protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
                                ConsumerRecords<Bytes, Bytes> records,
                                long elapsed) {
-    consumingStats.sendConsumingEvt(sink, records, elapsed, getFilterApplyErrors(sink));
+    return consumingStats.sendConsumingEvt(sink, records, elapsed, getFilterApplyErrors(sink));
   }
 
   protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
@@ -78,4 +82,4 @@ public abstract class AbstractEmitter {
         .<Number>map(MessageFilterStats::getFilterApplyErrors)
         .orElse(0);
   }
-}
+}
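
The emitter changes above thread a `PollingThrottler` through every poll: `sendConsuming` now returns the polled byte count, which is fed to `throttleAfterPoll`. A condensed sketch of that poll-measure-throttle sequence (sink and stats plumbing omitted; not the exact `AbstractEmitter` code):

```java
import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
import com.provectus.kafka.ui.util.PollingThrottler;
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;

class ThrottledPollSketch {
  static ConsumerRecords<Bytes, Bytes> pollThrottled(Consumer<Bytes, Bytes> consumer,
                                                     PollingThrottler throttler) {
    ConsumerRecords<Bytes, Bytes> records = consumer.poll(Duration.ofMillis(1_000));
    // account for key/value/header bytes of everything just polled...
    int polledBytes = ConsumerRecordsUtil.calculatePolledSize(records);
    // ...and block here if the cluster-wide bytes/sec budget is exhausted
    throttler.throttleAfterPoll(polledBytes);
    return records;
  }
}
```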

+ 5 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.emitter;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import com.provectus.kafka.ui.util.PollingThrottler;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -34,8 +35,9 @@ public class BackwardRecordEmitter
       Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
       ConsumerPosition consumerPosition,
       int messagesPerPage,
-      ConsumerRecordDeserializer recordDeserializer) {
-    super(recordDeserializer);
+      ConsumerRecordDeserializer recordDeserializer,
+      PollingThrottler throttler) {
+    super(recordDeserializer, throttler);
     this.consumerPosition = consumerPosition;
     this.messagesPerPage = messagesPerPage;
     this.consumerSupplier = consumerSupplier;
@@ -43,6 +45,7 @@ public class BackwardRecordEmitter
 
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
+    log.debug("Starting backward polling for {}", consumerPosition);
     try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
       sendPhase(sink, "Created consumer");
 

+ 8 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java

@@ -2,9 +2,8 @@ package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
@@ -14,18 +13,15 @@ class ConsumingStats {
   private int records = 0;
   private long elapsed = 0;
 
-  void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
+  /**
+   * Returns the number of bytes polled.
+   */
+  int sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
                         ConsumerRecords<Bytes, Bytes> polledRecords,
                         long elapsed,
                         Number filterApplyErrors) {
-    for (ConsumerRecord<Bytes, Bytes> rec : polledRecords) {
-      for (Header header : rec.headers()) {
-        bytes +=
-            (header.key() != null ? header.key().getBytes().length : 0L)
-                + (header.value() != null ? header.value().length : 0L);
-      }
-      bytes += rec.serializedKeySize() + rec.serializedValueSize();
-    }
+    int polledBytes = ConsumerRecordsUtil.calculatePolledSize(polledRecords);
+    bytes += polledBytes;
     this.records += polledRecords.count();
     this.elapsed += elapsed;
     sink.next(
@@ -33,6 +29,7 @@ class ConsumingStats {
             .type(TopicMessageEventDTO.TypeEnum.CONSUMING)
             .consuming(createConsumingStats(sink, filterApplyErrors))
     );
+    return polledBytes;
   }
 
   void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, Number filterApplyErrors) {

+ 7 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.emitter;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import com.provectus.kafka.ui.util.PollingThrottler;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -22,14 +23,16 @@ public class ForwardRecordEmitter
   public ForwardRecordEmitter(
       Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
       ConsumerPosition position,
-      ConsumerRecordDeserializer recordDeserializer) {
-    super(recordDeserializer);
+      ConsumerRecordDeserializer recordDeserializer,
+      PollingThrottler throttler) {
+    super(recordDeserializer, throttler);
     this.position = position;
     this.consumerSupplier = consumerSupplier;
   }
 
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
+    log.debug("Starting forward polling for {}", position);
     try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
       sendPhase(sink, "Assigning partitions");
       var seekOperations = SeekOperations.create(consumer, position);
@@ -43,7 +46,7 @@ public class ForwardRecordEmitter
 
         sendPhase(sink, "Polling");
         ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
-        log.info("{} records polled", records.count());
+        log.debug("{} records polled", records.count());
         emptyPolls = records.isEmpty() ? emptyPolls + 1 : 0;
 
         for (ConsumerRecord<Bytes, Bytes> msg : records) {
@@ -55,7 +58,7 @@ public class ForwardRecordEmitter
         }
       }
       sendFinishStatsAndCompleteSink(sink);
-      log.info("Polling finished");
+      log.debug("Polling finished");
     } catch (Exception e) {
       log.error("Error occurred while consuming records", e);
       sink.error(e);

+ 5 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.emitter;
 import com.provectus.kafka.ui.model.ConsumerPosition;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
+import com.provectus.kafka.ui.util.PollingThrottler;
 import java.util.HashMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
@@ -20,16 +21,17 @@ public class TailingEmitter extends AbstractEmitter
 
   public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
                         ConsumerPosition consumerPosition,
-                        ConsumerRecordDeserializer recordDeserializer) {
-    super(recordDeserializer);
+                        ConsumerRecordDeserializer recordDeserializer,
+                        PollingThrottler throttler) {
+    super(recordDeserializer, throttler);
     this.consumerSupplier = consumerSupplier;
     this.consumerPosition = consumerPosition;
   }
 
   @Override
   public void accept(FluxSink<TopicMessageEventDTO> sink) {
+    log.debug("Starting tailing polling for {}", consumerPosition);
     try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
-      log.debug("Starting topic tailing");
       assignAndSeek(consumer);
       while (!sink.isCancelled()) {
         sendPhase(sink, "Polling");

+ 15 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -36,11 +36,13 @@ import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
 import com.provectus.kafka.ui.service.masking.DataMasking;
 import com.provectus.kafka.ui.service.metrics.RawMetric;
+import com.provectus.kafka.ui.util.PollingThrottler;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.mapstruct.Mapper;
@@ -56,6 +58,7 @@ public interface ClusterMapper {
   @Mapping(target = "schemaRegistry", source = ".", qualifiedByName = "setSchemaRegistry")
   @Mapping(target = "ksqldbServer", source = ".", qualifiedByName = "setKsqldbServer")
   @Mapping(target = "metricsConfig", source = "metrics")
+  @Mapping(target = "throttler", source = ".", qualifiedByName = "createClusterThrottler")
   KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
 
   ClusterStatsDTO toClusterStats(InternalClusterState clusterState);
@@ -146,9 +149,21 @@ public interface ClusterMapper {
       internalKsqlServerBuilder.password(clusterProperties.getKsqldbServerAuth().getPassword());
     }
 
+    if (clusterProperties.getKsqldbServerSsl() != null) {
+      internalKsqlServerBuilder.keystoreLocation(clusterProperties.getKsqldbServerSsl().getKeystoreLocation());
+      internalKsqlServerBuilder.keystorePassword(clusterProperties.getKsqldbServerSsl().getKeystorePassword());
+      internalKsqlServerBuilder.truststoreLocation(clusterProperties.getKsqldbServerSsl().getTruststoreLocation());
+      internalKsqlServerBuilder.truststorePassword(clusterProperties.getKsqldbServerSsl().getTruststorePassword());
+    }
+
     return internalKsqlServerBuilder.build();
   }
 
+  @Named("createClusterThrottler")
+  default Supplier<PollingThrottler> createClusterThrottler(ClustersProperties.Cluster cluster) {
+    return PollingThrottler.throttlerSupplier(cluster);
+  }
+
   TopicDetailsDTO toTopicDetails(InternalTopic topic);
 
   @Mapping(target = "isReadOnly", source = "readOnly")

+ 6 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalKsqlServer.java

@@ -5,10 +5,15 @@ import lombok.Data;
 import lombok.ToString;
 
 @Data
-@ToString(exclude = "password")
+@ToString(exclude = {"password", "keystorePassword", "truststorePassword"})
 @Builder(toBuilder = true)
 public class InternalKsqlServer {
   private final String url;
   private final String username;
   private final String password;
+
+  private final String keystoreLocation;
+  private final String truststoreLocation;
+  private final String keystorePassword;
+  private final String truststorePassword;
 }

+ 3 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -1,8 +1,10 @@
 package com.provectus.kafka.ui.model;
 
 import com.provectus.kafka.ui.service.masking.DataMasking;
+import com.provectus.kafka.ui.util.PollingThrottler;
 import java.util.List;
 import java.util.Properties;
+import java.util.function.Supplier;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -23,4 +25,5 @@ public class KafkaCluster {
   private final boolean disableLogDirsCollection;
   private final MetricsConfig metricsConfig;
   private final DataMasking masking;
+  private final Supplier<PollingThrottler> throttler;
 }

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java

@@ -37,7 +37,7 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable {
       properties
           .put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
       properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
-      properties.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "kafka-ui-app");
+      properties.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "kafka-ui-admin-client-" + System.currentTimeMillis());
       return AdminClient.create(properties);
     })
         .flatMap(ReactiveAdminClient::create)

+ 1 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java

@@ -11,7 +11,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.UUID;
 import java.util.function.ToIntFunction;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -197,7 +196,7 @@ public class ConsumerGroupService {
                                                     Map<String, Object> properties) {
     Properties props = new Properties();
     props.putAll(cluster.getProperties());
-    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID());
+    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + System.currentTimeMillis());
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);

+ 5 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/DeserializationService.java

@@ -20,6 +20,11 @@ import javax.validation.ValidationException;
 import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
 
+/**
+ * Class responsible for managing serdes for kafka clusters.
+ * NOTE: Since the Serde interface is designed to be blocking, DeserializationService
+ * (and all Serde-related code) must be invoked on a dedicated thread pool (boundedElastic).
+ */
 @Component
 public class DeserializationService implements Closeable {
 

+ 27 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.service;
 
+import com.google.common.util.concurrent.RateLimiter;
 import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
 import com.provectus.kafka.ui.emitter.MessageFilterStats;
@@ -46,6 +47,10 @@ import reactor.core.scheduler.Schedulers;
 @RequiredArgsConstructor
 @Slf4j
 public class MessagesService {
+
+  // limiting the UI message rate to 20/sec in tailing mode
+  public static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
+
   private final AdminClientService adminClientService;
   private final DeserializationService deserializationService;
   private final ConsumerGroupService consumerGroupService;
@@ -83,6 +88,7 @@ public class MessagesService {
   public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic,
                                           CreateTopicMessageDTO msg) {
     return withExistingTopic(cluster, topic)
+        .publishOn(Schedulers.boundedElastic())
         .flatMap(desc -> sendMessageImpl(cluster, desc, msg));
   }
 
@@ -138,6 +144,7 @@ public class MessagesService {
                                                  @Nullable String valueSerde) {
     return withExistingTopic(cluster, topic)
         .flux()
+        .publishOn(Schedulers.boundedElastic())
         .flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
             filterQueryType, limit, seekDirection, keySerde, valueSerde));
   }
@@ -159,20 +166,23 @@ public class MessagesService {
       emitter = new ForwardRecordEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
-          recordDeserializer
+          recordDeserializer,
+          cluster.getThrottler().get()
       );
     } else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
       emitter = new BackwardRecordEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
           limit,
-          recordDeserializer
+          recordDeserializer,
+          cluster.getThrottler().get()
       );
     } else {
       emitter = new TailingEmitter(
           () -> consumerGroupService.createConsumer(cluster),
           consumerPosition,
-          recordDeserializer
+          recordDeserializer,
+          cluster.getThrottler().get()
       );
     }
     MessageFilterStats filterStats = new MessageFilterStats();
@@ -181,8 +191,7 @@ public class MessagesService {
         .filter(getMsgFilter(query, filterQueryType, filterStats))
         .map(getDataMasker(cluster, topic))
         .takeWhile(createTakeWhilePredicate(seekDirection, limit))
-        .subscribeOn(Schedulers.boundedElastic())
-        .share();
+        .map(throttleUiPublish(seekDirection));
   }
 
   private Predicate<TopicMessageEventDTO> createTakeWhilePredicate(
@@ -228,4 +237,17 @@ public class MessagesService {
     };
   }
 
+  private <T> UnaryOperator<T> throttleUiPublish(SeekDirectionDTO seekDirection) {
+    if (seekDirection == SeekDirectionDTO.TAILING) {
+      RateLimiter rateLimiter = RateLimiter.create(TAILING_UI_MESSAGE_THROTTLE_RATE);
+      return m -> {
+        rateLimiter.acquire(1);
+        return m;
+      };
+    }
+    // there is no need to throttle the UI production rate for non-tailing modes, since
+    // the max number of produced messages is already bounded by the page size
+    return UnaryOperator.identity();
+  }
+
 }
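
`throttleUiPublish` relies on Guava's `RateLimiter`, whose `acquire` blocks the calling thread until a permit is available. A small self-contained demo of those semantics at the tailing rate used above:

```java
import com.google.common.util.concurrent.RateLimiter;

public class TailingThrottleDemo {
  public static void main(String[] args) {
    // At 20 permits/sec (TAILING_UI_MESSAGE_THROTTLE_RATE), 40 acquire(1)
    // calls take roughly two seconds in total.
    RateLimiter limiter = RateLimiter.create(20.0); // permits per second
    long start = System.nanoTime();
    for (int i = 0; i < 40; i++) {
      limiter.acquire(1); // blocks until the next permit is available
    }
    System.out.printf("elapsed: %.1fs%n", (System.nanoTime() - start) / 1e9);
  }
}
```

Because `acquire` blocks, this operator must run on a scheduler that tolerates blocking, which is one more reason the message-loading flux is published on boundedElastic above.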

+ 37 - 33
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

@@ -25,9 +25,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -260,37 +258,32 @@ public class ReactiveAdminClient implements Closeable {
    * such topics in resulting map.
    * <p/>
    * This method converts input map into Mono[Map] ignoring keys for which KafkaFutures
-   * finished with <code>clazz</code> exception.
+   * finished with <code>clazz</code> exception, as well as keys whose Monos completed empty.
    */
   static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values,
-                                                           Class<? extends KafkaException> clazz) {
+                                                          Class<? extends KafkaException> clazz) {
     if (values.isEmpty()) {
       return Mono.just(Map.of());
     }
 
-    List<Mono<Tuple2<K, V>>> monos = values.entrySet().stream()
-        .map(e -> toMono(e.getValue()).map(r -> Tuples.of(e.getKey(), r)))
-        .collect(toList());
-
-    return Mono.create(sink -> {
-      var finishedCnt = new AtomicInteger();
-      var results = new ConcurrentHashMap<K, V>();
-      monos.forEach(mono -> mono.subscribe(
-          r -> {
-            results.put(r.getT1(), r.getT2());
-            if (finishedCnt.incrementAndGet() == monos.size()) {
-              sink.success(results);
-            }
-          },
-          th -> {
-            if (!th.getClass().isAssignableFrom(clazz)) {
-              sink.error(th);
-            } else if (finishedCnt.incrementAndGet() == monos.size()) {
-              sink.success(results);
-            }
-          }
-      ));
-    });
+    List<Mono<Tuple2<K, Optional<V>>>> monos = values.entrySet().stream()
+        .map(e ->
+            toMono(e.getValue())
+                .map(r -> Tuples.of(e.getKey(), Optional.of(r)))
+                .defaultIfEmpty(Tuples.of(e.getKey(), Optional.empty())) //tracking empty Monos
+                .onErrorResume(
+                    // tracking Monos with suppressible error
+                    th -> th.getClass().isAssignableFrom(clazz),
+                    th -> Mono.just(Tuples.of(e.getKey(), Optional.empty()))))
+        .toList();
+
+    return Mono.zip(
+        monos,
+        resultsArr -> Stream.of(resultsArr)
+            .map(obj -> (Tuple2<K, Optional<V>>) obj)
+            .filter(t -> t.getT2().isPresent()) //skipping empty & suppressible-errors
+            .collect(Collectors.toMap(Tuple2::getT1, t -> t.getT2().get()))
+    );
   }
 
   public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
@@ -305,6 +298,10 @@ public class ReactiveAdminClient implements Closeable {
   }
 
   public Mono<ClusterDescription> describeCluster() {
+    return describeClusterImpl(client);
+  }
+
+  private static Mono<ClusterDescription> describeClusterImpl(AdminClient client) {
     var r = client.describeCluster();
     var all = KafkaFuture.allOf(r.nodes(), r.clusterId(), r.controller(), r.authorizedOperations());
     return Mono.create(sink -> all.whenComplete((res, ex) -> {
@@ -328,15 +325,20 @@ public class ReactiveAdminClient implements Closeable {
   }
 
   private static Mono<String> getClusterVersion(AdminClient client) {
-    return toMono(client.describeCluster().controller())
-        .flatMap(controller -> loadBrokersConfig(client, List.of(controller.id())))
-        .map(configs -> configs.values().stream()
+    return describeClusterImpl(client)
+        // choosing the node from which we will get configs (starting with the controller)
+        .flatMap(descr -> descr.controller != null
+            ? Mono.just(descr.controller)
+            : Mono.justOrEmpty(descr.nodes.stream().findFirst())
+        )
+        .flatMap(node -> loadBrokersConfig(client, List.of(node.id())))
+        .flatMap(configs -> configs.values().stream()
             .flatMap(Collection::stream)
             .filter(entry -> entry.name().contains("inter.broker.protocol.version"))
             .findFirst()
-            .map(ConfigEntry::value)
-            .orElse("1.0-UNKNOWN")
-        );
+            .map(configEntry -> Mono.just(configEntry.value()))
+            .orElse(Mono.empty()))
+        .switchIfEmpty(Mono.just("1.0-UNKNOWN"));
   }
 
   public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
@@ -419,6 +421,7 @@ public class ReactiveAdminClient implements Closeable {
 
   /**
    * List offset for the topic's partitions and OffsetSpec.
+   *
    * @param failOnUnknownLeader true - throw exception in case of no-leader partitions,
    *                            false - skip partitions with no leader
    */
@@ -432,6 +435,7 @@ public class ReactiveAdminClient implements Closeable {
 
   /**
    * List offset for the specified partitions and OffsetSpec.
+   *
    * @param failOnUnknownLeader true - throw exception in case of no-leader partitions,
    *                            false - skip partitions with no leader
    */
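
The `toMonoWithExceptionFilter` rework above replaces manual subscription counting with `Mono.zip` over per-key Monos that are guaranteed to emit a `Tuple2<K, Optional<V>>`: empty results and suppressible errors collapse to `Optional.empty()` and are filtered out of the merged map. That guarantee matters, since `Mono.zip` completes empty if any source does. A standalone sketch of the pattern, mirroring the unit test added below:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class ZipFilterDemo {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    Map<String, Mono<String>> values = Map.of(
        "ok", Mono.just("done"),
        "empty", Mono.empty(),
        "failure", Mono.error(new IllegalStateException("suppressible")));

    // every source emits a tuple, so Mono.zip never completes empty
    List<Mono<Tuple2<String, Optional<String>>>> monos = values.entrySet().stream()
        .map(e -> e.getValue()
            .map(v -> Tuples.of(e.getKey(), Optional.of(v)))
            .defaultIfEmpty(Tuples.of(e.getKey(), Optional.<String>empty()))
            .onErrorResume(IllegalStateException.class::isInstance,
                th -> Mono.just(Tuples.of(e.getKey(), Optional.<String>empty()))))
        .toList();

    Mono.zip(monos,
            arr -> Stream.of(arr)
                .map(o -> (Tuple2<String, Optional<String>>) o)
                .filter(t -> t.getT2().isPresent()) // drop empty & suppressed-error keys
                .collect(Collectors.toMap(Tuple2::getT1, t -> t.getT2().get())))
        .subscribe(System.out::println); // prints {ok=done}
  }
}
```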

+ 0 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

@@ -51,7 +51,6 @@ import reactor.util.retry.Retry;
 public class TopicsService {
 
   private final AdminClientService adminClientService;
-  private final DeserializationService deserializationService;
   private final StatisticsCache statisticsCache;
   @Value("${topic.recreate.maxRetries:15}")
   private int recreateMaxRetries;

+ 7 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java

@@ -8,6 +8,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.TopicAnalysisDTO;
 import com.provectus.kafka.ui.service.ConsumerGroupService;
 import com.provectus.kafka.ui.service.TopicsService;
+import com.provectus.kafka.ui.util.PollingThrottler;
 import java.io.Closeable;
 import java.time.Duration;
 import java.time.Instant;
@@ -62,7 +63,7 @@ public class TopicAnalysisService {
     if (analysisTasksStore.isAnalysisInProgress(topicId)) {
       throw new TopicAnalysisException("Topic is already analyzing");
     }
-    var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs);
+    var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs, cluster.getThrottler().get());
     analysisTasksStore.registerNewTask(topicId, task);
     Schedulers.boundedElastic().schedule(task);
   }
@@ -82,13 +83,15 @@ public class TopicAnalysisService {
     private final TopicIdentity topicId;
     private final int partitionsCnt;
     private final long approxNumberOfMsgs;
+    private final PollingThrottler throttler;
 
     private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
     private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();
 
     private final KafkaConsumer<Bytes, Bytes> consumer;
 
-    AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt, long approxNumberOfMsgs) {
+    AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt,
+                 long approxNumberOfMsgs, PollingThrottler throttler) {
       this.topicId = topicId;
       this.approxNumberOfMsgs = approxNumberOfMsgs;
       this.partitionsCnt = partitionsCnt;
@@ -100,6 +103,7 @@ public class TopicAnalysisService {
               ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
           )
       );
+      this.throttler = throttler;
     }
 
     @Override
@@ -123,6 +127,7 @@ public class TopicAnalysisService {
         for (int emptyPolls = 0; !offsetsInfo.assignedPartitionsFullyPolled()
             && emptyPolls < NO_MORE_DATA_EMPTY_POLLS_COUNT;) {
           var polled = consumer.poll(Duration.ofSeconds(3));
+          throttler.throttleAfterPoll(polled);
           emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
           polled.forEach(r -> {
             totalStats.apply(r);

+ 19 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java

@@ -11,6 +11,7 @@ import com.fasterxml.jackson.databind.node.TextNode;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.ksql.response.ResponseParser;
+import com.provectus.kafka.ui.util.SecuredWebClient;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -79,11 +80,24 @@ public class KsqlApiClient {
                       MimeTypeUtils.APPLICATION_OCTET_STREAM));
         })
         .build();
-    return WebClient.builder()
-        .codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes()))
-        .defaultHeaders(httpHeaders -> setBasicAuthIfEnabled(httpHeaders, cluster))
-        .exchangeStrategies(exchangeStrategies)
-        .build();
+
+    try {
+      WebClient.Builder securedWebClient = SecuredWebClient.configure(
+          cluster.getKsqldbServer().getKeystoreLocation(),
+          cluster.getKsqldbServer().getKeystorePassword(),
+          cluster.getKsqldbServer().getTruststoreLocation(),
+          cluster.getKsqldbServer().getTruststorePassword()
+      );
+
+      return securedWebClient
+          .codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes()))
+          .defaultHeaders(httpHeaders -> setBasicAuthIfEnabled(httpHeaders, cluster))
+          .exchangeStrategies(exchangeStrategies)
+          .build();
+    } catch (Exception e) {
+      throw new IllegalStateException(
+          "cannot create TLS configuration for ksqlDB in cluster " + cluster.getName(), e);
+    }
   }
 
   public static void setBasicAuthIfEnabled(HttpHeaders headers, KafkaCluster cluster) {

+ 29 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ConsumerRecordsUtil.java

@@ -0,0 +1,29 @@
+package com.provectus.kafka.ui.util;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.utils.Bytes;
+
+public class ConsumerRecordsUtil {
+
+  public static int calculatePolledRecSize(ConsumerRecord<Bytes, Bytes> rec) {
+    int polledBytes = 0;
+    for (Header header : rec.headers()) {
+      polledBytes +=
+          (header.key() != null ? header.key().getBytes().length : 0)
+              + (header.value() != null ? header.value().length : 0);
+    }
+    polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
+    polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
+    return polledBytes;
+  }
+
+  public static int calculatePolledSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
+    int polledBytes = 0;
+    for (ConsumerRecord<Bytes, Bytes> rec : recs) {
+      polledBytes += calculatePolledRecSize(rec);
+    }
+    return polledBytes;
+  }
+
+}

+ 54 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/PollingThrottler.java

@@ -0,0 +1,54 @@
+package com.provectus.kafka.ui.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.RateLimiter;
+import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import java.util.Optional;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.utils.Bytes;
+
+@Slf4j
+public class PollingThrottler {
+
+  public static Supplier<PollingThrottler> throttlerSupplier(ClustersProperties.Cluster cluster) {
+    long rate = cluster.getPollingThrottleRate();
+    if (rate <= 0) {
+      return PollingThrottler::noop;
+    }
+    // RateLimiter instance should be shared across all created throttlers
+    var rateLimiter = RateLimiter.create(rate);
+    return () -> new PollingThrottler(cluster.getName(), rateLimiter);
+  }
+
+  private final String clusterName;
+  private final RateLimiter rateLimiter;
+  private boolean throttled;
+
+  @VisibleForTesting
+  public PollingThrottler(String clusterName, RateLimiter rateLimiter) {
+    this.clusterName = clusterName;
+    this.rateLimiter = rateLimiter;
+  }
+
+  public static PollingThrottler noop() {
+    return new PollingThrottler("noop", RateLimiter.create(Long.MAX_VALUE));
+  }
+
+  public void throttleAfterPoll(int polledBytes) {
+    if (polledBytes > 0) {
+      double sleptSeconds = rateLimiter.acquire(polledBytes);
+      if (!throttled && sleptSeconds > 0.0) {
+        throttled = true;
+        log.debug("Polling throttling enabled for cluster {} at rate {} bytes/sec", clusterName, rateLimiter.getRate());
+      }
+    }
+  }
+
+  public void throttleAfterPoll(ConsumerRecords<Bytes, Bytes> polled) {
+    throttleAfterPoll(ConsumerRecordsUtil.calculatePolledSize(polled));
+  }
+
+}
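
A usage sketch for the supplier pattern above, assuming `ClustersProperties.Cluster` exposes its Lombok-generated setters (wiring as in `ClusterMapper#createClusterThrottler`): the supplier is created once per cluster, so every throttler it hands out shares one `RateLimiter`, and concurrent polling sessions jointly respect the cluster-wide budget.

```java
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.util.PollingThrottler;
import java.util.function.Supplier;

public class ThrottlerWiringSketch {
  public static void main(String[] args) {
    ClustersProperties.Cluster cluster = new ClustersProperties.Cluster();
    cluster.setName("local");
    cluster.setPollingThrottleRate(1_000_000); // 1 MB/sec for the whole cluster

    Supplier<PollingThrottler> perCluster = PollingThrottler.throttlerSupplier(cluster);
    PollingThrottler sessionA = perCluster.get(); // cheap: shares the cluster-wide limiter
    PollingThrottler sessionB = perCluster.get();
    sessionA.throttleAfterPoll(600_000); // draws from the shared budget
    sessionB.throttleAfterPoll(600_000); // may block until the budget refills
  }
}
```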

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java

@@ -143,4 +143,4 @@ class TailingEmitterTest extends AbstractIntegrationTest {
             .anyMatch(msg -> msg.getType() == TopicMessageEventDTO.TypeEnum.CONSUMING));
   }
 
-}
+}

+ 8 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ReactiveAdminClientTest.java

@@ -104,7 +104,14 @@ class ReactiveAdminClientTest extends AbstractIntegrationTest {
     var okFuture = new KafkaFutureImpl<String>();
     okFuture.complete("done");
 
-    Map<String, KafkaFuture<String>> arg = Map.of("failure", failedFuture, "ok", okFuture);
+    var emptyFuture = new KafkaFutureImpl<String>();
+    emptyFuture.complete(null);
+
+    Map<String, KafkaFuture<String>> arg = Map.of(
+        "failure", failedFuture,
+        "ok", okFuture,
+        "empty", emptyFuture
+    );
     StepVerifier.create(toMonoWithExceptionFilter(arg, UnknownTopicOrPartitionException.class))
         .assertNext(result -> assertThat(result).hasSize(1).containsEntry("ok", "done"))
         .verifyComplete();

+ 21 - 10
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java

@@ -16,6 +16,7 @@ import com.provectus.kafka.ui.serde.api.Serde;
 import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
+import com.provectus.kafka.ui.util.PollingThrottler;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -110,14 +111,16 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null),
         100,
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     StepVerifier.create(Flux.create(forwardEmitter))
@@ -138,14 +141,16 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(BEGINNING, TOPIC, null),
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(LATEST, TOPIC, null),
         PARTITIONS * MSGS_PER_PARTITION,
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     List<String> expectedValues = SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList());
@@ -165,14 +170,16 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
         PARTITIONS * MSGS_PER_PARTITION,
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     var expectedValues = SENT_RECORDS.stream()
@@ -208,14 +215,16 @@ class RecordEmitterTest extends AbstractIntegrationTest {
     var forwardEmitter = new ForwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     var backwardEmitter = new BackwardRecordEmitter(
         this::createConsumer,
         new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps),
         PARTITIONS * MSGS_PER_PARTITION,
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     var expectedValues = SENT_RECORDS.stream()
@@ -245,7 +254,8 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, targetOffsets),
         numMessages,
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     var expectedValues = SENT_RECORDS.stream()
@@ -270,7 +280,8 @@ class RecordEmitterTest extends AbstractIntegrationTest {
         this::createConsumer,
         new ConsumerPosition(OFFSET, TOPIC, offsets),
         100,
-        RECORD_DESERIALIZER
+        RECORD_DESERIALIZER,
+        PollingThrottler.noop()
     );
 
     expectEmitter(backwardEmitter,

+ 39 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/PollingThrottlerTest.java

@@ -0,0 +1,39 @@
+package com.provectus.kafka.ui.util;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.data.Percentage.withPercentage;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Test;
+
+class PollingThrottlerTest {
+
+  @Test
+  void testTrafficThrottled() {
+    var throttler = new PollingThrottler("test", RateLimiter.create(1000));
+    long polledBytes = 0;
+    var stopwatch = Stopwatch.createStarted();
+    while (stopwatch.elapsed(TimeUnit.SECONDS) < 1) {
+      int newPolled = ThreadLocalRandom.current().nextInt(10);
+      throttler.throttleAfterPoll(newPolled);
+      polledBytes += newPolled;
+    }
+    assertThat(polledBytes).isCloseTo(1000, withPercentage(3.0));
+  }
+
+  @Test
+  void noopThrottlerDoNotLimitPolling() {
+    var noopThrottler = PollingThrottler.noop();
+    var stopwatch = Stopwatch.createStarted();
+    // emulating that we polled 1GB
+    for (int i = 0; i < 1024; i++) {
+      noopThrottler.throttleAfterPoll(1024 * 1024);
+    }
+    // checking that we are able to "poll" 1GB in less than a second
+    assertThat(stopwatch.elapsed().getSeconds()).isLessThan(1);
+  }
+
+}

+ 1 - 1
kafka-ui-react-app/src/components/Connect/New/New.tsx

@@ -42,7 +42,7 @@ const New: React.FC = () => {
   const mutation = useCreateConnector(clusterName);
 
   const methods = useForm<FormValues>({
-    mode: 'onTouched',
+    mode: 'all',
     resolver: yupResolver(validationSchema),
     defaultValues: {
       connectName: get(connects, '0.name', ''),

+ 15 - 1
kafka-ui-react-app/src/components/KsqlDb/Query/QueryForm/QueryForm.tsx

@@ -1,4 +1,4 @@
-import React, { useCallback } from 'react';
+import React, { useCallback, useRef } from 'react';
 import { FormError } from 'components/common/Input/Input.styled';
 import { ErrorMessage } from '@hookform/error-message';
 import { useForm, Controller, useFieldArray } from 'react-hook-form';
@@ -8,6 +8,7 @@ import CloseIcon from 'components/common/Icons/CloseIcon';
 import { yupResolver } from '@hookform/resolvers/yup';
 import yup from 'lib/yupExtended';
 import PlusIcon from 'components/common/Icons/PlusIcon';
+import ReactAce from 'react-ace/lib/ace';
 
 import * as S from './QueryForm.styled';
 
@@ -75,6 +76,17 @@ const QueryForm: React.FC<Props> = ({
     }
   }, []);
 
+  const inputRef = useRef<ReactAce>(null);
+
+  const handleFocus = () => {
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    const textInput = inputRef?.current?.editor?.textInput as any;
+
+    if (textInput) {
+      textInput.focus();
+    }
+  };
+
   return (
     <S.QueryWrapper>
       <form onSubmit={handleSubmit(submitHandler)}>
@@ -111,6 +123,7 @@ const QueryForm: React.FC<Props> = ({
                     },
                   ]}
                   readOnly={fetching}
+                  ref={inputRef}
                 />
               )}
             />
@@ -188,6 +201,7 @@ const QueryForm: React.FC<Props> = ({
             buttonSize="M"
             type="submit"
             disabled={fetching}
+            onClick={handleFocus}
           >
             Execute
           </Button>

+ 1 - 1
kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/Filters.tsx

@@ -428,7 +428,7 @@ const Filters: React.FC<FiltersProps> = ({
                   onChange={(date: Date | null) => setTimestamp(date)}
                   showTimeInput
                   timeInputLabel="Time:"
-                  dateFormat="MMMM d, yyyy HH:mm"
+                  dateFormat="MMM d, yyyy HH:mm"
                   placeholderText="Select timestamp"
                   disabled={isTailing}
                 />

+ 1 - 1
pom.xml

@@ -25,7 +25,7 @@
         <apache.commons.version>2.11.1</apache.commons.version>
         <assertj.version>3.19.0</assertj.version>
         <avro.version>1.11.1</avro.version>
-        <byte-buddy.version>1.12.18</byte-buddy.version>
+        <byte-buddy.version>1.12.19</byte-buddy.version>
         <confluent.version>7.3.0</confluent.version>
         <datasketches-java.version>3.1.0</datasketches-java.version>
         <groovy.version>3.0.13</groovy.version>