|
@@ -16,7 +16,6 @@ import org.springframework.http.MediaType;
|
|
|
import org.springframework.http.ResponseEntity;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.web.reactive.function.BodyInserters;
|
|
|
-import org.springframework.web.reactive.function.client.ClientResponse;
|
|
|
import org.springframework.web.reactive.function.client.WebClient;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
@@ -42,7 +41,6 @@ public class SchemaRegistryService {
|
|
|
.map(cluster -> webClient.get()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECTS)
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus::is5xxServerError, ClientResponse::createException)
|
|
|
.bodyToFlux(String.class)
|
|
|
.doOnError(log::error))
|
|
|
.orElse(Flux.error(new NotFoundException("No such cluster")));
|
|
@@ -53,7 +51,7 @@ public class SchemaRegistryService {
|
|
|
.map(cluster -> webClient.get()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject")))
|
|
|
+ .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
|
|
|
.bodyToFlux(Integer.class))
|
|
|
.orElse(Flux.error(new NotFoundException("No such cluster")));
|
|
|
}
|
|
@@ -71,7 +69,8 @@ public class SchemaRegistryService {
|
|
|
.map(cluster -> webClient.get()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
|
|
|
+ .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
+ resp -> Mono.error(new NotFoundException("No such schema %s with version %s".formatted(schemaName, version))))
|
|
|
.bodyToMono(SchemaSubject.class)
|
|
|
.zipWith(getSchemaCompatibilityInfoOrGlobal(clusterName, schemaName))
|
|
|
.map(tuple -> {
|
|
@@ -81,7 +80,7 @@ public class SchemaRegistryService {
|
|
|
return schema;
|
|
|
})
|
|
|
)
|
|
|
- .orElse(Mono.error(new NotFoundException()));
|
|
|
+ .orElseThrow();
|
|
|
}
|
|
|
|
|
|
public Mono<ResponseEntity<Void>> deleteSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) {
|
|
@@ -97,7 +96,8 @@ public class SchemaRegistryService {
|
|
|
.map(cluster -> webClient.delete()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
|
|
|
+ .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
+ resp -> Mono.error(new NotFoundException("No such schema %s with version %s".formatted(schemaName, version))))
|
|
|
.toBodilessEntity())
|
|
|
.orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
|
}
|
|
@@ -107,19 +107,20 @@ public class SchemaRegistryService {
|
|
|
.map(cluster -> webClient.delete()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT, schemaName)
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
|
|
|
+ .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
|
|
|
.toBodilessEntity())
|
|
|
.orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
|
}
|
|
|
|
|
|
- public Mono<ResponseEntity<SchemaSubject>> createNewSubject(String clusterName, String subjectSchema, Mono<NewSchemaSubject> newSchemaSubject) {
|
|
|
+ public Mono<ResponseEntity<SchemaSubject>> createNewSubject(String clusterName, String schemaName, Mono<NewSchemaSubject> newSchemaSubject) {
|
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
|
.map(cluster -> webClient.post()
|
|
|
- .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, subjectSchema)
|
|
|
+ .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
|
|
|
.contentType(MediaType.APPLICATION_JSON)
|
|
|
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus::isError, ClientResponse::createException)
|
|
|
+ .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
+ resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
|
|
|
.toEntity(SchemaSubject.class)
|
|
|
.log())
|
|
|
.orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
@@ -140,6 +141,8 @@ public class SchemaRegistryService {
|
|
|
.contentType(MediaType.APPLICATION_JSON)
|
|
|
.body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevel.class))
|
|
|
.retrieve()
|
|
|
+ .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
+ resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
|
|
|
.bodyToMono(Void.class);
|
|
|
}).orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
|
}
|
|
@@ -177,6 +180,8 @@ public class SchemaRegistryService {
|
|
|
.contentType(MediaType.APPLICATION_JSON)
|
|
|
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
|
|
|
.retrieve()
|
|
|
+ .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
+ resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
|
|
|
.bodyToMono(InternalCompatibilityCheck.class)
|
|
|
.map(mapper::toCompatibilityCheckResponse)
|
|
|
.log()
|