RetryingKafkaConnectClient.java 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package com.provectus.kafka.ui.client;
  2. import static com.provectus.kafka.ui.config.ClustersProperties.ConnectCluster;
  3. import com.provectus.kafka.ui.config.ClustersProperties;
  4. import com.provectus.kafka.ui.connect.ApiClient;
  5. import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
  6. import com.provectus.kafka.ui.connect.model.Connector;
  7. import com.provectus.kafka.ui.connect.model.NewConnector;
  8. import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException;
  9. import com.provectus.kafka.ui.exception.ValidationException;
  10. import com.provectus.kafka.ui.util.WebClientConfigurator;
  11. import java.time.Duration;
  12. import java.util.List;
  13. import java.util.Map;
  14. import javax.annotation.Nullable;
  15. import lombok.extern.slf4j.Slf4j;
  16. import org.springframework.core.ParameterizedTypeReference;
  17. import org.springframework.http.HttpHeaders;
  18. import org.springframework.http.HttpMethod;
  19. import org.springframework.http.MediaType;
  20. import org.springframework.util.MultiValueMap;
  21. import org.springframework.util.unit.DataSize;
  22. import org.springframework.web.client.RestClientException;
  23. import org.springframework.web.reactive.function.client.WebClient;
  24. import org.springframework.web.reactive.function.client.WebClientResponseException;
  25. import reactor.core.publisher.Flux;
  26. import reactor.core.publisher.Mono;
  27. import reactor.util.retry.Retry;
  28. @Slf4j
  29. public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
  30. private static final int MAX_RETRIES = 5;
  31. private static final Duration RETRIES_DELAY = Duration.ofMillis(200);
  32. public RetryingKafkaConnectClient(ConnectCluster config,
  33. @Nullable ClustersProperties.TruststoreConfig truststoreConfig,
  34. DataSize maxBuffSize) {
  35. super(new RetryingApiClient(config, truststoreConfig, maxBuffSize));
  36. }
  37. private static Retry conflictCodeRetry() {
  38. return Retry
  39. .fixedDelay(MAX_RETRIES, RETRIES_DELAY)
  40. .filter(e -> e instanceof WebClientResponseException.Conflict)
  41. .onRetryExhaustedThrow((spec, signal) ->
  42. new KafkaConnectConflictReponseException(
  43. (WebClientResponseException.Conflict) signal.failure()));
  44. }
  45. private static <T> Mono<T> withRetryOnConflict(Mono<T> publisher) {
  46. return publisher.retryWhen(conflictCodeRetry());
  47. }
  48. private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
  49. return publisher.retryWhen(conflictCodeRetry());
  50. }
  51. private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
  52. return publisher
  53. .onErrorResume(WebClientResponseException.BadRequest.class, e ->
  54. Mono.error(new ValidationException("Invalid configuration")))
  55. .onErrorResume(WebClientResponseException.InternalServerError.class, e ->
  56. Mono.error(new ValidationException("Invalid configuration")));
  57. }
  58. @Override
  59. public Mono<Connector> createConnector(NewConnector newConnector) throws RestClientException {
  60. return withBadRequestErrorHandling(
  61. super.createConnector(newConnector)
  62. );
  63. }
  64. @Override
  65. public Mono<Connector> setConnectorConfig(String connectorName, Map<String, Object> requestBody)
  66. throws RestClientException {
  67. return withBadRequestErrorHandling(
  68. super.setConnectorConfig(connectorName, requestBody)
  69. );
  70. }
  71. private static class RetryingApiClient extends ApiClient {
  72. public RetryingApiClient(ConnectCluster config,
  73. ClustersProperties.TruststoreConfig truststoreConfig,
  74. DataSize maxBuffSize) {
  75. super(buildWebClient(maxBuffSize, config, truststoreConfig), null, null);
  76. setBasePath(config.getAddress());
  77. setUsername(config.getUsername());
  78. setPassword(config.getPassword());
  79. }
  80. public static WebClient buildWebClient(DataSize maxBuffSize,
  81. ConnectCluster config,
  82. ClustersProperties.TruststoreConfig truststoreConfig) {
  83. return new WebClientConfigurator()
  84. .configureSsl(
  85. truststoreConfig,
  86. new ClustersProperties.KeystoreConfig(
  87. config.getKeystoreLocation(),
  88. config.getKeystorePassword()
  89. )
  90. )
  91. .configureBasicAuth(
  92. config.getUsername(),
  93. config.getPassword()
  94. )
  95. .configureBufferSize(maxBuffSize)
  96. .build();
  97. }
  98. @Override
  99. public <T> Mono<T> invokeAPI(String path, HttpMethod method, Map<String, Object> pathParams,
  100. MultiValueMap<String, String> queryParams, Object body,
  101. HttpHeaders headerParams,
  102. MultiValueMap<String, String> cookieParams,
  103. MultiValueMap<String, Object> formParams, List<MediaType> accept,
  104. MediaType contentType, String[] authNames,
  105. ParameterizedTypeReference<T> returnType)
  106. throws RestClientException {
  107. return withRetryOnConflict(
  108. super.invokeAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams,
  109. formParams, accept, contentType, authNames, returnType)
  110. );
  111. }
  112. @Override
  113. public <T> Flux<T> invokeFluxAPI(String path, HttpMethod method, Map<String, Object> pathParams,
  114. MultiValueMap<String, String> queryParams, Object body,
  115. HttpHeaders headerParams,
  116. MultiValueMap<String, String> cookieParams,
  117. MultiValueMap<String, Object> formParams,
  118. List<MediaType> accept, MediaType contentType,
  119. String[] authNames, ParameterizedTypeReference<T> returnType)
  120. throws RestClientException {
  121. return withRetryOnConflict(
  122. super.invokeFluxAPI(path, method, pathParams, queryParams, body, headerParams,
  123. cookieParams, formParams, accept, contentType, authNames, returnType)
  124. );
  125. }
  126. }
  127. }