RetryingKafkaConnectClient.java 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package com.provectus.kafka.ui.client;
  2. import com.provectus.kafka.ui.connect.ApiClient;
  3. import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
  4. import com.provectus.kafka.ui.connect.model.Connector;
  5. import com.provectus.kafka.ui.connect.model.NewConnector;
  6. import com.provectus.kafka.ui.exception.RebalanceInProgressException;
  7. import com.provectus.kafka.ui.exception.ValidationException;
  8. import java.util.List;
  9. import java.util.Map;
  10. import lombok.extern.log4j.Log4j2;
  11. import org.springframework.core.ParameterizedTypeReference;
  12. import org.springframework.http.HttpHeaders;
  13. import org.springframework.http.HttpMethod;
  14. import org.springframework.http.MediaType;
  15. import org.springframework.util.MultiValueMap;
  16. import org.springframework.web.client.RestClientException;
  17. import org.springframework.web.reactive.function.client.WebClientResponseException;
  18. import reactor.core.publisher.Flux;
  19. import reactor.core.publisher.Mono;
  20. import reactor.util.retry.Retry;
  21. @Log4j2
  22. public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
  23. private static final int MAX_RETRIES = 5;
  24. public RetryingKafkaConnectClient(String basePath) {
  25. super(new RetryingApiClient().setBasePath(basePath));
  26. }
  27. private static <T> Mono<T> withRetryOnConflict(Mono<T> publisher) {
  28. return publisher.retryWhen(
  29. Retry.max(MAX_RETRIES).filter(e -> e instanceof WebClientResponseException.Conflict))
  30. .onErrorResume(WebClientResponseException.Conflict.class,
  31. e -> Mono.error(new RebalanceInProgressException()))
  32. .doOnError(log::error);
  33. }
  34. private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
  35. return publisher.retryWhen(
  36. Retry.max(MAX_RETRIES).filter(e -> e instanceof WebClientResponseException.Conflict))
  37. .onErrorResume(WebClientResponseException.Conflict.class,
  38. e -> Mono.error(new RebalanceInProgressException()))
  39. .doOnError(log::error);
  40. }
  41. private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
  42. return publisher
  43. .onErrorResume(WebClientResponseException.BadRequest.class, e ->
  44. Mono.error(new ValidationException("Invalid configuration")))
  45. .onErrorResume(WebClientResponseException.InternalServerError.class, e ->
  46. Mono.error(new ValidationException("Invalid configuration")));
  47. }
  48. @Override
  49. public Mono<Connector> createConnector(NewConnector newConnector) throws RestClientException {
  50. return withBadRequestErrorHandling(
  51. super.createConnector(newConnector)
  52. );
  53. }
  54. @Override
  55. public Mono<Connector> setConnectorConfig(String connectorName, Map<String, Object> requestBody)
  56. throws RestClientException {
  57. return withBadRequestErrorHandling(
  58. super.setConnectorConfig(connectorName, requestBody)
  59. );
  60. }
  61. private static class RetryingApiClient extends ApiClient {
  62. @Override
  63. public <T> Mono<T> invokeAPI(String path, HttpMethod method, Map<String, Object> pathParams,
  64. MultiValueMap<String, String> queryParams, Object body,
  65. HttpHeaders headerParams,
  66. MultiValueMap<String, String> cookieParams,
  67. MultiValueMap<String, Object> formParams, List<MediaType> accept,
  68. MediaType contentType, String[] authNames,
  69. ParameterizedTypeReference<T> returnType)
  70. throws RestClientException {
  71. return withRetryOnConflict(
  72. super.invokeAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams,
  73. formParams, accept, contentType, authNames, returnType)
  74. );
  75. }
  76. @Override
  77. public <T> Flux<T> invokeFluxAPI(String path, HttpMethod method, Map<String, Object> pathParams,
  78. MultiValueMap<String, String> queryParams, Object body,
  79. HttpHeaders headerParams,
  80. MultiValueMap<String, String> cookieParams,
  81. MultiValueMap<String, Object> formParams,
  82. List<MediaType> accept, MediaType contentType,
  83. String[] authNames, ParameterizedTypeReference<T> returnType)
  84. throws RestClientException {
  85. return withRetryOnConflict(
  86. super.invokeFluxAPI(path, method, pathParams, queryParams, body, headerParams,
  87. cookieParams, formParams, accept, contentType, authNames, returnType)
  88. );
  89. }
  90. }
  91. }