@@ -21,6 +21,7 @@ import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
 import com.provectus.kafka.ui.model.schemaregistry.InternalNewSchema;
 import com.provectus.kafka.ui.model.schemaregistry.SubjectIdResponse;
+import java.io.IOException;
 import java.net.URI;
 import java.util.Collections;
 import java.util.Formatter;
@@ -28,6 +29,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -42,6 +44,7 @@ import org.springframework.util.MultiValueMap;
 import org.springframework.web.reactive.function.BodyInserters;
 import org.springframework.web.reactive.function.client.ClientResponse;
 import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientRequestException;
 import org.springframework.web.util.UriComponentsBuilder;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -79,7 +82,9 @@ public class SchemaRegistryService {
         URL_SUBJECTS)
         .retrieve()
         .bodyToMono(String[].class)
-        .doOnError(e -> log.error("Unexpected error", e));
+        .doOnError(e -> log.error("Unexpected error", e))
+        .as(m -> failoverAble(m,
+            new FailoverMono<>(cluster.getSchemaRegistry(), () -> this.getAllSubjectNames(cluster))));
   }
 
   public Flux<SchemaSubjectDTO> getAllVersionsBySubject(KafkaCluster cluster, String subject) {
@@ -96,7 +101,9 @@ public class SchemaRegistryService {
         .retrieve()
         .onStatus(NOT_FOUND::equals,
             throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
-        .bodyToFlux(Integer.class);
+        .bodyToFlux(Integer.class)
+        .as(f -> failoverAble(f, new FailoverFlux<>(cluster.getSchemaRegistry(),
+            () -> this.getSubjectVersions(cluster, schemaName))));
   }
 
   public Mono<SchemaSubjectDTO> getSchemaSubjectByVersion(KafkaCluster cluster, String schemaName,
@@ -114,7 +121,7 @@ public class SchemaRegistryService {
     return configuredWebClient(
         cluster,
         HttpMethod.GET,
-        URL_SUBJECT_BY_VERSION,
+        SchemaRegistryService.URL_SUBJECT_BY_VERSION,
         List.of(schemaName, version))
         .retrieve()
         .onStatus(NOT_FOUND::equals,
@@ -128,7 +135,9 @@ public class SchemaRegistryService {
           String compatibilityLevel = tuple.getT2().getCompatibility().getValue();
           schema.setCompatibilityLevel(compatibilityLevel);
           return schema;
-        });
+        })
+        .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
+            () -> this.getSchemaSubject(cluster, schemaName, version))));
   }
 
   /**
@@ -154,16 +163,18 @@ public class SchemaRegistryService {
   private Mono<Void> deleteSchemaSubject(KafkaCluster cluster, String schemaName,
                                          String version) {
     return configuredWebClient(
-        cluster,
-        HttpMethod.DELETE,
-        URL_SUBJECT_BY_VERSION,
-        List.of(schemaName, version))
-        .retrieve()
-        .onStatus(NOT_FOUND::equals,
-            throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
-        )
-        .toBodilessEntity()
-        .then();
+        cluster,
+        HttpMethod.DELETE,
+        SchemaRegistryService.URL_SUBJECT_BY_VERSION,
+        List.of(schemaName, version))
+        .retrieve()
+        .onStatus(NOT_FOUND::equals,
+            throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
+        )
+        .toBodilessEntity()
+        .then()
+        .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
+            () -> this.deleteSchemaSubject(cluster, schemaName, version))));
   }
 
   public Mono<Void> deleteSchemaSubjectEntirely(KafkaCluster cluster,
@@ -176,7 +187,9 @@ public class SchemaRegistryService {
         .retrieve()
         .onStatus(HttpStatus::isError, errorOnSchemaDeleteFailure(schemaName))
         .toBodilessEntity()
-        .then();
+        .then()
+        .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
+            () -> this.deleteSchemaSubjectEntirely(cluster, schemaName))));
   }
 
   /**
@@ -202,19 +215,20 @@ public class SchemaRegistryService {
                                                   Mono<InternalNewSchema> newSchemaSubject,
                                                   KafkaCluster cluster) {
     return configuredWebClient(
-        cluster,
-        HttpMethod.POST,
-        URL_SUBJECT_VERSIONS,
-        subject)
-        .contentType(MediaType.APPLICATION_JSON)
-        .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
-        .retrieve()
-        .onStatus(UNPROCESSABLE_ENTITY::equals,
-            r -> r.bodyToMono(ErrorResponse.class)
-                .flatMap(x -> Mono.error(isUnrecognizedFieldSchemaTypeMessage(x.getMessage())
-                    ? new SchemaTypeNotSupportedException()
-                    : new UnprocessableEntityException(x.getMessage()))))
-        .bodyToMono(SubjectIdResponse.class);
+        cluster,
+        HttpMethod.POST,
+        URL_SUBJECT_VERSIONS, subject)
+        .contentType(MediaType.APPLICATION_JSON)
+        .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
+        .retrieve()
+        .onStatus(UNPROCESSABLE_ENTITY::equals,
+            r -> r.bodyToMono(ErrorResponse.class)
+                .flatMap(x -> Mono.error(isUnrecognizedFieldSchemaTypeMessage(x.getMessage())
+                    ? new SchemaTypeNotSupportedException()
+                    : new UnprocessableEntityException(x.getMessage()))))
+        .bodyToMono(SubjectIdResponse.class)
+        .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
+            () -> submitNewSchema(subject, newSchemaSubject, cluster))));
   }
 
   @NotNull
@@ -233,16 +247,18 @@ public class SchemaRegistryService {
                                                Mono<CompatibilityLevelDTO> compatibilityLevel) {
     String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
     return configuredWebClient(
-        cluster,
-        HttpMethod.PUT,
-        configEndpoint,
+        cluster,
+        HttpMethod.PUT,
+        configEndpoint,
         schemaName)
-        .contentType(MediaType.APPLICATION_JSON)
-        .body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevelDTO.class))
-        .retrieve()
-        .onStatus(NOT_FOUND::equals,
-            throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
-        .bodyToMono(Void.class);
+        .contentType(MediaType.APPLICATION_JSON)
+        .body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevelDTO.class))
+        .retrieve()
+        .onStatus(NOT_FOUND::equals,
+            throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
+        .bodyToMono(Void.class)
+        .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
+            () -> this.updateSchemaCompatibility(cluster, schemaName, compatibilityLevel))));
   }
 
   public Mono<Void> updateSchemaCompatibility(KafkaCluster cluster,
@@ -280,17 +296,19 @@ public class SchemaRegistryService {
   public Mono<CompatibilityCheckResponseDTO> checksSchemaCompatibility(
       KafkaCluster cluster, String schemaName, Mono<NewSchemaSubjectDTO> newSchemaSubject) {
     return configuredWebClient(
-        cluster,
-        HttpMethod.POST,
-        "/compatibility/subjects/{schemaName}/versions/latest",
+        cluster,
+        HttpMethod.POST,
+        "/compatibility/subjects/{schemaName}/versions/latest",
         schemaName)
-        .contentType(MediaType.APPLICATION_JSON)
-        .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubjectDTO.class))
-        .retrieve()
-        .onStatus(NOT_FOUND::equals,
-            throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
-        .bodyToMono(InternalCompatibilityCheck.class)
-        .map(mapper::toCompatibilityCheckResponse);
+        .contentType(MediaType.APPLICATION_JSON)
+        .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubjectDTO.class))
+        .retrieve()
+        .onStatus(NOT_FOUND::equals,
+            throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
+        .bodyToMono(InternalCompatibilityCheck.class)
+        .map(mapper::toCompatibilityCheckResponse)
+        .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
+            () -> this.checksSchemaCompatibility(cluster, schemaName, newSchemaSubject))));
   }
 
   public String formatted(String str, Object... args) {
@@ -318,7 +336,8 @@ public class SchemaRegistryService {
     return errorMessage.contains(UNRECOGNIZED_FIELD_SCHEMA_TYPE);
   }
 
-  private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method, String uri) {
+  private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method,
+                                                        String uri) {
     return configuredWebClient(cluster, method, uri, Collections.emptyList(),
         new LinkedMultiValueMap<>());
   }
@@ -335,20 +354,20 @@ public class SchemaRegistryService {
   }
 
   private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster,
-                                                        HttpMethod method, String uri,
+                                                        HttpMethod method, String path,
                                                         List<String> uriVariables,
                                                         MultiValueMap<String, String> queryParams) {
     final var schemaRegistry = cluster.getSchemaRegistry();
     return webClient
         .method(method)
-        .uri(buildUri(schemaRegistry, uri, uriVariables, queryParams))
+        .uri(buildUri(schemaRegistry, path, uriVariables, queryParams))
         .headers(headers -> setBasicAuthIfEnabled(schemaRegistry, headers));
   }
 
-  private URI buildUri(InternalSchemaRegistry schemaRegistry, String uri, List<String> uriVariables,
+  private URI buildUri(InternalSchemaRegistry schemaRegistry, String path, List<String> uriVariables,
                        MultiValueMap<String, String> queryParams) {
     final var builder = UriComponentsBuilder
-        .fromHttpUrl(schemaRegistry.getFirstUrl() + uri);
+        .fromHttpUrl(schemaRegistry.getUri() + path);
     builder.queryParams(queryParams);
     return builder.buildAndExpand(uriVariables.toArray()).toUri();
   }
@@ -361,4 +380,59 @@ public class SchemaRegistryService {
       return Mono.error(new SchemaFailedToDeleteException(schemaName));
     };
   }
+
+  private <T> Mono<T> failoverAble(Mono<T> request, FailoverMono<T> failoverMethod) {
+    return request.onErrorResume(failoverMethod::failover);
+  }
+
+  private <T> Flux<T> failoverAble(Flux<T> request, FailoverFlux<T> failoverMethod) {
+    return request.onErrorResume(failoverMethod::failover);
+  }
+
+  private abstract static class Failover<E> {
+    private final InternalSchemaRegistry schemaRegistry;
+    private final Supplier<E> failover;
+
+    private Failover(InternalSchemaRegistry schemaRegistry, Supplier<E> failover) {
+      this.schemaRegistry = Objects.requireNonNull(schemaRegistry);
+      this.failover = Objects.requireNonNull(failover);
+    }
+
+    abstract E error(Throwable error);
+
+    public E failover(Throwable error) {
+      if (error instanceof WebClientRequestException
+          && error.getCause() instanceof IOException
+          && schemaRegistry.isFailoverAvailable()) {
+        var uri = ((WebClientRequestException) error).getUri();
+        schemaRegistry.markAsUnavailable(String.format("%s://%s", uri.getScheme(), uri.getAuthority()));
+        return failover.get();
+      }
+      return error(error);
+    }
+  }
+
+  private static class FailoverMono<T> extends Failover<Mono<T>> {
+
+    private FailoverMono(InternalSchemaRegistry schemaRegistry, Supplier<Mono<T>> failover) {
+      super(schemaRegistry, failover);
+    }
+
+    @Override
+    Mono<T> error(Throwable error) {
+      return Mono.error(error);
+    }
+  }
+
+  private static class FailoverFlux<T> extends Failover<Flux<T>> {
+
+    private FailoverFlux(InternalSchemaRegistry schemaRegistry, Supplier<Flux<T>> failover) {
+      super(schemaRegistry, failover);
+    }
+
+    @Override
+    Flux<T> error(Throwable error) {
+      return Flux.error(error);
+    }
+  }
 }