|
@@ -18,7 +18,7 @@ import org.springframework.web.client.RestClientException;
|
|
|
import org.springframework.web.reactive.function.client.WebClientResponseException;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
-import reactor.retry.Retry;
|
|
|
+import reactor.util.retry.Retry;
|
|
|
|
|
|
@Log4j2
|
|
|
public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
|
|
@@ -30,9 +30,7 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
|
|
|
|
|
|
private static <T> Mono<T> withRetryOnConflict(Mono<T> publisher) {
|
|
|
return publisher.retryWhen(
|
|
|
- Retry.onlyIf(e -> e.exception() instanceof WebClientResponseException.Conflict)
|
|
|
- .retryMax(MAX_RETRIES)
|
|
|
- )
|
|
|
+ Retry.max(MAX_RETRIES).filter(e -> e instanceof WebClientResponseException.Conflict))
|
|
|
.onErrorResume(WebClientResponseException.Conflict.class,
|
|
|
e -> Mono.error(new RebalanceInProgressException()))
|
|
|
.doOnError(log::error);
|
|
@@ -40,9 +38,7 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
|
|
|
|
|
|
private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
|
|
|
return publisher.retryWhen(
|
|
|
- Retry.onlyIf(e -> e.exception() instanceof WebClientResponseException.Conflict)
|
|
|
- .retryMax(MAX_RETRIES)
|
|
|
- )
|
|
|
+ Retry.max(MAX_RETRIES).filter(e -> e instanceof WebClientResponseException.Conflict))
|
|
|
.onErrorResume(WebClientResponseException.Conflict.class,
|
|
|
e -> Mono.error(new RebalanceInProgressException()))
|
|
|
.doOnError(log::error);
|