Schema Registry URL failover -- patch notes
===========================================

What the hunks below do:

  * FailoverUrlList (new class): wraps the configured Schema Registry URLs, keeps a
    "current" index and a map of index -> time of last failure. fail(url) records a
    failure and advances to the next index (wrapping around) only when `url` equals the
    entry at the current index. isFailoverAvailable() is true while at least one index
    has never failed, or some recorded failure is older than the retry grace period
    (default 5000 ms).
  * InternalSchemaRegistry: `url` becomes a FailoverUrlList. getFirstUrl() is replaced by
    getPrimaryNodeUri() (entry 0) and getUri() (current entry); markAsUnavailable(String)
    and isFailoverAvailable() forward to the list.
  * ClusterMapper: builds the FailoverUrlList from the comma-separated property.
  * SchemaRegistryAwareRecordSerDe: hands CachedSchemaRegistryClient an unmodifiable copy
    of the URLs; convertSchema() uses getPrimaryNodeUri().
  * SchemaRegistryService: buildUri() now starts from getUri(). Each request pipeline is
    wrapped with failoverAble(...): on a WebClientRequestException whose cause is an
    IOException, and while isFailoverAvailable() holds, the failing "scheme://authority"
    is marked unavailable and the same service method is invoked again through a
    Supplier. Any other error is propagated unchanged.
  * Tests: the integration test lists the same unused localhost port twice ahead of the
    real registry URL; FailoverUrlListTest covers isFailoverAvailable().

About this copy of the patch:

  The text had been flattened onto a few long lines and every non-empty <...> type
  argument list had been stripped. Line structure is restored so that each hunk matches
  the counts in its @@ header. Type arguments are restored where the patch itself
  determines them. Java indentation is approximate. The return types of
  getAllVersionsBySubject, getSchemaSubjectByVersion, the second updateSchemaCompatibility
  overload and checksSchemaCompatibility cannot be determined from the patch and are left
  as found (raw Flux / Mono).

Review notes (not applied here):

  NOTE(review): ClusterMapper's else-branch calls new FailoverUrlList(Collections.emptyList()),
    but the constructor throws IllegalArgumentException for a null or empty list. Whether
    that branch is reachable depends on code outside these hunks -- confirm.
  NOTE(review): getPrimaryNodeUri() calls url.get(0) directly; the removed getFirstUrl()
    returned null when url was null or empty.
  NOTE(review): Failover.failover() passes "scheme://authority" to markAsUnavailable(), and
    FailoverUrlList.fail() requires exact string equality with the configured entry. A
    configured URL carrying a path or trailing slash would presumably never match, leaving
    the index unchanged while isFailoverAvailable() stays true, so the retry would target
    the same URL again -- confirm.
  NOTE(review): entries in FailoverUrlList.failures are only ever overwritten, never removed.
  NOTE(review): in FailoverUrlListTest, ShouldNotHaveFailoverAvailableWhen is nested inside
    ShouldHaveFailoverAvailableWhen; the class names suggest siblings were intended.

diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
index 71ff6aa5b2..f13865d805 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
@@ -12,6 +12,7 @@ import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
 import com.provectus.kafka.ui.model.ConfigSourceDTO;
 import com.provectus.kafka.ui.model.ConfigSynonymDTO;
 import com.provectus.kafka.ui.model.ConnectDTO;
+import com.provectus.kafka.ui.model.FailoverUrlList;
 import com.provectus.kafka.ui.model.Feature;
 import com.provectus.kafka.ui.model.InternalBrokerConfig;
 import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
