Make max-in-memory configurable for webclients (#2139)

* Fix max-in-memory configuration for KsqlApiClient

* Fix tests, resolve PR review comments

* Fix max-in-memory configuration for RetryingKafkaConnectClient

* Refactor KafkaConnectClients class

Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Artem Kazlanzhy 2022-06-28 22:58:50 +03:00 committed by GitHub
parent a4046d46ef
commit 3e5093d101
8 changed files with 106 additions and 39 deletions
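
The Connect and ksqlDB web clients now read their buffer limit from a single Spring property, webclient.max-in-memory-buffer-size, defaulting to the previously hard-coded 20 MB. Deployments that fetch large connector configs or ksqlDB result sets can raise it; a minimal sketch, assuming a standard Spring Boot application.yml (the 50MB value is illustrative, not from this diff):

webclient:
  max-in-memory-buffer-size: 50MB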

com/provectus/kafka/ui/client/KafkaConnectClients.java (deleted)

@@ -1,19 +0,0 @@
-package com.provectus.kafka.ui.client;
-import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
-import com.provectus.kafka.ui.model.KafkaConnectCluster;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-public final class KafkaConnectClients {
-  private KafkaConnectClients() {
-  }
-  private static final Map<String, KafkaConnectClientApi> CACHE = new ConcurrentHashMap<>();
-  public static KafkaConnectClientApi withKafkaConnectConfig(KafkaConnectCluster config) {
-    return CACHE.computeIfAbsent(config.getAddress(), s -> new RetryingKafkaConnectClient(config));
-  }
-}

com/provectus/kafka/ui/client/KafkaConnectClientsFactory.java (new file)

@@ -0,0 +1,22 @@
+package com.provectus.kafka.ui.client;
+import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
+import com.provectus.kafka.ui.model.KafkaConnectCluster;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.springframework.util.unit.DataSize;
+@Service
+public class KafkaConnectClientsFactory {
+  @Value("${webclient.max-in-memory-buffer-size:20MB}")
+  private DataSize maxBuffSize;
+  private final Map<String, KafkaConnectClientApi> cache = new ConcurrentHashMap<>();
+  public KafkaConnectClientApi withKafkaConnectConfig(KafkaConnectCluster config) {
+    return cache.computeIfAbsent(config.getAddress(), s -> new RetryingKafkaConnectClient(config, maxBuffSize));
+  }
+}
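
Replacing the static KafkaConnectClients holder with a Spring-managed @Service is what lets the @Value property be injected; the per-address caching is unchanged. A hypothetical usage sketch (the factory and config names are illustrative, not from this diff):

// Two lookups for the same Connect address hit the ConcurrentHashMap cache
// and return the same underlying RetryingKafkaConnectClient instance.
KafkaConnectClientApi first = factory.withKafkaConnectConfig(config);
KafkaConnectClientApi second = factory.withKafkaConnectConfig(config);
assert first == second; // keyed by config.getAddress()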

com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java

@@ -1,22 +1,34 @@
package com.provectus.kafka.ui.client;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.provectus.kafka.ui.connect.ApiClient;
+import com.provectus.kafka.ui.connect.RFC3339DateFormat;
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
import com.provectus.kafka.ui.connect.model.Connector;
import com.provectus.kafka.ui.connect.model.NewConnector;
import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.KafkaConnectCluster;
+import java.text.DateFormat;
import java.time.Duration;
import java.util.List;
import java.util.Map;
+import java.util.TimeZone;
import lombok.extern.slf4j.Slf4j;
+import org.openapitools.jackson.nullable.JsonNullableModule;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.json.Jackson2JsonDecoder;
+import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.util.MultiValueMap;
+import org.springframework.util.unit.DataSize;
import org.springframework.web.client.RestClientException;
+import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -27,8 +39,8 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
  private static final int MAX_RETRIES = 5;
  private static final Duration RETRIES_DELAY = Duration.ofMillis(200);
-  public RetryingKafkaConnectClient(KafkaConnectCluster config) {
-    super(new RetryingApiClient(config));
+  public RetryingKafkaConnectClient(KafkaConnectCluster config, DataSize maxBuffSize) {
+    super(new RetryingApiClient(config, maxBuffSize));
  }
  private static Retry conflictCodeRetry() {
@@ -73,13 +85,48 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
  private static class RetryingApiClient extends ApiClient {
-    public RetryingApiClient(KafkaConnectCluster config) {
-      super();
+    private static final DateFormat dateFormat = getDefaultDateFormat();
+    private static final ObjectMapper mapper = buildObjectMapper(dateFormat);
+    public RetryingApiClient(KafkaConnectCluster config, DataSize maxBuffSize) {
+      super(buildWebClient(mapper, maxBuffSize), mapper, dateFormat);
      setBasePath(config.getAddress());
      setUsername(config.getUserName());
      setPassword(config.getPassword());
    }
+    public static DateFormat getDefaultDateFormat() {
+      DateFormat dateFormat = new RFC3339DateFormat();
+      dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+      return dateFormat;
+    }
+    public static WebClient buildWebClient(ObjectMapper mapper, DataSize maxBuffSize) {
+      ExchangeStrategies strategies = ExchangeStrategies
+          .builder()
+          .codecs(clientDefaultCodecsConfigurer -> {
+            clientDefaultCodecsConfigurer.defaultCodecs()
+                .jackson2JsonEncoder(new Jackson2JsonEncoder(mapper, MediaType.APPLICATION_JSON));
+            clientDefaultCodecsConfigurer.defaultCodecs()
+                .jackson2JsonDecoder(new Jackson2JsonDecoder(mapper, MediaType.APPLICATION_JSON));
+            clientDefaultCodecsConfigurer.defaultCodecs()
+                .maxInMemorySize((int) maxBuffSize.toBytes());
+          })
+          .build();
+      WebClient.Builder webClient = WebClient.builder().exchangeStrategies(strategies);
+      return webClient.build();
+    }
+    public static ObjectMapper buildObjectMapper(DateFormat dateFormat) {
+      ObjectMapper mapper = new ObjectMapper();
+      mapper.setDateFormat(dateFormat);
+      mapper.registerModule(new JavaTimeModule());
+      mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+      JsonNullableModule jnm = new JsonNullableModule();
+      mapper.registerModule(jnm);
+      return mapper;
+    }
    @Override
    public <T> Mono<T> invokeAPI(String path, HttpMethod method, Map<String, Object> pathParams,
                                 MultiValueMap<String, String> queryParams, Object body,
com/provectus/kafka/ui/service/KafkaConnectService.java

@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.provectus.kafka.ui.client.KafkaConnectClients;
+import com.provectus.kafka.ui.client.KafkaConnectClientsFactory;
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
import com.provectus.kafka.ui.connect.model.ConnectorStatus;
import com.provectus.kafka.ui.connect.model.ConnectorStatusConnector;
@@ -21,7 +21,6 @@ import com.provectus.kafka.ui.model.ConnectorStateDTO;
import com.provectus.kafka.ui.model.ConnectorTaskStatusDTO;
import com.provectus.kafka.ui.model.FullConnectorInfoDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
-import com.provectus.kafka.ui.model.KafkaConnectCluster;
import com.provectus.kafka.ui.model.NewConnectorDTO;
import com.provectus.kafka.ui.model.TaskDTO;
import com.provectus.kafka.ui.model.connect.InternalConnectInfo;
@@ -51,6 +50,7 @@ public class KafkaConnectService {
  private final KafkaConnectMapper kafkaConnectMapper;
  private final ObjectMapper objectMapper;
  private final KafkaConfigSanitizer kafkaConfigSanitizer;
+  private final KafkaConnectClientsFactory kafkaConnectClientsFactory;
  public Mono<Flux<ConnectDTO>> getConnects(KafkaCluster cluster) {
    return Mono.just(
@@ -328,6 +328,6 @@ public class KafkaConnectService {
        .filter(connect -> connect.getName().equals(connectName))
        .findFirst())
        .switchIfEmpty(Mono.error(ConnectNotFoundException::new))
-        .map(KafkaConnectClients::withKafkaConnectConfig);
+        .map(kafkaConnectClientsFactory::withKafkaConnectConfig);
  }
}
}

KsqlApiClient.java

@@ -21,6 +21,7 @@ import org.springframework.core.codec.DecodingException;
import org.springframework.http.MediaType;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.util.MimeTypeUtils;
+import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
@@ -57,9 +58,11 @@ public class KsqlApiClient {
  //--------------------------------------------------------------------------------------------
  private final KafkaCluster cluster;
+  private final DataSize maxBuffSize;
-  public KsqlApiClient(KafkaCluster cluster) {
+  public KsqlApiClient(KafkaCluster cluster, DataSize maxBuffSize) {
    this.cluster = cluster;
+    this.maxBuffSize = maxBuffSize;
  }
  private WebClient webClient() {
@@ -75,6 +78,7 @@ public class KsqlApiClient {
        })
        .build();
    return WebClient.builder()
+        .codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes()))
        .exchangeStrategies(exchangeStrategies)
        .build();
  }
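
The (int) cast is needed because maxInMemorySize takes an int, which also caps usable values just below 2 GB. Spring converts the property string to a DataSize automatically; a quick sketch of that conversion (values shown for the 20MB default):

DataSize size = DataSize.parse("20MB"); // Spring DataSize uses binary units
long bytes = size.toBytes();            // 20971520
int limit = (int) bytes;                // fine while the configured size stays under 2GB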

KsqlServiceV2.java

@@ -13,16 +13,23 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import lombok.Value;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
+import org.springframework.util.unit.DataSize;
import reactor.core.publisher.Flux;
@Slf4j
@Service
public class KsqlServiceV2 {
-  @Value
+  private final DataSize maxBuffSize;
+  public KsqlServiceV2(@Value("${webclient.max-in-memory-buffer-size:20MB}") DataSize maxBuffSize) {
+    this.maxBuffSize = maxBuffSize;
+  }
+  @lombok.Value
  private static class KsqlExecuteCommand {
    KafkaCluster cluster;
    String ksql;
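
(Note on the annotation shuffle above: once Spring's @Value is imported for the constructor parameter, Lombok's identically named annotation on the inner class has to be written fully qualified, hence the switch to @lombok.Value.)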
@@ -48,12 +55,12 @@ public class KsqlServiceV2 {
      throw new ValidationException("No command registered with id " + commandId);
    }
    registeredCommands.invalidate(commandId);
-    return new KsqlApiClient(cmd.cluster)
+    return new KsqlApiClient(cmd.cluster, maxBuffSize)
        .execute(cmd.ksql, cmd.streamProperties);
  }
  public Flux<KsqlTableDescriptionDTO> listTables(KafkaCluster cluster) {
-    return new KsqlApiClient(cluster)
+    return new KsqlApiClient(cluster, maxBuffSize)
        .execute("LIST TABLES;", Map.of())
        .flatMap(resp -> {
          if (!resp.getHeader().equals("Tables")) {
@@ -75,7 +82,7 @@ public class KsqlServiceV2 {
  }
  public Flux<KsqlStreamDescriptionDTO> listStreams(KafkaCluster cluster) {
-    return new KsqlApiClient(cluster)
+    return new KsqlApiClient(cluster, maxBuffSize)
        .execute("LIST STREAMS;", Map.of())
        .flatMap(resp -> {
          if (!resp.getHeader().equals("Streams")) {

KsqlApiClientTest.java

@@ -16,6 +16,7 @@ import java.util.Map;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.springframework.util.unit.DataSize;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;
import reactor.test.StepVerifier;
@@ -26,6 +27,8 @@ class KsqlApiClientTest extends AbstractIntegrationTest {
      DockerImageName.parse("confluentinc/ksqldb-server").withTag("0.24.0"))
      .withKafka(kafka);
+  private static final DataSize maxBuffSize = DataSize.ofMegabytes(20);
  @BeforeAll
  static void startContainer() {
    KSQL_DB.start();
@@ -39,7 +42,7 @@ class KsqlApiClientTest extends AbstractIntegrationTest {
  // Tutorial is here: https://ksqldb.io/quickstart.html
  @Test
  void ksqTutorialQueriesWork() {
-    var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build());
+    var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build(), maxBuffSize);
    execCommandSync(client,
        "CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE) "
            + "WITH (kafka_topic='locations', value_format='json', partitions=1);",
@@ -126,4 +129,4 @@ class KsqlApiClientTest extends AbstractIntegrationTest {
      }
    }
  }

KsqlServiceV2Test.java

@@ -13,6 +13,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.springframework.util.unit.DataSize;
import org.testcontainers.utility.DockerImageName;
class KsqlServiceV2Test extends AbstractIntegrationTest {
@@ -24,6 +25,8 @@ class KsqlServiceV2Test extends AbstractIntegrationTest {
  private static final Set<String> STREAMS_TO_DELETE = new CopyOnWriteArraySet<>();
  private static final Set<String> TABLES_TO_DELETE = new CopyOnWriteArraySet<>();
+  private static final DataSize maxBuffSize = DataSize.ofMegabytes(20);
  @BeforeAll
  static void init() {
    KSQL_DB.start();
@@ -31,7 +34,7 @@ class KsqlServiceV2Test extends AbstractIntegrationTest {
  @AfterAll
  static void cleanup() {
-    var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build());
+    var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build(), maxBuffSize);
    TABLES_TO_DELETE.forEach(t ->
        client.execute(String.format("DROP TABLE IF EXISTS %s DELETE TOPIC;", t), Map.of())
@@ -44,7 +47,7 @@ class KsqlServiceV2Test extends AbstractIntegrationTest {
    KSQL_DB.stop();
  }
-  private final KsqlServiceV2 ksqlService = new KsqlServiceV2();
+  private final KsqlServiceV2 ksqlService = new KsqlServiceV2(maxBuffSize);
  @Test
  void listStreamsReturnsAllKsqlStreams() {
@@ -52,7 +55,7 @@ class KsqlServiceV2Test extends AbstractIntegrationTest {
    var streamName = "stream_" + System.currentTimeMillis();
    STREAMS_TO_DELETE.add(streamName);
-    new KsqlApiClient(cluster)
+    new KsqlApiClient(cluster, maxBuffSize)
        .execute(
            String.format("CREATE STREAM %s ( "
                + " c1 BIGINT KEY, "
@@ -81,7 +84,7 @@ class KsqlServiceV2Test extends AbstractIntegrationTest {
    var tableName = "table_" + System.currentTimeMillis();
    TABLES_TO_DELETE.add(tableName);
-    new KsqlApiClient(cluster)
+    new KsqlApiClient(cluster, maxBuffSize)
        .execute(
            String.format("CREATE TABLE %s ( "
                + " c1 BIGINT PRIMARY KEY, "
@@ -105,4 +108,4 @@ class KsqlServiceV2Test extends AbstractIntegrationTest {
    );
  }
}