|
@@ -3,7 +3,6 @@ package com.provectus.kafka.ui.service;
|
|
|
import static org.springframework.http.HttpStatus.NOT_FOUND;
|
|
|
import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY;
|
|
|
|
|
|
-import com.provectus.kafka.ui.exception.DuplicateEntityException;
|
|
|
import com.provectus.kafka.ui.exception.SchemaFailedToDeleteException;
|
|
|
import com.provectus.kafka.ui.exception.SchemaNotFoundException;
|
|
|
import com.provectus.kafka.ui.exception.SchemaTypeIsNotSupportedException;
|
|
@@ -22,6 +21,8 @@ import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
|
|
|
import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
|
|
|
import com.provectus.kafka.ui.model.schemaregistry.InternalNewSchema;
|
|
|
import com.provectus.kafka.ui.model.schemaregistry.SubjectIdResponse;
|
|
|
+import java.net.URI;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.Formatter;
|
|
|
import java.util.List;
|
|
|
import java.util.Objects;
|
|
@@ -35,11 +36,13 @@ import org.springframework.http.HttpHeaders;
|
|
|
import org.springframework.http.HttpMethod;
|
|
|
import org.springframework.http.HttpStatus;
|
|
|
import org.springframework.http.MediaType;
|
|
|
-import org.springframework.http.ResponseEntity;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
+import org.springframework.util.LinkedMultiValueMap;
|
|
|
+import org.springframework.util.MultiValueMap;
|
|
|
import org.springframework.web.reactive.function.BodyInserters;
|
|
|
import org.springframework.web.reactive.function.client.ClientResponse;
|
|
|
import org.springframework.web.reactive.function.client.WebClient;
|
|
|
+import org.springframework.web.util.UriComponentsBuilder;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
@@ -65,8 +68,8 @@ public class SchemaRegistryService {
|
|
|
public Mono<List<SchemaSubjectDTO>> getAllLatestVersionSchemas(KafkaCluster cluster,
|
|
|
List<String> subjects) {
|
|
|
return Flux.fromIterable(subjects)
|
|
|
- .concatMap(subject -> getLatestSchemaVersionBySubject(cluster, subject))
|
|
|
- .collect(Collectors.toList());
|
|
|
+ .concatMap(subject -> getLatestSchemaVersionBySubject(cluster, subject))
|
|
|
+ .collect(Collectors.toList());
|
|
|
}
|
|
|
|
|
|
public Mono<String[]> getAllSubjectNames(KafkaCluster cluster) {
|
|
@@ -88,7 +91,8 @@ public class SchemaRegistryService {
|
|
|
return configuredWebClient(
|
|
|
cluster,
|
|
|
HttpMethod.GET,
|
|
|
- URL_SUBJECT_VERSIONS, schemaName)
|
|
|
+ URL_SUBJECT_VERSIONS,
|
|
|
+ schemaName)
|
|
|
.retrieve()
|
|
|
.onStatus(NOT_FOUND::equals,
|
|
|
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
|
|
@@ -101,7 +105,7 @@ public class SchemaRegistryService {
|
|
|
}
|
|
|
|
|
|
public Mono<SchemaSubjectDTO> getLatestSchemaVersionBySubject(KafkaCluster cluster,
|
|
|
- String schemaName) {
|
|
|
+ String schemaName) {
|
|
|
return this.getSchemaSubject(cluster, schemaName, LATEST);
|
|
|
}
|
|
|
|
|
@@ -110,7 +114,8 @@ public class SchemaRegistryService {
|
|
|
return configuredWebClient(
|
|
|
cluster,
|
|
|
HttpMethod.GET,
|
|
|
- URL_SUBJECT_BY_VERSION, schemaName, version)
|
|
|
+ URL_SUBJECT_BY_VERSION,
|
|
|
+ List.of(schemaName, version))
|
|
|
.retrieve()
|
|
|
.onStatus(NOT_FOUND::equals,
|
|
|
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
|
|
@@ -135,35 +140,39 @@ public class SchemaRegistryService {
|
|
|
return s.schemaType(Optional.ofNullable(s.getSchemaType()).orElse(SchemaTypeDTO.AVRO));
|
|
|
}
|
|
|
|
|
|
- public Mono<ResponseEntity<Void>> deleteSchemaSubjectByVersion(KafkaCluster cluster,
|
|
|
- String schemaName,
|
|
|
- Integer version) {
|
|
|
+ public Mono<Void> deleteSchemaSubjectByVersion(KafkaCluster cluster,
|
|
|
+ String schemaName,
|
|
|
+ Integer version) {
|
|
|
return this.deleteSchemaSubject(cluster, schemaName, String.valueOf(version));
|
|
|
}
|
|
|
|
|
|
- public Mono<ResponseEntity<Void>> deleteLatestSchemaSubject(KafkaCluster cluster,
|
|
|
- String schemaName) {
|
|
|
+ public Mono<Void> deleteLatestSchemaSubject(KafkaCluster cluster,
|
|
|
+ String schemaName) {
|
|
|
return this.deleteSchemaSubject(cluster, schemaName, LATEST);
|
|
|
}
|
|
|
|
|
|
- private Mono<ResponseEntity<Void>> deleteSchemaSubject(KafkaCluster cluster, String schemaName,
|
|
|
- String version) {
|
|
|
+ private Mono<Void> deleteSchemaSubject(KafkaCluster cluster, String schemaName,
|
|
|
+ String version) {
|
|
|
return configuredWebClient(
|
|
|
cluster,
|
|
|
HttpMethod.DELETE,
|
|
|
- URL_SUBJECT_BY_VERSION, schemaName, version)
|
|
|
+ URL_SUBJECT_BY_VERSION,
|
|
|
+ List.of(schemaName, version))
|
|
|
.retrieve()
|
|
|
.onStatus(NOT_FOUND::equals,
|
|
|
throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
|
|
|
- ).toBodilessEntity();
|
|
|
+ )
|
|
|
+ .toBodilessEntity()
|
|
|
+ .then();
|
|
|
}
|
|
|
|
|
|
public Mono<Void> deleteSchemaSubjectEntirely(KafkaCluster cluster,
|
|
|
- String schemaName) {
|
|
|
+ String schemaName) {
|
|
|
return configuredWebClient(
|
|
|
cluster,
|
|
|
HttpMethod.DELETE,
|
|
|
- URL_SUBJECT, schemaName)
|
|
|
+ URL_SUBJECT,
|
|
|
+ schemaName)
|
|
|
.retrieve()
|
|
|
.onStatus(HttpStatus::isError, errorOnSchemaDeleteFailure(schemaName))
|
|
|
.toBodilessEntity()
|
|
@@ -183,9 +192,7 @@ public class SchemaRegistryService {
|
|
|
Mono<InternalNewSchema> newSchema =
|
|
|
Mono.just(new InternalNewSchema(schema.getSchema(), schemaType));
|
|
|
String subject = schema.getSubject();
|
|
|
- var schemaRegistry = cluster.getSchemaRegistry();
|
|
|
- return checkSchemaOnDuplicate(subject, newSchema, schemaRegistry)
|
|
|
- .flatMap(s -> submitNewSchema(subject, newSchema, schemaRegistry))
|
|
|
+ return submitNewSchema(subject, newSchema, cluster)
|
|
|
.flatMap(resp -> getLatestSchemaVersionBySubject(cluster, subject));
|
|
|
});
|
|
|
}
|
|
@@ -193,44 +200,23 @@ public class SchemaRegistryService {
|
|
|
@NotNull
|
|
|
private Mono<SubjectIdResponse> submitNewSchema(String subject,
|
|
|
Mono<InternalNewSchema> newSchemaSubject,
|
|
|
- InternalSchemaRegistry schemaRegistry) {
|
|
|
+ KafkaCluster cluster) {
|
|
|
return configuredWebClient(
|
|
|
- schemaRegistry,
|
|
|
+ cluster,
|
|
|
HttpMethod.POST,
|
|
|
- URL_SUBJECT_VERSIONS, subject)
|
|
|
+ URL_SUBJECT_VERSIONS,
|
|
|
+ subject)
|
|
|
.contentType(MediaType.APPLICATION_JSON)
|
|
|
.body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
|
|
|
.retrieve()
|
|
|
.onStatus(UNPROCESSABLE_ENTITY::equals,
|
|
|
r -> r.bodyToMono(ErrorResponse.class)
|
|
|
.flatMap(x -> Mono.error(isUnrecognizedFieldSchemaTypeMessage(x.getMessage())
|
|
|
- ? new SchemaTypeIsNotSupportedException()
|
|
|
- : new UnprocessableEntityException(x.getMessage()))))
|
|
|
+ ? new SchemaTypeIsNotSupportedException()
|
|
|
+ : new UnprocessableEntityException(x.getMessage()))))
|
|
|
.bodyToMono(SubjectIdResponse.class);
|
|
|
}
|
|
|
|
|
|
- @NotNull
|
|
|
- private Mono<SchemaSubjectDTO> checkSchemaOnDuplicate(String subject,
|
|
|
- Mono<InternalNewSchema> newSchemaSubject,
|
|
|
- InternalSchemaRegistry schemaRegistry) {
|
|
|
- return configuredWebClient(
|
|
|
- schemaRegistry,
|
|
|
- HttpMethod.POST,
|
|
|
- URL_SUBJECT, subject)
|
|
|
- .contentType(MediaType.APPLICATION_JSON)
|
|
|
- .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
|
|
|
- .retrieve()
|
|
|
- .onStatus(NOT_FOUND::equals, res -> Mono.empty())
|
|
|
- .onStatus(UNPROCESSABLE_ENTITY::equals,
|
|
|
- r -> r.bodyToMono(ErrorResponse.class)
|
|
|
- .flatMap(x -> Mono.error(isUnrecognizedFieldSchemaTypeMessage(x.getMessage())
|
|
|
- ? new SchemaTypeIsNotSupportedException()
|
|
|
- : new UnprocessableEntityException(x.getMessage()))))
|
|
|
- .bodyToMono(SchemaSubjectDTO.class)
|
|
|
- .filter(s -> Objects.isNull(s.getId()))
|
|
|
- .switchIfEmpty(Mono.error(new DuplicateEntityException("Such schema already exists")));
|
|
|
- }
|
|
|
-
|
|
|
@NotNull
|
|
|
private Function<ClientResponse, Mono<? extends Throwable>> throwIfNotFoundStatus(
|
|
|
String formatted) {
|
|
@@ -249,7 +235,8 @@ public class SchemaRegistryService {
|
|
|
return configuredWebClient(
|
|
|
cluster,
|
|
|
HttpMethod.PUT,
|
|
|
- configEndpoint, schemaName)
|
|
|
+ configEndpoint,
|
|
|
+ schemaName)
|
|
|
.contentType(MediaType.APPLICATION_JSON)
|
|
|
.body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevelDTO.class))
|
|
|
.retrieve()
|
|
@@ -265,11 +252,15 @@ public class SchemaRegistryService {
|
|
|
|
|
|
public Mono<CompatibilityLevelDTO> getSchemaCompatibilityLevel(KafkaCluster cluster,
|
|
|
String schemaName) {
|
|
|
- String configEndpoint = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
|
|
|
+ String globalConfig = Objects.isNull(schemaName) ? "/config" : "/config/{schemaName}";
|
|
|
+ final var values = new LinkedMultiValueMap<String, String>();
|
|
|
+ values.add("defaultToGlobal", "true");
|
|
|
return configuredWebClient(
|
|
|
cluster,
|
|
|
HttpMethod.GET,
|
|
|
- configEndpoint, schemaName)
|
|
|
+ globalConfig,
|
|
|
+ (schemaName == null ? Collections.emptyList() : List.of(schemaName)),
|
|
|
+ values)
|
|
|
.retrieve()
|
|
|
.bodyToMono(InternalCompatibilityLevel.class)
|
|
|
.map(mapper::toCompatibilityLevel)
|
|
@@ -281,7 +272,7 @@ public class SchemaRegistryService {
|
|
|
}
|
|
|
|
|
|
private Mono<CompatibilityLevelDTO> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
|
|
|
- String schemaName) {
|
|
|
+ String schemaName) {
|
|
|
return this.getSchemaCompatibilityLevel(cluster, schemaName)
|
|
|
.switchIfEmpty(this.getGlobalSchemaCompatibilityLevel(cluster));
|
|
|
}
|
|
@@ -291,7 +282,8 @@ public class SchemaRegistryService {
|
|
|
return configuredWebClient(
|
|
|
cluster,
|
|
|
HttpMethod.POST,
|
|
|
- "/compatibility/subjects/{schemaName}/versions/latest", schemaName)
|
|
|
+ "/compatibility/subjects/{schemaName}/versions/latest",
|
|
|
+ schemaName)
|
|
|
.contentType(MediaType.APPLICATION_JSON)
|
|
|
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubjectDTO.class))
|
|
|
.retrieve()
|
|
@@ -313,29 +305,50 @@ public class SchemaRegistryService {
|
|
|
);
|
|
|
} else if (schemaRegistry.getUsername() != null) {
|
|
|
throw new ValidationException(
|
|
|
- "You specified username but do not specified password");
|
|
|
+ "You specified username but did not specify password");
|
|
|
} else if (schemaRegistry.getPassword() != null) {
|
|
|
throw new ValidationException(
|
|
|
- "You specified password but do not specified username");
|
|
|
+ "You specified password but did not specify username");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private boolean isUnrecognizedFieldSchemaTypeMessage(String errorMessage) {
|
|
|
+ return errorMessage.contains(UNRECOGNIZED_FIELD_SCHEMA_TYPE);
|
|
|
+ }
|
|
|
+
|
|
|
+ private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method, String uri) {
|
|
|
+ return configuredWebClient(cluster, method, uri, Collections.emptyList(),
|
|
|
+ new LinkedMultiValueMap<>());
|
|
|
+ }
|
|
|
+
|
|
|
private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method,
|
|
|
- String uri, Object... params) {
|
|
|
- return configuredWebClient(cluster.getSchemaRegistry(), method, uri, params);
|
|
|
+ String uri, List<String> uriVariables) {
|
|
|
+ return configuredWebClient(cluster, method, uri, uriVariables, new LinkedMultiValueMap<>());
|
|
|
}
|
|
|
|
|
|
- private WebClient.RequestBodySpec configuredWebClient(InternalSchemaRegistry schemaRegistry,
|
|
|
+ private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method,
|
|
|
+ String uri, String uriVariable) {
|
|
|
+ return configuredWebClient(cluster, method, uri, List.of(uriVariable),
|
|
|
+ new LinkedMultiValueMap<>());
|
|
|
+ }
|
|
|
+
|
|
|
+ private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster,
|
|
|
HttpMethod method, String uri,
|
|
|
- Object... params) {
|
|
|
+ List<String> uriVariables,
|
|
|
+ MultiValueMap<String, String> queryParams) {
|
|
|
+ final var schemaRegistry = cluster.getSchemaRegistry();
|
|
|
return webClient
|
|
|
.method(method)
|
|
|
- .uri(schemaRegistry.getFirstUrl() + uri, params)
|
|
|
+ .uri(buildUri(schemaRegistry, uri, uriVariables, queryParams))
|
|
|
.headers(headers -> setBasicAuthIfEnabled(schemaRegistry, headers));
|
|
|
}
|
|
|
|
|
|
- private boolean isUnrecognizedFieldSchemaTypeMessage(String errorMessage) {
|
|
|
- return errorMessage.contains(UNRECOGNIZED_FIELD_SCHEMA_TYPE);
|
|
|
+ private URI buildUri(InternalSchemaRegistry schemaRegistry, String uri, List<String> uriVariables,
|
|
|
+ MultiValueMap<String, String> queryParams) {
|
|
|
+ final var builder = UriComponentsBuilder
|
|
|
+ .fromHttpUrl(schemaRegistry.getFirstUrl() + uri);
|
|
|
+ builder.queryParams(queryParams);
|
|
|
+ return builder.buildAndExpand(uriVariables.toArray()).toUri();
|
|
|
}
|
|
|
|
|
|
private Function<ClientResponse, Mono<? extends Throwable>> errorOnSchemaDeleteFailure(String schemaName) {
|