RetryingKafkaConnectClient.java 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package com.provectus.kafka.ui.client;
  2. import com.provectus.kafka.ui.connect.ApiClient;
  3. import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
  4. import com.provectus.kafka.ui.connect.model.Connector;
  5. import com.provectus.kafka.ui.connect.model.NewConnector;
  6. import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException;
  7. import com.provectus.kafka.ui.exception.ValidationException;
  8. import com.provectus.kafka.ui.model.KafkaConnectCluster;
  9. import java.time.Duration;
  10. import java.util.List;
  11. import java.util.Map;
  12. import lombok.extern.slf4j.Slf4j;
  13. import org.springframework.core.ParameterizedTypeReference;
  14. import org.springframework.http.HttpHeaders;
  15. import org.springframework.http.HttpMethod;
  16. import org.springframework.http.MediaType;
  17. import org.springframework.util.MultiValueMap;
  18. import org.springframework.web.client.RestClientException;
  19. import org.springframework.web.reactive.function.client.WebClientResponseException;
  20. import reactor.core.publisher.Flux;
  21. import reactor.core.publisher.Mono;
  22. import reactor.util.retry.Retry;
  23. @Slf4j
  24. public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
  25. private static final int MAX_RETRIES = 5;
  26. private static final Duration RETRIES_DELAY = Duration.ofMillis(200);
  27. public RetryingKafkaConnectClient(KafkaConnectCluster config) {
  28. super(new RetryingApiClient(config));
  29. }
  30. private static Retry conflictCodeRetry() {
  31. return Retry
  32. .fixedDelay(MAX_RETRIES, RETRIES_DELAY)
  33. .filter(e -> e instanceof WebClientResponseException.Conflict)
  34. .onRetryExhaustedThrow((spec, signal) ->
  35. new KafkaConnectConflictReponseException(
  36. (WebClientResponseException.Conflict) signal.failure()));
  37. }
  38. private static <T> Mono<T> withRetryOnConflict(Mono<T> publisher) {
  39. return publisher.retryWhen(conflictCodeRetry());
  40. }
  41. private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
  42. return publisher.retryWhen(conflictCodeRetry());
  43. }
  44. private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
  45. return publisher
  46. .onErrorResume(WebClientResponseException.BadRequest.class, e ->
  47. Mono.error(new ValidationException("Invalid configuration")))
  48. .onErrorResume(WebClientResponseException.InternalServerError.class, e ->
  49. Mono.error(new ValidationException("Invalid configuration")));
  50. }
  51. @Override
  52. public Mono<Connector> createConnector(NewConnector newConnector) throws RestClientException {
  53. return withBadRequestErrorHandling(
  54. super.createConnector(newConnector)
  55. );
  56. }
  57. @Override
  58. public Mono<Connector> setConnectorConfig(String connectorName, Map<String, Object> requestBody)
  59. throws RestClientException {
  60. return withBadRequestErrorHandling(
  61. super.setConnectorConfig(connectorName, requestBody)
  62. );
  63. }
  64. private static class RetryingApiClient extends ApiClient {
  65. public RetryingApiClient(KafkaConnectCluster config) {
  66. super();
  67. setBasePath(config.getAddress());
  68. setUsername(config.getUserName());
  69. setPassword(config.getPassword());
  70. }
  71. @Override
  72. public <T> Mono<T> invokeAPI(String path, HttpMethod method, Map<String, Object> pathParams,
  73. MultiValueMap<String, String> queryParams, Object body,
  74. HttpHeaders headerParams,
  75. MultiValueMap<String, String> cookieParams,
  76. MultiValueMap<String, Object> formParams, List<MediaType> accept,
  77. MediaType contentType, String[] authNames,
  78. ParameterizedTypeReference<T> returnType)
  79. throws RestClientException {
  80. return withRetryOnConflict(
  81. super.invokeAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams,
  82. formParams, accept, contentType, authNames, returnType)
  83. );
  84. }
  85. @Override
  86. public <T> Flux<T> invokeFluxAPI(String path, HttpMethod method, Map<String, Object> pathParams,
  87. MultiValueMap<String, String> queryParams, Object body,
  88. HttpHeaders headerParams,
  89. MultiValueMap<String, String> cookieParams,
  90. MultiValueMap<String, Object> formParams,
  91. List<MediaType> accept, MediaType contentType,
  92. String[] authNames, ParameterizedTypeReference<T> returnType)
  93. throws RestClientException {
  94. return withRetryOnConflict(
  95. super.invokeFluxAPI(path, method, pathParams, queryParams, body, headerParams,
  96. cookieParams, formParams, accept, contentType, authNames, returnType)
  97. );
  98. }
  99. }
  100. }