package com.provectus.kafka.ui.client; import static com.provectus.kafka.ui.config.ClustersProperties.ConnectCluster; import com.provectus.kafka.ui.config.ClustersProperties; import com.provectus.kafka.ui.connect.ApiClient; import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi; import com.provectus.kafka.ui.connect.model.Connector; import com.provectus.kafka.ui.connect.model.NewConnector; import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.util.WebClientConfigurator; import java.time.Duration; import java.util.List; import java.util.Map; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; import org.springframework.util.MultiValueMap; import org.springframework.util.unit.DataSize; import org.springframework.web.client.RestClientException; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; @Slf4j public class RetryingKafkaConnectClient extends KafkaConnectClientApi { private static final int MAX_RETRIES = 5; private static final Duration RETRIES_DELAY = Duration.ofMillis(200); public RetryingKafkaConnectClient(ConnectCluster config, @Nullable ClustersProperties.TruststoreConfig truststoreConfig, DataSize maxBuffSize) { super(new RetryingApiClient(config, truststoreConfig, maxBuffSize)); } private static Retry conflictCodeRetry() { return Retry .fixedDelay(MAX_RETRIES, RETRIES_DELAY) .filter(e -> e instanceof WebClientResponseException.Conflict) .onRetryExhaustedThrow((spec, signal) -> new KafkaConnectConflictReponseException( (WebClientResponseException.Conflict) signal.failure())); } private static Mono withRetryOnConflict(Mono publisher) { return publisher.retryWhen(conflictCodeRetry()); } private static Flux withRetryOnConflict(Flux publisher) { return publisher.retryWhen(conflictCodeRetry()); } private static Mono withBadRequestErrorHandling(Mono publisher) { return publisher .onErrorResume(WebClientResponseException.BadRequest.class, e -> Mono.error(new ValidationException("Invalid configuration"))) .onErrorResume(WebClientResponseException.InternalServerError.class, e -> Mono.error(new ValidationException("Invalid configuration"))); } @Override public Mono createConnector(NewConnector newConnector) throws RestClientException { return withBadRequestErrorHandling( super.createConnector(newConnector) ); } @Override public Mono setConnectorConfig(String connectorName, Map requestBody) throws RestClientException { return withBadRequestErrorHandling( super.setConnectorConfig(connectorName, requestBody) ); } private static class RetryingApiClient extends ApiClient { public RetryingApiClient(ConnectCluster config, ClustersProperties.TruststoreConfig truststoreConfig, DataSize maxBuffSize) { super(buildWebClient(maxBuffSize, config, truststoreConfig), null, null); setBasePath(config.getAddress()); setUsername(config.getUsername()); setPassword(config.getPassword()); } public static WebClient buildWebClient(DataSize maxBuffSize, ConnectCluster config, ClustersProperties.TruststoreConfig truststoreConfig) { return new WebClientConfigurator() .configureSsl( truststoreConfig, new ClustersProperties.KeystoreConfig( config.getKeystoreLocation(), config.getKeystorePassword() ) ) .configureBasicAuth( config.getUsername(), config.getPassword() ) .configureBufferSize(maxBuffSize) .build(); } @Override public Mono invokeAPI(String path, HttpMethod method, Map pathParams, MultiValueMap queryParams, Object body, HttpHeaders headerParams, MultiValueMap cookieParams, MultiValueMap formParams, List accept, MediaType contentType, String[] authNames, ParameterizedTypeReference returnType) throws RestClientException { return withRetryOnConflict( super.invokeAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams, formParams, accept, contentType, authNames, returnType) ); } @Override public Flux invokeFluxAPI(String path, HttpMethod method, Map pathParams, MultiValueMap queryParams, Object body, HttpHeaders headerParams, MultiValueMap cookieParams, MultiValueMap formParams, List accept, MediaType contentType, String[] authNames, ParameterizedTypeReference returnType) throws RestClientException { return withRetryOnConflict( super.invokeFluxAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams, formParams, accept, contentType, authNames, returnType) ); } } }