From 3e5093d101a0850a620d417027449c7410889da2 Mon Sep 17 00:00:00 2001 From: Artem Kazlanzhy <87637857+akazlanzhy@users.noreply.github.com> Date: Tue, 28 Jun 2022 22:58:50 +0300 Subject: [PATCH] Make max-in-memory configurable for webclients (#2139) * Fix max-in-memory configuration for KsqlApiClient * Fix tests, resolve mr comments * Fix max-in-memory configuration for RetryingKafkaConnectClient * Refactor KafkaConnectClients class Co-authored-by: Roman Zabaluev --- .../kafka/ui/client/KafkaConnectClients.java | 19 ------- .../ui/client/KafkaConnectClientsFactory.java | 22 ++++++++ .../ui/client/RetryingKafkaConnectClient.java | 55 +++++++++++++++++-- .../kafka/ui/service/KafkaConnectService.java | 6 +- .../kafka/ui/service/ksql/KsqlApiClient.java | 6 +- .../kafka/ui/service/ksql/KsqlServiceV2.java | 17 ++++-- .../ui/service/ksql/KsqlApiClientTest.java | 7 ++- .../ui/service/ksql/KsqlServiceV2Test.java | 13 +++-- 8 files changed, 106 insertions(+), 39 deletions(-) delete mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClients.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClientsFactory.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClients.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClients.java deleted file mode 100644 index de0c9054ae..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClients.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.provectus.kafka.ui.client; - -import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi; -import com.provectus.kafka.ui.model.KafkaConnectCluster; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public final class KafkaConnectClients { - - private KafkaConnectClients() { - - } - - private static final Map CACHE = new ConcurrentHashMap<>(); - - public static KafkaConnectClientApi withKafkaConnectConfig(KafkaConnectCluster config) { - return CACHE.computeIfAbsent(config.getAddress(), s -> new RetryingKafkaConnectClient(config)); - } -} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClientsFactory.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClientsFactory.java new file mode 100644 index 0000000000..3a5195f5cf --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClientsFactory.java @@ -0,0 +1,22 @@ +package com.provectus.kafka.ui.client; + +import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi; +import com.provectus.kafka.ui.model.KafkaConnectCluster; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.springframework.util.unit.DataSize; + +@Service +public class KafkaConnectClientsFactory { + + @Value("${webclient.max-in-memory-buffer-size:20MB}") + private DataSize maxBuffSize; + + private final Map cache = new ConcurrentHashMap<>(); + + public KafkaConnectClientApi withKafkaConnectConfig(KafkaConnectCluster config) { + return cache.computeIfAbsent(config.getAddress(), s -> new RetryingKafkaConnectClient(config, maxBuffSize)); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java index 7071661373..b1115d0eef 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java @@ -1,22 +1,34 @@ package com.provectus.kafka.ui.client; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.provectus.kafka.ui.connect.ApiClient; +import com.provectus.kafka.ui.connect.RFC3339DateFormat; import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi; import com.provectus.kafka.ui.connect.model.Connector; import com.provectus.kafka.ui.connect.model.NewConnector; import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.KafkaConnectCluster; +import java.text.DateFormat; import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.TimeZone; import lombok.extern.slf4j.Slf4j; +import org.openapitools.jackson.nullable.JsonNullableModule; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; +import org.springframework.http.codec.json.Jackson2JsonDecoder; +import org.springframework.http.codec.json.Jackson2JsonEncoder; import org.springframework.util.MultiValueMap; +import org.springframework.util.unit.DataSize; import org.springframework.web.client.RestClientException; +import org.springframework.web.reactive.function.client.ExchangeStrategies; +import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -27,8 +39,8 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi { private static final int MAX_RETRIES = 5; private static final Duration RETRIES_DELAY = Duration.ofMillis(200); - public RetryingKafkaConnectClient(KafkaConnectCluster config) { - super(new RetryingApiClient(config)); + public RetryingKafkaConnectClient(KafkaConnectCluster config, DataSize maxBuffSize) { + super(new RetryingApiClient(config, maxBuffSize)); } private static Retry conflictCodeRetry() { @@ -73,13 +85,48 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi { private static class RetryingApiClient extends ApiClient { - public RetryingApiClient(KafkaConnectCluster config) { - super(); + private static final DateFormat dateFormat = getDefaultDateFormat(); + private static final ObjectMapper mapper = buildObjectMapper(dateFormat); + + public RetryingApiClient(KafkaConnectCluster config, DataSize maxBuffSize) { + super(buildWebClient(mapper, maxBuffSize), mapper, dateFormat); setBasePath(config.getAddress()); setUsername(config.getUserName()); setPassword(config.getPassword()); } + public static DateFormat getDefaultDateFormat() { + DateFormat dateFormat = new RFC3339DateFormat(); + dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + return dateFormat; + } + + public static WebClient buildWebClient(ObjectMapper mapper, DataSize maxBuffSize) { + ExchangeStrategies strategies = ExchangeStrategies + .builder() + .codecs(clientDefaultCodecsConfigurer -> { + clientDefaultCodecsConfigurer.defaultCodecs() + .jackson2JsonEncoder(new Jackson2JsonEncoder(mapper, MediaType.APPLICATION_JSON)); + clientDefaultCodecsConfigurer.defaultCodecs() + .jackson2JsonDecoder(new Jackson2JsonDecoder(mapper, MediaType.APPLICATION_JSON)); + clientDefaultCodecsConfigurer.defaultCodecs() + .maxInMemorySize((int) maxBuffSize.toBytes()); + }) + .build(); + WebClient.Builder webClient = WebClient.builder().exchangeStrategies(strategies); + return webClient.build(); + } + + public static ObjectMapper buildObjectMapper(DateFormat dateFormat) { + ObjectMapper mapper = new ObjectMapper(); + mapper.setDateFormat(dateFormat); + mapper.registerModule(new JavaTimeModule()); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + JsonNullableModule jnm = new JsonNullableModule(); + mapper.registerModule(jnm); + return mapper; + } + @Override public Mono invokeAPI(String path, HttpMethod method, Map pathParams, MultiValueMap queryParams, Object body, diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java index 21680d3dd9..69fd7a2627 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java @@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.provectus.kafka.ui.client.KafkaConnectClients; +import com.provectus.kafka.ui.client.KafkaConnectClientsFactory; import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi; import com.provectus.kafka.ui.connect.model.ConnectorStatus; import com.provectus.kafka.ui.connect.model.ConnectorStatusConnector; @@ -21,7 +21,6 @@ import com.provectus.kafka.ui.model.ConnectorStateDTO; import com.provectus.kafka.ui.model.ConnectorTaskStatusDTO; import com.provectus.kafka.ui.model.FullConnectorInfoDTO; import com.provectus.kafka.ui.model.KafkaCluster; -import com.provectus.kafka.ui.model.KafkaConnectCluster; import com.provectus.kafka.ui.model.NewConnectorDTO; import com.provectus.kafka.ui.model.TaskDTO; import com.provectus.kafka.ui.model.connect.InternalConnectInfo; @@ -51,6 +50,7 @@ public class KafkaConnectService { private final KafkaConnectMapper kafkaConnectMapper; private final ObjectMapper objectMapper; private final KafkaConfigSanitizer kafkaConfigSanitizer; + private final KafkaConnectClientsFactory kafkaConnectClientsFactory; public Mono> getConnects(KafkaCluster cluster) { return Mono.just( @@ -328,6 +328,6 @@ public class KafkaConnectService { .filter(connect -> connect.getName().equals(connectName)) .findFirst()) .switchIfEmpty(Mono.error(ConnectNotFoundException::new)) - .map(KafkaConnectClients::withKafkaConnectConfig); + .map(kafkaConnectClientsFactory::withKafkaConnectConfig); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java index 9341dd30d3..161c284c97 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java @@ -21,6 +21,7 @@ import org.springframework.core.codec.DecodingException; import org.springframework.http.MediaType; import org.springframework.http.codec.json.Jackson2JsonDecoder; import org.springframework.util.MimeTypeUtils; +import org.springframework.util.unit.DataSize; import org.springframework.web.reactive.function.client.ExchangeStrategies; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClientResponseException; @@ -57,9 +58,11 @@ public class KsqlApiClient { //-------------------------------------------------------------------------------------------- private final KafkaCluster cluster; + private final DataSize maxBuffSize; - public KsqlApiClient(KafkaCluster cluster) { + public KsqlApiClient(KafkaCluster cluster, DataSize maxBuffSize) { this.cluster = cluster; + this.maxBuffSize = maxBuffSize; } private WebClient webClient() { @@ -75,6 +78,7 @@ public class KsqlApiClient { }) .build(); return WebClient.builder() + .codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes())) .exchangeStrategies(exchangeStrategies) .build(); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2.java index 12a2e7d0b2..63fcede361 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2.java @@ -13,16 +13,23 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import lombok.Value; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import org.springframework.util.unit.DataSize; import reactor.core.publisher.Flux; @Slf4j @Service public class KsqlServiceV2 { - @Value + private final DataSize maxBuffSize; + + public KsqlServiceV2(@Value("${webclient.max-in-memory-buffer-size:20MB}") DataSize maxBuffSize) { + this.maxBuffSize = maxBuffSize; + } + + @lombok.Value private static class KsqlExecuteCommand { KafkaCluster cluster; String ksql; @@ -48,12 +55,12 @@ public class KsqlServiceV2 { throw new ValidationException("No command registered with id " + commandId); } registeredCommands.invalidate(commandId); - return new KsqlApiClient(cmd.cluster) + return new KsqlApiClient(cmd.cluster, maxBuffSize) .execute(cmd.ksql, cmd.streamProperties); } public Flux listTables(KafkaCluster cluster) { - return new KsqlApiClient(cluster) + return new KsqlApiClient(cluster, maxBuffSize) .execute("LIST TABLES;", Map.of()) .flatMap(resp -> { if (!resp.getHeader().equals("Tables")) { @@ -75,7 +82,7 @@ public class KsqlServiceV2 { } public Flux listStreams(KafkaCluster cluster) { - return new KsqlApiClient(cluster) + return new KsqlApiClient(cluster, maxBuffSize) .execute("LIST STREAMS;", Map.of()) .flatMap(resp -> { if (!resp.getHeader().equals("Streams")) { diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java index d09e32f948..5956d73082 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java @@ -16,6 +16,7 @@ import java.util.Map; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.springframework.util.unit.DataSize; import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testcontainers.utility.DockerImageName; import reactor.test.StepVerifier; @@ -26,6 +27,8 @@ class KsqlApiClientTest extends AbstractIntegrationTest { DockerImageName.parse("confluentinc/ksqldb-server").withTag("0.24.0")) .withKafka(kafka); + private static final DataSize maxBuffSize = DataSize.ofMegabytes(20); + @BeforeAll static void startContainer() { KSQL_DB.start(); @@ -39,7 +42,7 @@ class KsqlApiClientTest extends AbstractIntegrationTest { // Tutorial is here: https://ksqldb.io/quickstart.html @Test void ksqTutorialQueriesWork() { - var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build()); + var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build(), maxBuffSize); execCommandSync(client, "CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE) " + "WITH (kafka_topic='locations', value_format='json', partitions=1);", @@ -126,4 +129,4 @@ class KsqlApiClientTest extends AbstractIntegrationTest { } -} \ No newline at end of file +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2Test.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2Test.java index 22c87c9aec..d670680ea2 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2Test.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2Test.java @@ -13,6 +13,7 @@ import java.util.concurrent.CopyOnWriteArraySet; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.springframework.util.unit.DataSize; import org.testcontainers.utility.DockerImageName; class KsqlServiceV2Test extends AbstractIntegrationTest { @@ -24,6 +25,8 @@ class KsqlServiceV2Test extends AbstractIntegrationTest { private static final Set STREAMS_TO_DELETE = new CopyOnWriteArraySet<>(); private static final Set TABLES_TO_DELETE = new CopyOnWriteArraySet<>(); + private static final DataSize maxBuffSize = DataSize.ofMegabytes(20); + @BeforeAll static void init() { KSQL_DB.start(); @@ -31,7 +34,7 @@ class KsqlServiceV2Test extends AbstractIntegrationTest { @AfterAll static void cleanup() { - var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build()); + var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build(), maxBuffSize); TABLES_TO_DELETE.forEach(t -> client.execute(String.format("DROP TABLE IF EXISTS %s DELETE TOPIC;", t), Map.of()) @@ -44,7 +47,7 @@ class KsqlServiceV2Test extends AbstractIntegrationTest { KSQL_DB.stop(); } - private final KsqlServiceV2 ksqlService = new KsqlServiceV2(); + private final KsqlServiceV2 ksqlService = new KsqlServiceV2(maxBuffSize); @Test void listStreamsReturnsAllKsqlStreams() { @@ -52,7 +55,7 @@ class KsqlServiceV2Test extends AbstractIntegrationTest { var streamName = "stream_" + System.currentTimeMillis(); STREAMS_TO_DELETE.add(streamName); - new KsqlApiClient(cluster) + new KsqlApiClient(cluster, maxBuffSize) .execute( String.format("CREATE STREAM %s ( " + " c1 BIGINT KEY, " @@ -81,7 +84,7 @@ class KsqlServiceV2Test extends AbstractIntegrationTest { var tableName = "table_" + System.currentTimeMillis(); TABLES_TO_DELETE.add(tableName); - new KsqlApiClient(cluster) + new KsqlApiClient(cluster, maxBuffSize) .execute( String.format("CREATE TABLE %s ( " + " c1 BIGINT PRIMARY KEY, " @@ -105,4 +108,4 @@ class KsqlServiceV2Test extends AbstractIntegrationTest { ); } -} \ No newline at end of file +}