ReactiveFailover.java 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package com.provectus.kafka.ui.util;
  2. import com.google.common.base.Preconditions;
  3. import java.io.IOException;
  4. import java.time.Duration;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. import java.util.concurrent.atomic.AtomicLong;
  8. import java.util.function.Function;
  9. import java.util.function.Predicate;
  10. import java.util.function.Supplier;
  11. import reactor.core.publisher.Flux;
  12. import reactor.core.publisher.Mono;
  13. public class ReactiveFailover<T> {
  14. public static final Duration DEFAULT_RETRY_GRACE_PERIOD_MS = Duration.ofSeconds(5);
  15. public static final Predicate<Throwable> CONNECTION_REFUSED_EXCEPTION_FILTER =
  16. error -> error.getCause() instanceof IOException && error.getCause().getMessage().contains("Connection refused");
  17. private final List<PublisherHolder<T>> publishers;
  18. private int currentIndex = 0;
  19. private final Predicate<Throwable> failoverExceptionsPredicate;
  20. private final String noAvailablePublishersMsg;
  21. // creates single-publisher failover (basically for tests usage)
  22. public static <T> ReactiveFailover<T> createNoop(T publisher) {
  23. return create(
  24. List.of(publisher),
  25. th -> true,
  26. "publisher is not available",
  27. DEFAULT_RETRY_GRACE_PERIOD_MS
  28. );
  29. }
  30. public static <T> ReactiveFailover<T> create(List<T> publishers,
  31. Predicate<Throwable> failoverExeptionsPredicate,
  32. String noAvailablePublishersMsg,
  33. Duration retryGracePeriodMs) {
  34. return new ReactiveFailover<>(
  35. publishers.stream().map(p -> new PublisherHolder<>(() -> p, retryGracePeriodMs.toMillis())).toList(),
  36. failoverExeptionsPredicate,
  37. noAvailablePublishersMsg
  38. );
  39. }
  40. public static <T, A> ReactiveFailover<T> create(List<A> args,
  41. Function<A, T> factory,
  42. Predicate<Throwable> failoverExeptionsPredicate,
  43. String noAvailablePublishersMsg,
  44. Duration retryGracePeriodMs) {
  45. return new ReactiveFailover<>(
  46. args.stream().map(arg ->
  47. new PublisherHolder<>(() -> factory.apply(arg), retryGracePeriodMs.toMillis())).toList(),
  48. failoverExeptionsPredicate,
  49. noAvailablePublishersMsg
  50. );
  51. }
  52. private ReactiveFailover(List<PublisherHolder<T>> publishers,
  53. Predicate<Throwable> failoverExceptionsPredicate,
  54. String noAvailablePublishersMsg) {
  55. Preconditions.checkArgument(!publishers.isEmpty());
  56. this.publishers = publishers;
  57. this.failoverExceptionsPredicate = failoverExceptionsPredicate;
  58. this.noAvailablePublishersMsg = noAvailablePublishersMsg;
  59. }
  60. public <V> Mono<V> mono(Function<T, Mono<V>> f) {
  61. List<PublisherHolder<T>> candidates = getActivePublishers();
  62. if (candidates.isEmpty()) {
  63. return Mono.error(() -> new IllegalStateException(noAvailablePublishersMsg));
  64. }
  65. return mono(f, candidates);
  66. }
  67. private <V> Mono<V> mono(Function<T, Mono<V>> f, List<PublisherHolder<T>> candidates) {
  68. var publisher = candidates.get(0);
  69. return publisher.get()
  70. .flatMap(f)
  71. .onErrorResume(failoverExceptionsPredicate, th -> {
  72. publisher.markFailed();
  73. if (candidates.size() == 1) {
  74. return Mono.error(th);
  75. }
  76. var newCandidates = candidates.stream().skip(1).filter(PublisherHolder::isActive).toList();
  77. if (newCandidates.isEmpty()) {
  78. return Mono.error(th);
  79. }
  80. return mono(f, newCandidates);
  81. });
  82. }
  83. public <V> Flux<V> flux(Function<T, Flux<V>> f) {
  84. List<PublisherHolder<T>> candidates = getActivePublishers();
  85. if (candidates.isEmpty()) {
  86. return Flux.error(() -> new IllegalStateException(noAvailablePublishersMsg));
  87. }
  88. return flux(f, candidates);
  89. }
  90. private <V> Flux<V> flux(Function<T, Flux<V>> f, List<PublisherHolder<T>> candidates) {
  91. var publisher = candidates.get(0);
  92. return publisher.get()
  93. .flatMapMany(f)
  94. .onErrorResume(failoverExceptionsPredicate, th -> {
  95. publisher.markFailed();
  96. if (candidates.size() == 1) {
  97. return Flux.error(th);
  98. }
  99. var newCandidates = candidates.stream().skip(1).filter(PublisherHolder::isActive).toList();
  100. if (newCandidates.isEmpty()) {
  101. return Flux.error(th);
  102. }
  103. return flux(f, newCandidates);
  104. });
  105. }
  106. /**
  107. * Returns list of active publishers, starting with latest active.
  108. */
  109. private synchronized List<PublisherHolder<T>> getActivePublishers() {
  110. var result = new ArrayList<PublisherHolder<T>>();
  111. for (int i = 0, j = currentIndex; i < publishers.size(); i++) {
  112. var publisher = publishers.get(j);
  113. if (publisher.isActive()) {
  114. result.add(publisher);
  115. } else if (currentIndex == j) {
  116. currentIndex = ++currentIndex == publishers.size() ? 0 : currentIndex;
  117. }
  118. j = ++j == publishers.size() ? 0 : j;
  119. }
  120. return result;
  121. }
  122. static class PublisherHolder<T> {
  123. private final long retryGracePeriodMs;
  124. private final Supplier<T> supplier;
  125. private final AtomicLong lastErrorTs = new AtomicLong();
  126. private T publisherInstance;
  127. PublisherHolder(Supplier<T> supplier, long retryGracePeriodMs) {
  128. this.supplier = supplier;
  129. this.retryGracePeriodMs = retryGracePeriodMs;
  130. }
  131. synchronized Mono<T> get() {
  132. if (publisherInstance == null) {
  133. try {
  134. publisherInstance = supplier.get();
  135. } catch (Throwable th) {
  136. return Mono.error(th);
  137. }
  138. }
  139. return Mono.just(publisherInstance);
  140. }
  141. void markFailed() {
  142. lastErrorTs.set(System.currentTimeMillis());
  143. }
  144. boolean isActive() {
  145. return System.currentTimeMillis() - lastErrorTs.get() > retryGracePeriodMs;
  146. }
  147. }
  148. }