Browse Source

Config wizard BE: Add remaining cluster properties to wizard API (#3523)

* Important @Value annotated properties moved to typed classes
---------

Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Co-authored-by: VladSenyuta <vlad.senyuta@gmail.com>
Ilya Kuramshin 2 năm trước cách đây
mục cha
commit
40c198f0fc

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -27,6 +27,8 @@ public class ClustersProperties {
 
 
   String internalTopicPrefix;
   String internalTopicPrefix;
 
 
+  Integer adminClientTimeout;
+
   PollingProperties polling = new PollingProperties();
   PollingProperties polling = new PollingProperties();
 
 
   @Data
   @Data

+ 1 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java

@@ -5,7 +5,6 @@ import java.util.Map;
 import lombok.AllArgsConstructor;
 import lombok.AllArgsConstructor;
 import org.openapitools.jackson.nullable.JsonNullableModule;
 import org.openapitools.jackson.nullable.JsonNullableModule;
 import org.springframework.beans.factory.ObjectProvider;
 import org.springframework.beans.factory.ObjectProvider;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.web.ServerProperties;
 import org.springframework.boot.autoconfigure.web.ServerProperties;
 import org.springframework.boot.autoconfigure.web.reactive.WebFluxProperties;
 import org.springframework.boot.autoconfigure.web.reactive.WebFluxProperties;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContext;
@@ -15,8 +14,6 @@ import org.springframework.http.server.reactive.ContextPathCompositeHandler;
 import org.springframework.http.server.reactive.HttpHandler;
 import org.springframework.http.server.reactive.HttpHandler;
 import org.springframework.jmx.export.MBeanExporter;
 import org.springframework.jmx.export.MBeanExporter;
 import org.springframework.util.StringUtils;
 import org.springframework.util.StringUtils;
-import org.springframework.util.unit.DataSize;
-import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
 import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
 
 
 @Configuration
 @Configuration
@@ -52,14 +49,7 @@ public class Config {
   }
   }
 
 
   @Bean
   @Bean
-  public WebClient webClient(
-      @Value("${webclient.max-in-memory-buffer-size:20MB}") DataSize maxBuffSize) {
-    return WebClient.builder()
-        .codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes()))
-        .build();
-  }
-
-  @Bean
+  // will be used by webflux json mapping
   public JsonNullableModule jsonNullableModule() {
   public JsonNullableModule jsonNullableModule() {
     return new JsonNullableModule();
     return new JsonNullableModule();
   }
   }

+ 33 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/WebclientProperties.java

@@ -0,0 +1,33 @@
+package com.provectus.kafka.ui.config;
+
+import com.provectus.kafka.ui.exception.ValidationException;
+import java.beans.Transient;
+import javax.annotation.PostConstruct;
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.util.unit.DataSize;
+
+@Configuration
+@ConfigurationProperties("webclient")
+@Data
+public class WebclientProperties {
+
+  String maxInMemoryBufferSize;
+
+  @PostConstruct
+  public void validate() {
+    validateAndSetDefaultBufferSize();
+  }
+
+  private void validateAndSetDefaultBufferSize() {
+    if (maxInMemoryBufferSize != null) {
+      try {
+        DataSize.parse(maxInMemoryBufferSize);
+      } catch (Exception e) {
+        throw new ValidationException("Invalid format for webclient.maxInMemoryBufferSize");
+      }
+    }
+  }
+
+}

+ 11 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java

@@ -1,33 +1,36 @@
 package com.provectus.kafka.ui.service;
 package com.provectus.kafka.ui.service;
 
 
+import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import com.provectus.kafka.ui.util.SslPropertiesUtil;
 import java.io.Closeable;
 import java.io.Closeable;
 import java.time.Instant;
 import java.time.Instant;
 import java.util.Map;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
-import lombok.RequiredArgsConstructor;
-import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Mono;
 
 
 @Service
 @Service
