package com.provectus.kafka.ui.client; import com.provectus.kafka.ui.connect.ApiClient; import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi; import com.provectus.kafka.ui.connect.model.Connector; import com.provectus.kafka.ui.connect.model.NewConnector; import com.provectus.kafka.ui.exception.RebalanceInProgressException; import com.provectus.kafka.ui.exception.ValidationException; import java.util.List; import java.util.Map; import lombok.extern.log4j.Log4j2; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; import org.springframework.util.MultiValueMap; import org.springframework.web.client.RestClientException; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.retry.Retry; @Log4j2 public class RetryingKafkaConnectClient extends KafkaConnectClientApi { private static final int MAX_RETRIES = 5; public RetryingKafkaConnectClient(String basePath) { super(new RetryingApiClient().setBasePath(basePath)); } private static Mono withRetryOnConflict(Mono publisher) { return publisher.retryWhen( Retry.max(MAX_RETRIES).filter(e -> e instanceof WebClientResponseException.Conflict)) .onErrorResume(WebClientResponseException.Conflict.class, e -> Mono.error(new RebalanceInProgressException())) .doOnError(log::error); } private static Flux withRetryOnConflict(Flux publisher) { return publisher.retryWhen( Retry.max(MAX_RETRIES).filter(e -> e instanceof WebClientResponseException.Conflict)) .onErrorResume(WebClientResponseException.Conflict.class, e -> Mono.error(new RebalanceInProgressException())) .doOnError(log::error); } private static Mono withBadRequestErrorHandling(Mono publisher) { return publisher .onErrorResume(WebClientResponseException.BadRequest.class, e -> Mono.error(new ValidationException("Invalid configuration"))) .onErrorResume(WebClientResponseException.InternalServerError.class, e -> Mono.error(new ValidationException("Invalid configuration"))); } @Override public Mono createConnector(NewConnector newConnector) throws RestClientException { return withBadRequestErrorHandling( super.createConnector(newConnector) ); } @Override public Mono setConnectorConfig(String connectorName, Map requestBody) throws RestClientException { return withBadRequestErrorHandling( super.setConnectorConfig(connectorName, requestBody) ); } private static class RetryingApiClient extends ApiClient { @Override public Mono invokeAPI(String path, HttpMethod method, Map pathParams, MultiValueMap queryParams, Object body, HttpHeaders headerParams, MultiValueMap cookieParams, MultiValueMap formParams, List accept, MediaType contentType, String[] authNames, ParameterizedTypeReference returnType) throws RestClientException { return withRetryOnConflict( super.invokeAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams, formParams, accept, contentType, authNames, returnType) ); } @Override public Flux invokeFluxAPI(String path, HttpMethod method, Map pathParams, MultiValueMap queryParams, Object body, HttpHeaders headerParams, MultiValueMap cookieParams, MultiValueMap formParams, List accept, MediaType contentType, String[] authNames, ParameterizedTypeReference returnType) throws RestClientException { return withRetryOnConflict( super.invokeFluxAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams, formParams, accept, contentType, authNames, returnType) ); } } }