package com.provectus.kafka.ui.util; import com.google.common.base.Preconditions; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class ReactiveFailover { public static final Duration DEFAULT_RETRY_GRACE_PERIOD_MS = Duration.ofSeconds(5); public static final Predicate CONNECTION_REFUSED_EXCEPTION_FILTER = error -> error.getCause() instanceof IOException && error.getCause().getMessage().contains("Connection refused"); private final List> publishers; private int currentIndex = 0; private final Predicate failoverExceptionsPredicate; private final String noAvailablePublishersMsg; // creates single-publisher failover (basically for tests usage) public static ReactiveFailover createNoop(T publisher) { return create( List.of(publisher), th -> true, "publisher is not available", DEFAULT_RETRY_GRACE_PERIOD_MS ); } public static ReactiveFailover create(List publishers, Predicate failoverExeptionsPredicate, String noAvailablePublishersMsg, Duration retryGracePeriodMs) { return new ReactiveFailover<>( publishers.stream().map(p -> new PublisherHolder<>(() -> p, retryGracePeriodMs.toMillis())).toList(), failoverExeptionsPredicate, noAvailablePublishersMsg ); } public static ReactiveFailover create(List args, Function factory, Predicate failoverExeptionsPredicate, String noAvailablePublishersMsg, Duration retryGracePeriodMs) { return new ReactiveFailover<>( args.stream().map(arg -> new PublisherHolder<>(() -> factory.apply(arg), retryGracePeriodMs.toMillis())).toList(), failoverExeptionsPredicate, noAvailablePublishersMsg ); } private ReactiveFailover(List> publishers, Predicate failoverExceptionsPredicate, String noAvailablePublishersMsg) { Preconditions.checkArgument(!publishers.isEmpty()); this.publishers = publishers; this.failoverExceptionsPredicate = failoverExceptionsPredicate; this.noAvailablePublishersMsg = noAvailablePublishersMsg; } public Mono mono(Function> f) { List> candidates = getActivePublishers(); if (candidates.isEmpty()) { return Mono.error(() -> new IllegalStateException(noAvailablePublishersMsg)); } return mono(f, candidates); } private Mono mono(Function> f, List> candidates) { var publisher = candidates.get(0); return publisher.get() .flatMap(f) .onErrorResume(failoverExceptionsPredicate, th -> { publisher.markFailed(); if (candidates.size() == 1) { return Mono.error(th); } var newCandidates = candidates.stream().skip(1).filter(PublisherHolder::isActive).toList(); if (newCandidates.isEmpty()) { return Mono.error(th); } return mono(f, newCandidates); }); } public Flux flux(Function> f) { List> candidates = getActivePublishers(); if (candidates.isEmpty()) { return Flux.error(() -> new IllegalStateException(noAvailablePublishersMsg)); } return flux(f, candidates); } private Flux flux(Function> f, List> candidates) { var publisher = candidates.get(0); return publisher.get() .flatMapMany(f) .onErrorResume(failoverExceptionsPredicate, th -> { publisher.markFailed(); if (candidates.size() == 1) { return Flux.error(th); } var newCandidates = candidates.stream().skip(1).filter(PublisherHolder::isActive).toList(); if (newCandidates.isEmpty()) { return Flux.error(th); } return flux(f, newCandidates); }); } /** * Returns list of active publishers, starting with latest active. */ private synchronized List> getActivePublishers() { var result = new ArrayList>(); for (int i = 0, j = currentIndex; i < publishers.size(); i++) { var publisher = publishers.get(j); if (publisher.isActive()) { result.add(publisher); } else if (currentIndex == j) { currentIndex = ++currentIndex == publishers.size() ? 0 : currentIndex; } j = ++j == publishers.size() ? 0 : j; } return result; } static class PublisherHolder { private final long retryGracePeriodMs; private final Supplier supplier; private final AtomicLong lastErrorTs = new AtomicLong(); private T publisherInstance; PublisherHolder(Supplier supplier, long retryGracePeriodMs) { this.supplier = supplier; this.retryGracePeriodMs = retryGracePeriodMs; } synchronized Mono get() { if (publisherInstance == null) { try { publisherInstance = supplier.get(); } catch (Throwable th) { return Mono.error(th); } } return Mono.just(publisherInstance); } void markFailed() { lastErrorTs.set(System.currentTimeMillis()); } boolean isActive() { return System.currentTimeMillis() - lastErrorTs.get() > retryGracePeriodMs; } } }