-@RequiredArgsConstructor
 @Slf4j
 @Slf4j
 public class AdminClientServiceImpl implements AdminClientService, Closeable {
 public class AdminClientServiceImpl implements AdminClientService, Closeable {
 
 
+  private static final int DEFAULT_CLIENT_TIMEOUT_MS = 30_000;
+
   private static final AtomicLong CLIENT_ID_SEQ = new AtomicLong();
   private static final AtomicLong CLIENT_ID_SEQ = new AtomicLong();
 
 
   private final Map<String, ReactiveAdminClient> adminClientCache = new ConcurrentHashMap<>();
   private final Map<String, ReactiveAdminClient> adminClientCache = new ConcurrentHashMap<>();
-  @Setter // used in tests
-  @Value("${kafka.admin-client-timeout:30000}")
-  private int clientTimeout;
+  private final int clientTimeout;
+
+  public AdminClientServiceImpl(ClustersProperties clustersProperties) {
+    this.clientTimeout = Optional.ofNullable(clustersProperties.getAdminClientTimeout())
+        .orElse(DEFAULT_CLIENT_TIMEOUT_MS);
+  }
 
 
   @Override
   @Override
   public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
   public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
@@ -42,7 +45,7 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable {
       SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
       SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
       properties.putAll(cluster.getProperties());
       properties.putAll(cluster.getProperties());
       properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
       properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
-      properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
+      properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
       properties.putIfAbsent(
       properties.putIfAbsent(
           AdminClientConfig.CLIENT_ID_CONFIG,
           AdminClientConfig.CLIENT_ID_CONFIG,
           "kafka-ui-admin-" + Instant.now().getEpochSecond() + "-" + CLIENT_ID_SEQ.incrementAndGet()
           "kafka-ui-admin-" + Instant.now().getEpochSecond() + "-" + CLIENT_ID_SEQ.incrementAndGet()

+ 13 - 8
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaClusterFactory.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service;
 
 
 import com.provectus.kafka.ui.client.RetryingKafkaConnectClient;
 import com.provectus.kafka.ui.client.RetryingKafkaConnectClient;
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.config.WebclientProperties;
 import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
 import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
 import com.provectus.kafka.ui.emitter.PollingSettings;
 import com.provectus.kafka.ui.emitter.PollingSettings;
 import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO;
 import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO;
@@ -22,9 +23,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Properties;
 import java.util.stream.Stream;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import javax.annotation.Nullable;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
 import org.springframework.util.unit.DataSize;
 import org.springframework.util.unit.DataSize;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClient;
@@ -34,12 +33,18 @@ import reactor.util.function.Tuple2;
 import reactor.util.function.Tuples;
 import reactor.util.function.Tuples;
 
 
 @Service
 @Service
-@RequiredArgsConstructor
 @Slf4j
 @Slf4j
 public class KafkaClusterFactory {
 public class KafkaClusterFactory {
 
 
-  @Value("${webclient.max-in-memory-buffer-size:20MB}")
-  private DataSize maxBuffSize;
+  private static final DataSize DEFAULT_WEBCLIENT_BUFFER = DataSize.parse("20MB");
+
+  private final DataSize webClientMaxBuffSize;
+
+  public KafkaClusterFactory(WebclientProperties webclientProperties) {
+    this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize())
+        .map(DataSize::parse)
+        .orElse(DEFAULT_WEBCLIENT_BUFFER);
+  }
 
 
   public KafkaCluster create(ClustersProperties properties,
   public KafkaCluster create(ClustersProperties properties,
                              ClustersProperties.Cluster clusterProperties) {
                              ClustersProperties.Cluster clusterProperties) {
@@ -140,7 +145,7 @@ public class KafkaClusterFactory {
         url -> new RetryingKafkaConnectClient(
         url -> new RetryingKafkaConnectClient(
             connectCluster.toBuilder().address(url).build(),
             connectCluster.toBuilder().address(url).build(),
             cluster.getSsl(),
             cluster.getSsl(),
-            maxBuffSize
+            webClientMaxBuffSize
         ),
         ),
         ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
         ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
         "No alive connect instances available",
         "No alive connect instances available",
@@ -158,7 +163,7 @@ public class KafkaClusterFactory {
     WebClient webClient = new WebClientConfigurator()
     WebClient webClient = new WebClientConfigurator()
         .configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl())
         .configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl())
         .configureBasicAuth(auth.getUsername(), auth.getPassword())
         .configureBasicAuth(auth.getUsername(), auth.getPassword())
-        .configureBufferSize(maxBuffSize)
+        .configureBufferSize(webClientMaxBuffSize)
         .build();
         .build();
     return ReactiveFailover.create(
     return ReactiveFailover.create(
         parseUrlList(clusterProperties.getSchemaRegistry()),
         parseUrlList(clusterProperties.getSchemaRegistry()),
@@ -181,7 +186,7 @@ public class KafkaClusterFactory {
             clusterProperties.getKsqldbServerAuth(),
             clusterProperties.getKsqldbServerAuth(),
             clusterProperties.getSsl(),
             clusterProperties.getSsl(),
             clusterProperties.getKsqldbServerSsl(),
             clusterProperties.getKsqldbServerSsl(),
-            maxBuffSize
+            webClientMaxBuffSize
         ),
         ),
         ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
         ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
         "No live ksqldb instances available",
         "No live ksqldb instances available",

+ 6 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/DynamicConfigOperations.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.util;
 
 
 
 
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.config.WebclientProperties;
 import com.provectus.kafka.ui.config.auth.OAuthProperties;
 import com.provectus.kafka.ui.config.auth.OAuthProperties;
 import com.provectus.kafka.ui.config.auth.RoleBasedAccessControlProperties;
 import com.provectus.kafka.ui.config.auth.RoleBasedAccessControlProperties;
 import com.provectus.kafka.ui.exception.FileUploadException;
 import com.provectus.kafka.ui.exception.FileUploadException;
@@ -97,6 +98,7 @@ public class DynamicConfigOperations {
                 .type(ctx.getEnvironment().getProperty("auth.type"))
                 .type(ctx.getEnvironment().getProperty("auth.type"))
                 .oauth2(getNullableBean(OAuthProperties.class))
                 .oauth2(getNullableBean(OAuthProperties.class))
                 .build())
                 .build())
+        .webclient(getNullableBean(WebclientProperties.class))
         .build();
         .build();
   }
   }
 
 
@@ -204,6 +206,7 @@ public class DynamicConfigOperations {
     private ClustersProperties kafka;
     private ClustersProperties kafka;
     private RoleBasedAccessControlProperties rbac;
     private RoleBasedAccessControlProperties rbac;
     private Auth auth;
     private Auth auth;
+    private WebclientProperties webclient;
 
 
     @Data
     @Data
     @Builder
     @Builder
@@ -222,6 +225,9 @@ public class DynamicConfigOperations {
       Optional.ofNullable(auth)
       Optional.ofNullable(auth)
           .flatMap(a -> Optional.ofNullable(a.oauth2))
           .flatMap(a -> Optional.ofNullable(a.oauth2))
           .ifPresent(OAuthProperties::validate);
           .ifPresent(OAuthProperties::validate);
+
+      Optional.ofNullable(webclient)
+          .ifPresent(WebclientProperties::validate);
     }
     }
   }
   }
 
 

+ 10 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -3467,6 +3467,12 @@ components:
                               type: array
                               type: array
                               items:
                               items:
                                 $ref: '#/components/schemas/Action'
                                 $ref: '#/components/schemas/Action'
+            webclient:
+              type: object
+              properties:
+                maxInMemoryBufferSize:
+                  type: string
+                  description: "examples: 20, 12KB, 5MB"
             kafka:
             kafka:
               type: object
               type: object
               properties:
               properties:
@@ -3479,6 +3485,10 @@ components:
                       type: integer
                       type: integer
                     noDataEmptyPolls:
                     noDataEmptyPolls:
                       type: integer
                       type: integer
+                adminClientTimeout:
+                  type: integer
+                internalTopicPrefix:
+                  type: string
                 clusters:
                 clusters:
                   type: array
                   type: array
                   items:
                   items: