RetryingKafkaConnectClient.java 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package com.provectus.kafka.ui.client;
  2. import static com.provectus.kafka.ui.config.ClustersProperties.ConnectCluster;
  3. import com.provectus.kafka.ui.connect.ApiClient;
  4. import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
  5. import com.provectus.kafka.ui.connect.model.Connector;
  6. import com.provectus.kafka.ui.connect.model.NewConnector;
  7. import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException;
  8. import com.provectus.kafka.ui.exception.ValidationException;
  9. import com.provectus.kafka.ui.util.WebClientConfigurator;
  10. import java.time.Duration;
  11. import java.util.List;
  12. import java.util.Map;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.springframework.core.ParameterizedTypeReference;
  15. import org.springframework.http.HttpHeaders;
  16. import org.springframework.http.HttpMethod;
  17. import org.springframework.http.MediaType;
  18. import org.springframework.util.MultiValueMap;
  19. import org.springframework.util.unit.DataSize;
  20. import org.springframework.web.client.RestClientException;
  21. import org.springframework.web.reactive.function.client.WebClient;
  22. import org.springframework.web.reactive.function.client.WebClientResponseException;
  23. import reactor.core.publisher.Flux;
  24. import reactor.core.publisher.Mono;
  25. import reactor.util.retry.Retry;
  26. @Slf4j
  27. public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
  28. private static final int MAX_RETRIES = 5;
  29. private static final Duration RETRIES_DELAY = Duration.ofMillis(200);
  30. public RetryingKafkaConnectClient(ConnectCluster config, DataSize maxBuffSize) {
  31. super(new RetryingApiClient(config, maxBuffSize));
  32. }
  33. private static Retry conflictCodeRetry() {
  34. return Retry
  35. .fixedDelay(MAX_RETRIES, RETRIES_DELAY)
  36. .filter(e -> e instanceof WebClientResponseException.Conflict)
  37. .onRetryExhaustedThrow((spec, signal) ->
  38. new KafkaConnectConflictReponseException(
  39. (WebClientResponseException.Conflict) signal.failure()));
  40. }
  41. private static <T> Mono<T> withRetryOnConflict(Mono<T> publisher) {
  42. return publisher.retryWhen(conflictCodeRetry());
  43. }
  44. private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
  45. return publisher.retryWhen(conflictCodeRetry());
  46. }
  47. private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
  48. return publisher
  49. .onErrorResume(WebClientResponseException.BadRequest.class, e ->
  50. Mono.error(new ValidationException("Invalid configuration")))
  51. .onErrorResume(WebClientResponseException.InternalServerError.class, e ->
  52. Mono.error(new ValidationException("Invalid configuration")));
  53. }
  54. @Override
  55. public Mono<Connector> createConnector(NewConnector newConnector) throws RestClientException {
  56. return withBadRequestErrorHandling(
  57. super.createConnector(newConnector)
  58. );
  59. }
  60. @Override
  61. public Mono<Connector> setConnectorConfig(String connectorName, Map<String, Object> requestBody)
  62. throws RestClientException {
  63. return withBadRequestErrorHandling(
  64. super.setConnectorConfig(connectorName, requestBody)
  65. );
  66. }
  67. private static class RetryingApiClient extends ApiClient {
  68. public RetryingApiClient(ConnectCluster config, DataSize maxBuffSize) {
  69. super(buildWebClient(maxBuffSize, config), null, null);
  70. setBasePath(config.getAddress());
  71. setUsername(config.getUserName());
  72. setPassword(config.getPassword());
  73. }
  74. public static WebClient buildWebClient(DataSize maxBuffSize, ConnectCluster config) {
  75. return new WebClientConfigurator()
  76. .configureSsl(
  77. config.getKeystoreLocation(),
  78. config.getKeystorePassword(),
  79. config.getTruststoreLocation(),
  80. config.getTruststorePassword()
  81. )
  82. .configureBasicAuth(
  83. config.getUserName(),
  84. config.getPassword()
  85. )
  86. .configureBufferSize(maxBuffSize)
  87. .build();
  88. }
  89. @Override
  90. public <T> Mono<T> invokeAPI(String path, HttpMethod method, Map<String, Object> pathParams,
  91. MultiValueMap<String, String> queryParams, Object body,
  92. HttpHeaders headerParams,
  93. MultiValueMap<String, String> cookieParams,
  94. MultiValueMap<String, Object> formParams, List<MediaType> accept,
  95. MediaType contentType, String[] authNames,
  96. ParameterizedTypeReference<T> returnType)
  97. throws RestClientException {
  98. return withRetryOnConflict(
  99. super.invokeAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams,
  100. formParams, accept, contentType, authNames, returnType)
  101. );
  102. }
  103. @Override
  104. public <T> Flux<T> invokeFluxAPI(String path, HttpMethod method, Map<String, Object> pathParams,
  105. MultiValueMap<String, String> queryParams, Object body,
  106. HttpHeaders headerParams,
  107. MultiValueMap<String, String> cookieParams,
  108. MultiValueMap<String, Object> formParams,
  109. List<MediaType> accept, MediaType contentType,
  110. String[] authNames, ParameterizedTypeReference<T> returnType)
  111. throws RestClientException {
  112. return withRetryOnConflict(
  113. super.invokeFluxAPI(path, method, pathParams, queryParams, body, headerParams,
  114. cookieParams, formParams, accept, contentType, authNames, returnType)
  115. );
  116. }
  117. }
  118. }