|
@@ -28,6 +28,7 @@ public class SchemaRegistryService {
|
|
|
private static final String URL_SUBJECT = "/subjects/{subjectName}";
|
|
|
private static final String URL_SUBJECT_VERSIONS = "/subjects/{subjectName}/versions";
|
|
|
private static final String URL_SUBJECT_BY_VERSION = "/subjects/{subjectName}/versions/{version}";
|
|
|
+ private static final String LATEST = "latest";
|
|
|
|
|
|
private final ClustersStorage clustersStorage;
|
|
|
private final WebClient webClient;
|
|
@@ -37,7 +38,9 @@ public class SchemaRegistryService {
|
|
|
.map(cluster -> webClient.get()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECTS)
|
|
|
.retrieve()
|
|
|
- .bodyToFlux(String.class))
|
|
|
+ .onStatus(HttpStatus::is5xxServerError, ClientResponse::createException)
|
|
|
+ .bodyToFlux(String.class)
|
|
|
+ .doOnError(log::error))
|
|
|
.orElse(Flux.error(new NotFoundException("No such cluster")));
|
|
|
}
|
|
|
|
|
@@ -52,6 +55,14 @@ public class SchemaRegistryService {
|
|
|
}
|
|
|
|
|
|
public Flux<SchemaSubject> getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) {
|
|
|
+ return this.getSchemaSubject(clusterName, subjectName, String.valueOf(version));
|
|
|
+ }
|
|
|
+
|
|
|
+ public Flux<SchemaSubject> getLatestSchemaSubject(String clusterName, String subjectName) {
|
|
|
+ return this.getSchemaSubject(clusterName, subjectName, LATEST);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Flux<SchemaSubject> getSchemaSubject(String clusterName, String subjectName, String version) {
|
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
|
.map(cluster -> webClient.get()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, subjectName, version)
|
|
@@ -62,6 +73,14 @@ public class SchemaRegistryService {
|
|
|
}
|
|
|
|
|
|
public Mono<ResponseEntity<Void>> deleteSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) {
|
|
|
+ return this.deleteSchemaSubject(clusterName, subjectName, String.valueOf(version));
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<ResponseEntity<Void>> deleteLatestSchemaSubject(String clusterName, String subjectName) {
|
|
|
+ return this.deleteSchemaSubject(clusterName, subjectName, LATEST);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String subjectName, String version) {
|
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
|
.map(cluster -> webClient.delete()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, subjectName, version)
|
|
@@ -83,16 +102,14 @@ public class SchemaRegistryService {
|
|
|
|
|
|
public Mono<ResponseEntity<SchemaSubject>> createNewSubject(String clusterName, String subjectSchema, Mono<NewSchemaSubject> newSchemaSubject) {
|
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
|
- .map(cluster -> {
|
|
|
- log.info("Submitting a new subject: {} to Schema Registry: {}", subjectSchema, cluster.getSchemaRegistry());
|
|
|
- return webClient.post()
|
|
|
- .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, subjectSchema)
|
|
|
- .contentType(MediaType.APPLICATION_JSON)
|
|
|
- .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
|
|
|
- .retrieve()
|
|
|
- .onStatus(HttpStatus.INTERNAL_SERVER_ERROR::equals, ClientResponse::createException)
|
|
|
- .toEntity(SchemaSubject.class);
|
|
|
- })
|
|
|
+ .map(cluster -> webClient.post()
|
|
|
+ .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, subjectSchema)
|
|
|
+ .contentType(MediaType.APPLICATION_JSON)
|
|
|
+ .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
|
|
|
+ .retrieve()
|
|
|
+ .onStatus(HttpStatus::isError, ClientResponse::createException)
|
|
|
+ .toEntity(SchemaSubject.class)
|
|
|
+ .doOnError(log::error))
|
|
|
.orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
|
}
|
|
|
|