@@ -97,8 +98,8 @@ public interface ClusterMapper {
 
     internalSchemaRegistry.url(
         clusterProperties.getSchemaRegistry() != null
-            ? Arrays.asList(clusterProperties.getSchemaRegistry().split(","))
-            : Collections.emptyList()
+            ? new FailoverUrlList(Arrays.asList(clusterProperties.getSchemaRegistry().split(",")))
+            : new FailoverUrlList(Collections.emptyList())
     );
 
     if (clusterProperties.getSchemaRegistryAuth() != null) {
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/FailoverUrlList.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/FailoverUrlList.java
new file mode 100644
index 0000000000..1a760ba339
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/FailoverUrlList.java
@@ -0,0 +1,59 @@
+package com.provectus.kafka.ui.model;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.experimental.Delegate;
+
+public class FailoverUrlList {
+
+  public static final int DEFAULT_RETRY_GRACE_PERIOD_IN_MS = 5000;
+
+  private final Map<Integer, Instant> failures = new ConcurrentHashMap<>();
+  private final AtomicInteger index = new AtomicInteger(0);
+  @Delegate
+  private final List<String> urls;
+  private final int retryGracePeriodInMs;
+
+  public FailoverUrlList(List<String> urls) {
+    this(urls, DEFAULT_RETRY_GRACE_PERIOD_IN_MS);
+  }
+
+  public FailoverUrlList(List<String> urls, int retryGracePeriodInMs) {
+    if (urls != null && !urls.isEmpty()) {
+      this.urls = new ArrayList<>(urls);
+    } else {
+      throw new IllegalArgumentException("Expected at least one URL to be passed in constructor");
+    }
+    this.retryGracePeriodInMs = retryGracePeriodInMs;
+  }
+
+  public String current() {
+    return this.urls.get(this.index.get());
+  }
+
+  public void fail(String url) {
+    int currentIndex = this.index.get();
+    if ((this.urls.get(currentIndex)).equals(url)) {
+      this.failures.put(currentIndex, Instant.now());
+      this.index.compareAndSet(currentIndex, (currentIndex + 1) % this.urls.size());
+    }
+  }
+
+  public boolean isFailoverAvailable() {
+    var now = Instant.now();
+    return this.urls.size() > this.failures.size()
+        || this.failures
+            .values()
+            .stream()
+            .anyMatch(e -> now.isAfter(e.plusMillis(retryGracePeriodInMs)));
+  }
+
+  @Override
+  public String toString() {
+    return this.urls.toString();
+  }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java
index 5516c05a6d..1115bac105 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalSchemaRegistry.java
@@ -1,6 +1,5 @@
 package com.provectus.kafka.ui.model;
 
-import java.util.List;
 import lombok.Builder;
 import lombok.Data;
 
@@ -9,10 +8,21 @@ import lombok.Data;
 public class InternalSchemaRegistry {
   private final String username;
   private final String password;
-  private final List<String> url;
+  private final FailoverUrlList url;
 
-  public String getFirstUrl() {
-    return url != null && !url.isEmpty() ? url.iterator().next() : null;
+  public String getPrimaryNodeUri() {
+    return url.get(0);
   }
 
+  public String getUri() {
+    return url.current();
+  }
+
+  public void markAsUnavailable(String url) {
+    this.url.fail(url);
+  }
+
+  public boolean isFailoverAvailable() {
+    return this.url.isFailoverAvailable();
+  }
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java
index 4a903598c4..8f1bf5d35c 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -71,7 +72,10 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
           "You specified password but do not specified username");
     }
     return new CachedSchemaRegistryClient(
-        cluster.getSchemaRegistry().getUrl(),
+        cluster.getSchemaRegistry()
+            .getUrl()
+            .stream()
+            .collect(Collectors.toUnmodifiableList()),
         1_000,
         schemaProviders,
         configs
@@ -224,7 +228,7 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
   private String convertSchema(SchemaMetadata schema) {
 
     String jsonSchema;
-    URI basePath = new URI(cluster.getSchemaRegistry().getFirstUrl())
+    URI basePath = new URI(cluster.getSchemaRegistry().getPrimaryNodeUri())
         .resolve(Integer.toString(schema.getId()));
     final ParsedSchema schemaById = schemaRegistryClient.getSchemaById(schema.getId());
 
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java
index 8e04b1ebac..6669d9d2b2 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/SchemaRegistryService.java
@@ -21,6 +21,7 @@ import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
 import com.provectus.kafka.ui.model.schemaregistry.InternalNewSchema;
 import com.provectus.kafka.ui.model.schemaregistry.SubjectIdResponse;
+import java.io.IOException;
 import java.net.URI;
 import java.util.Collections;
 import java.util.Formatter;
@@ -28,6 +29,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -42,6 +44,7 @@ import org.springframework.util.MultiValueMap;
 import org.springframework.web.reactive.function.BodyInserters;
 import org.springframework.web.reactive.function.client.ClientResponse;
 import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientRequestException;
 import org.springframework.web.util.UriComponentsBuilder;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -79,7 +82,9 @@ public class SchemaRegistryService {
         URL_SUBJECTS)
         .retrieve()
         .bodyToMono(String[].class)
-        .doOnError(e -> log.error("Unexpected error", e));
+        .doOnError(e -> log.error("Unexpected error", e))
+        .as(m -> failoverAble(m,
+            new FailoverMono<>(cluster.getSchemaRegistry(), () -> this.getAllSubjectNames(cluster))));
   }
 
   public Flux getAllVersionsBySubject(KafkaCluster cluster, String subject) {
@@ -96,7 +101,9 @@ public class SchemaRegistryService {
         .retrieve()
         .onStatus(NOT_FOUND::equals,
             throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
-        .bodyToFlux(Integer.class);
+        .bodyToFlux(Integer.class)
+        .as(f -> failoverAble(f, new FailoverFlux<>(cluster.getSchemaRegistry(),
+            () -> this.getSubjectVersions(cluster, schemaName))));
   }
 
   public Mono getSchemaSubjectByVersion(KafkaCluster cluster, String schemaName,
@@ -114,7 +121,7 @@ public class SchemaRegistryService {
     return configuredWebClient(
         cluster,
         HttpMethod.GET,
-        URL_SUBJECT_BY_VERSION,
+        SchemaRegistryService.URL_SUBJECT_BY_VERSION,
         List.of(schemaName, version))
         .retrieve()
         .onStatus(NOT_FOUND::equals,
@@ -128,7 +135,9 @@ public class SchemaRegistryService {
           String compatibilityLevel = tuple.getT2().getCompatibility().getValue();
           schema.setCompatibilityLevel(compatibilityLevel);
           return schema;
-        });
+        })
+        .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
+            () -> this.getSchemaSubject(cluster, schemaName, version))));
   }
 
   /**
@@ -154,16 +163,18 @@ public class SchemaRegistryService {
   private Mono<Void> deleteSchemaSubject(KafkaCluster cluster, String schemaName,
                                          String version) {
     return configuredWebClient(
-        cluster,
-        HttpMethod.DELETE,
-        URL_SUBJECT_BY_VERSION,
-        List.of(schemaName, version))
-        .retrieve()
-        .onStatus(NOT_FOUND::equals,
-            throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
-        )
-        .toBodilessEntity()
-        .then();
+            cluster,
+            HttpMethod.DELETE,
+            SchemaRegistryService.URL_SUBJECT_BY_VERSION,
+            List.of(schemaName, version))
+            .retrieve()
+            .onStatus(NOT_FOUND::equals,
+                throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
+            )
+            .toBodilessEntity()
+            .then()
+            .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
+                () -> this.deleteSchemaSubject(cluster, schemaName, version))));
   }
 
   public Mono<Void> deleteSchemaSubjectEntirely(KafkaCluster cluster,
@@ -176,7 +187,9 @@ public class SchemaRegistryService {
         .retrieve()
         .onStatus(HttpStatus::isError, errorOnSchemaDeleteFailure(schemaName))
         .toBodilessEntity()
-        .then();
+        .then()
+        .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
+            () -> this.deleteSchemaSubjectEntirely(cluster, schemaName))));
   }
 
   /**
@@ -202,19 +215,20 @@ public class SchemaRegistryService {
                                                   Mono<InternalNewSchema> newSchemaSubject,
                                                   KafkaCluster cluster) {
     return configuredWebClient(
-        cluster,
-        HttpMethod.POST,
-        URL_SUBJECT_VERSIONS,
-        subject)
-        .contentType(MediaType.APPLICATION_JSON)
-        .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
-        .retrieve()
-        .onStatus(UNPROCESSABLE_ENTITY::equals,
-            r -> r.bodyToMono(ErrorResponse.class)
-                .flatMap(x -> Mono.error(isUnrecognizedFieldSchemaTypeMessage(x.getMessage())
-                    ? new SchemaTypeNotSupportedException()
-                    : new UnprocessableEntityException(x.getMessage()))))
-        .bodyToMono(SubjectIdResponse.class);
+            cluster,
+            HttpMethod.POST,
+            URL_SUBJECT_VERSIONS, subject)
+            .contentType(MediaType.APPLICATION_JSON)
+            .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
+            .retrieve()
+            .onStatus(UNPROCESSABLE_ENTITY::equals,
+                r -> r.bodyToMono(ErrorResponse.class)
+                    .flatMap(x -> Mono.error(isUnrecognizedFieldSchemaTypeMessage(x.getMessage())
+                        ? new SchemaTypeNotSupportedException()
+                        : new UnprocessableEntityException(x.getMessage()))))
+            .bodyToMono(SubjectIdResponse.class)
+            .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
+                () -> submitNewSchema(subject, newSchemaSubject, cluster))));
   }
 
   @NotNull
@@ -233,16 +247,18 @@ public class SchemaRegistryService {
                                               Mono<CompatibilityLevelDTO> compatibilityLevel) {
     String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
     return configuredWebClient(
-        cluster,
-        HttpMethod.PUT,
-        configEndpoint,
+            cluster,
+            HttpMethod.PUT,
+            configEndpoint,
         schemaName)
-        .contentType(MediaType.APPLICATION_JSON)
-        .body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevelDTO.class))
-        .retrieve()
-        .onStatus(NOT_FOUND::equals,
-            throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
-        .bodyToMono(Void.class);
+            .contentType(MediaType.APPLICATION_JSON)
+            .body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevelDTO.class))
+            .retrieve()
+            .onStatus(NOT_FOUND::equals,
+                throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
+            .bodyToMono(Void.class)
+            .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
+                () -> this.updateSchemaCompatibility(cluster, schemaName, compatibilityLevel))));
   }
 
   public Mono updateSchemaCompatibility(KafkaCluster cluster,
@@ -280,17 +296,19 @@ public class SchemaRegistryService {
   public Mono checksSchemaCompatibility(
       KafkaCluster cluster, String schemaName, Mono<NewSchemaSubjectDTO> newSchemaSubject) {
     return configuredWebClient(
-        cluster,
-        HttpMethod.POST,
-        "/compatibility/subjects/{schemaName}/versions/latest",
+            cluster,
+            HttpMethod.POST,
+            "/compatibility/subjects/{schemaName}/versions/latest",
         schemaName)
-        .contentType(MediaType.APPLICATION_JSON)
-        .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubjectDTO.class))
-        .retrieve()
-        .onStatus(NOT_FOUND::equals,
-            throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
-        .bodyToMono(InternalCompatibilityCheck.class)
-        .map(mapper::toCompatibilityCheckResponse);
+            .contentType(MediaType.APPLICATION_JSON)
+            .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubjectDTO.class))
+            .retrieve()
+            .onStatus(NOT_FOUND::equals,
+                throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
+            .bodyToMono(InternalCompatibilityCheck.class)
+            .map(mapper::toCompatibilityCheckResponse)
+            .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
+                () -> this.checksSchemaCompatibility(cluster, schemaName, newSchemaSubject))));
   }
 
   public String formatted(String str, Object... args) {
@@ -318,7 +336,8 @@ public class SchemaRegistryService {
     return errorMessage.contains(UNRECOGNIZED_FIELD_SCHEMA_TYPE);
   }
 
-  private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method, String uri) {
+  private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method,
+                                                        String uri) {
     return configuredWebClient(cluster, method, uri, Collections.emptyList(),
         new LinkedMultiValueMap<>());
   }
@@ -335,20 +354,20 @@ public class SchemaRegistryService {
   }
 
   private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster,
-                                                        HttpMethod method, String uri,
+                                                        HttpMethod method, String path,
                                                         List<String> uriVariables,
                                                         MultiValueMap<String, String> queryParams) {
     final var schemaRegistry = cluster.getSchemaRegistry();
     return webClient
         .method(method)
-        .uri(buildUri(schemaRegistry, uri, uriVariables, queryParams))
+        .uri(buildUri(schemaRegistry, path, uriVariables, queryParams))
         .headers(headers -> setBasicAuthIfEnabled(schemaRegistry, headers));
   }
 
-  private URI buildUri(InternalSchemaRegistry schemaRegistry, String uri, List<String> uriVariables,
+  private URI buildUri(InternalSchemaRegistry schemaRegistry, String path, List<String> uriVariables,
                        MultiValueMap<String, String> queryParams) {
     final var builder = UriComponentsBuilder
-        .fromHttpUrl(schemaRegistry.getFirstUrl() + uri);
+        .fromHttpUrl(schemaRegistry.getUri() + path);
     builder.queryParams(queryParams);
     return builder.buildAndExpand(uriVariables.toArray()).toUri();
   }
@@ -361,4 +380,59 @@ public class SchemaRegistryService {
       return Mono.error(new SchemaFailedToDeleteException(schemaName));
     };
   }
+
+  private <T> Mono<T> failoverAble(Mono<T> request, FailoverMono<T> failoverMethod) {
+    return request.onErrorResume(failoverMethod::failover);
+  }
+
+  private <T> Flux<T> failoverAble(Flux<T> request, FailoverFlux<T> failoverMethod) {
+    return request.onErrorResume(failoverMethod::failover);
+  }
+
+  private abstract static class Failover<E> {
+    private final InternalSchemaRegistry schemaRegistry;
+    private final Supplier<E> failover;
+
+    private Failover(InternalSchemaRegistry schemaRegistry, Supplier<E> failover) {
+      this.schemaRegistry = Objects.requireNonNull(schemaRegistry);
+      this.failover = Objects.requireNonNull(failover);
+    }
+
+    abstract E error(Throwable error);
+
+    public E failover(Throwable error) {
+      if (error instanceof WebClientRequestException
+          && error.getCause() instanceof IOException
+          && schemaRegistry.isFailoverAvailable()) {
+        var uri = ((WebClientRequestException) error).getUri();
+        schemaRegistry.markAsUnavailable(String.format("%s://%s", uri.getScheme(), uri.getAuthority()));
+        return failover.get();
+      }
+      return error(error);
+    }
+  }
+
+  private static class FailoverMono<T> extends Failover<Mono<T>> {
+
+    private FailoverMono(InternalSchemaRegistry schemaRegistry, Supplier<Mono<T>> failover) {
+      super(schemaRegistry, failover);
+    }
+
+    @Override
+    Mono<T> error(Throwable error) {
+      return Mono.error(error);
+    }
+  }
+
+  private static class FailoverFlux<T> extends Failover<Flux<T>> {
+
+    private FailoverFlux(InternalSchemaRegistry schemaRegistry, Supplier<Flux<T>> failover) {
+      super(schemaRegistry, failover);
+    }
+
+    @Override
+    Flux<T> error(Throwable error) {
+      return Flux.error(error);
+    }
+  }
 }
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java
index b98f4a0d00..51386c3380 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java
@@ -17,6 +17,7 @@ import org.springframework.context.ApplicationContextInitializer;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.ContextConfiguration;
+import org.springframework.util.SocketUtils;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.utility.DockerImageName;
@@ -59,7 +60,9 @@ public abstract class AbstractIntegrationTest {
     public void initialize(@NotNull ConfigurableApplicationContext context) {
       System.setProperty("kafka.clusters.0.name", LOCAL);
       System.setProperty("kafka.clusters.0.bootstrapServers", kafka.getBootstrapServers());
-      System.setProperty("kafka.clusters.0.schemaRegistry", schemaRegistry.getUrl());
+      // List unavailable hosts to verify failover
+      System.setProperty("kafka.clusters.0.schemaRegistry", String.format("http://localhost:%1$s,http://localhost:%1$s,%2$s",
+              SocketUtils.findAvailableTcpPort(), schemaRegistry.getUrl()));
       System.setProperty("kafka.clusters.0.kafkaConnect.0.name", "kafka-connect");
       System.setProperty("kafka.clusters.0.kafkaConnect.0.address",
           kafkaConnect.getTarget());
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/model/FailoverUrlListTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/model/FailoverUrlListTest.java
new file mode 100644
index 0000000000..5cbbaf7353
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/model/FailoverUrlListTest.java
@@ -0,0 +1,69 @@
+package com.provectus.kafka.ui.model;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+
+class FailoverUrlListTest {
+
+  public static final int RETRY_GRACE_PERIOD_IN_MS = 10;
+
+  @Nested
+  @SuppressWarnings("all")
+  class ShouldHaveFailoverAvailableWhen {
+
+    private FailoverUrlList failoverUrlList;
+
+    @BeforeEach
+    void before() {
+      failoverUrlList = new FailoverUrlList(List.of("localhost:123", "farawayhost:5678"), RETRY_GRACE_PERIOD_IN_MS);
+    }
+
+    @Test
+    void thereAreNoFailures() {
+      assertThat(failoverUrlList.isFailoverAvailable()).isTrue();
+    }
+
+    @Test
+    void withLessFailuresThenAvailableUrls() {
+      failoverUrlList.fail(failoverUrlList.current());
+
+      assertThat(failoverUrlList.isFailoverAvailable()).isTrue();
+    }
+
+    @Test
+    void withAllFailuresAndAtLeastOneAfterTheGraceTimeoutPeriod() throws InterruptedException {
+      failoverUrlList.fail(failoverUrlList.current());
+      failoverUrlList.fail(failoverUrlList.current());
+
+      Thread.sleep(RETRY_GRACE_PERIOD_IN_MS + 1);
+
+      assertThat(failoverUrlList.isFailoverAvailable()).isTrue();
+    }
+
+    @Nested
+    @SuppressWarnings("all")
+    class ShouldNotHaveFailoverAvailableWhen {
+
+      private FailoverUrlList failoverUrlList;
+
+      @BeforeEach
+      void before() {
+        failoverUrlList = new FailoverUrlList(List.of("localhost:123", "farawayhost:5678"), 1000);
+      }
+
+      @Test
+      void allFailuresWithinGracePeriod() {
+        failoverUrlList.fail(failoverUrlList.current());
+        failoverUrlList.fail(failoverUrlList.current());
+
+        assertThat(failoverUrlList.isFailoverAvailable()).isFalse();
+      }
+    }
+  }
+}
+