|
@@ -1,8 +1,10 @@
|
|
|
package com.provectus.kafka.ui.service;
|
|
|
|
|
|
+import static org.springframework.http.HttpStatus.CONFLICT;
|
|
|
import static org.springframework.http.HttpStatus.NOT_FOUND;
|
|
|
import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY;
|
|
|
|
|
|
+import com.provectus.kafka.ui.exception.SchemaCompatibilityException;
|
|
|
import com.provectus.kafka.ui.exception.SchemaFailedToDeleteException;
|
|
|
import com.provectus.kafka.ui.exception.SchemaNotFoundException;
|
|
|
import com.provectus.kafka.ui.exception.SchemaTypeNotSupportedException;
|
|
@@ -65,6 +67,7 @@ public class SchemaRegistryService {
|
|
|
private static final String LATEST = "latest";
|
|
|
|
|
|
private static final String UNRECOGNIZED_FIELD_SCHEMA_TYPE = "Unrecognized field: schemaType";
|
|
|
+ private static final String INCOMPATIBLE_WITH_AN_EARLIER_SCHEMA = "incompatible with an earlier schema";
|
|
|
|
|
|
private final ClusterMapper mapper;
|
|
|
private final WebClient webClient;
|
|
@@ -164,18 +167,18 @@ public class SchemaRegistryService {
|
|
|
private Mono<Void> deleteSchemaSubject(KafkaCluster cluster, String schemaName,
|
|
|
String version) {
|
|
|
return configuredWebClient(
|
|
|
- cluster,
|
|
|
- HttpMethod.DELETE,
|
|
|
- SchemaRegistryService.URL_SUBJECT_BY_VERSION,
|
|
|
- List.of(schemaName, version))
|
|
|
- .retrieve()
|
|
|
- .onStatus(NOT_FOUND::equals,
|
|
|
- throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
|
|
|
- )
|
|
|
- .toBodilessEntity()
|
|
|
- .then()
|
|
|
- .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
|
|
|
- () -> this.deleteSchemaSubject(cluster, schemaName, version))));
|
|
|
+ cluster,
|
|
|
+ HttpMethod.DELETE,
|
|
|
+ SchemaRegistryService.URL_SUBJECT_BY_VERSION,
|
|
|
+ List.of(schemaName, version))
|
|
|
+ .retrieve()
|
|
|
+ .onStatus(NOT_FOUND::equals,
|
|
|
+ throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
|
|
|
+ )
|
|
|
+ .toBodilessEntity()
|
|
|
+ .then()
|
|
|
+ .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
|
|
|
+ () -> this.deleteSchemaSubject(cluster, schemaName, version))));
|
|
|
}
|
|
|
|
|
|
public Mono<Void> deleteSchemaSubjectEntirely(KafkaCluster cluster,
|
|
@@ -216,20 +219,29 @@ public class SchemaRegistryService {
|
|
|
Mono<InternalNewSchema> newSchemaSubject,
|
|
|
KafkaCluster cluster) {
|
|
|
return configuredWebClient(
|
|
|
- cluster,
|
|
|
- HttpMethod.POST,
|
|
|
- URL_SUBJECT_VERSIONS, subject)
|
|
|
- .contentType(MediaType.APPLICATION_JSON)
|
|
|
- .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
|
|
|
- .retrieve()
|
|
|
- .onStatus(UNPROCESSABLE_ENTITY::equals,
|
|
|
- r -> r.bodyToMono(ErrorResponse.class)
|
|
|
- .flatMap(x -> Mono.error(isUnrecognizedFieldSchemaTypeMessage(x.getMessage())
|
|
|
- ? new SchemaTypeNotSupportedException()
|
|
|
- : new UnprocessableEntityException(x.getMessage()))))
|
|
|
- .bodyToMono(SubjectIdResponse.class)
|
|
|
- .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
|
|
|
- () -> submitNewSchema(subject, newSchemaSubject, cluster))));
|
|
|
+ cluster,
|
|
|
+ HttpMethod.POST,
|
|
|
+ URL_SUBJECT_VERSIONS, subject)
|
|
|
+ .contentType(MediaType.APPLICATION_JSON)
|
|
|
+ .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
|
|
|
+ .retrieve()
|
|
|
+ .onStatus(status -> UNPROCESSABLE_ENTITY.equals(status) || CONFLICT.equals(status),
|
|
|
+ r -> r.bodyToMono(ErrorResponse.class)
|
|
|
+ .flatMap(this::getMonoError))
|
|
|
+ .bodyToMono(SubjectIdResponse.class)
|
|
|
+ .as(m -> failoverAble(m, new FailoverMono<>(cluster.getSchemaRegistry(),
|
|
|
+ () -> submitNewSchema(subject, newSchemaSubject, cluster))));
|
|
|
+ }
|
|
|
+
|
|
|
+ @NotNull
|
|
|
+ private Mono<Throwable> getMonoError(ErrorResponse x) {
|
|
|
+ if (isUnrecognizedFieldSchemaTypeMessage(x.getMessage())) {
|
|
|
+ return Mono.error(new SchemaTypeNotSupportedException());
|
|
|
+ } else if (isIncompatibleSchemaMessage(x.getMessage())) {
|
|
|
+ return Mono.error(new SchemaCompatibilityException(x.getMessage()));
|
|
|
+ } else {
|
|
|
+ return Mono.error(new UnprocessableEntityException(x.getMessage()));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@NotNull
|
|
@@ -337,6 +349,10 @@ public class SchemaRegistryService {
|
|
|
return errorMessage.contains(UNRECOGNIZED_FIELD_SCHEMA_TYPE);
|
|
|
}
|
|
|
|
|
|
+ private boolean isIncompatibleSchemaMessage(String message) {
|
|
|
+ return message.contains(INCOMPATIBLE_WITH_AN_EARLIER_SCHEMA);
|
|
|
+ }
|
|
|
+
|
|
|
private WebClient.RequestBodySpec configuredWebClient(KafkaCluster cluster, HttpMethod method,
|
|
|
String uri) {
|
|
|
return configuredWebClient(cluster, method, uri, Collections.emptyList(),
|