|
@@ -3,7 +3,7 @@ package com.provectus.kafka.ui.cluster.service;
|
|
import com.provectus.kafka.ui.cluster.exception.NotFoundException;
|
|
import com.provectus.kafka.ui.cluster.exception.NotFoundException;
|
|
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
|
|
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
|
|
import com.provectus.kafka.ui.model.NewSchemaSubject;
|
|
import com.provectus.kafka.ui.model.NewSchemaSubject;
|
|
-import com.provectus.kafka.ui.model.SubjectSchema;
|
|
|
|
|
|
+import com.provectus.kafka.ui.model.SchemaSubject;
|
|
import lombok.RequiredArgsConstructor;
|
|
import lombok.RequiredArgsConstructor;
|
|
import lombok.extern.log4j.Log4j2;
|
|
import lombok.extern.log4j.Log4j2;
|
|
import org.springframework.http.HttpStatus;
|
|
import org.springframework.http.HttpStatus;
|
|
@@ -19,10 +19,10 @@ import reactor.core.publisher.Mono;
|
|
@Log4j2
|
|
@Log4j2
|
|
@RequiredArgsConstructor
|
|
@RequiredArgsConstructor
|
|
public class SchemaRegistryService {
|
|
public class SchemaRegistryService {
|
|
- public static final String URL_SUBJECTS = "/subjects";
|
|
|
|
- public static final String URL_SUBJECT = "/subjects/{subjectName}";
|
|
|
|
- public static final String URL_SUBJECT_VERSIONS = "/subjects/{subjectName}/versions";
|
|
|
|
- public static final String URL_SUBJECT_BY_VERSION = "/subjects/{subjectName}/versions/{version}";
|
|
|
|
|
|
+ private static final String URL_SUBJECTS = "/subjects";
|
|
|
|
+ private static final String URL_SUBJECT = "/subjects/{subjectName}";
|
|
|
|
+ private static final String URL_SUBJECT_VERSIONS = "/subjects/{subjectName}/versions";
|
|
|
|
+ private static final String URL_SUBJECT_BY_VERSION = "/subjects/{subjectName}/versions/{version}";
|
|
|
|
|
|
private final ClustersStorage clustersStorage;
|
|
private final ClustersStorage clustersStorage;
|
|
private final WebClient webClient;
|
|
private final WebClient webClient;
|
|
@@ -33,7 +33,7 @@ public class SchemaRegistryService {
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECTS)
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECTS)
|
|
.retrieve()
|
|
.retrieve()
|
|
.bodyToFlux(String.class))
|
|
.bodyToFlux(String.class))
|
|
- .orElse(Flux.empty());
|
|
|
|
|
|
+ .orElse(Flux.error(new NotFoundException("No such cluster")));
|
|
}
|
|
}
|
|
|
|
|
|
public Flux<Integer> getSchemaSubjectVersions(String clusterName, String subjectName) {
|
|
public Flux<Integer> getSchemaSubjectVersions(String clusterName, String subjectName) {
|
|
@@ -43,47 +43,47 @@ public class SchemaRegistryService {
|
|
.retrieve()
|
|
.retrieve()
|
|
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject")))
|
|
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject")))
|
|
.bodyToFlux(Integer.class))
|
|
.bodyToFlux(Integer.class))
|
|
- .orElse(Flux.empty());
|
|
|
|
|
|
+ .orElse(Flux.error(new NotFoundException("No such cluster")));
|
|
}
|
|
}
|
|
|
|
|
|
- public Flux<SubjectSchema> getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) {
|
|
|
|
|
|
+ public Flux<SchemaSubject> getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) {
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
.map(cluster -> webClient.get()
|
|
.map(cluster -> webClient.get()
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, subjectName, version)
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, subjectName, version)
|
|
.retrieve()
|
|
.retrieve()
|
|
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
|
|
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
|
|
- .bodyToFlux(SubjectSchema.class))
|
|
|
|
- .orElse(Flux.empty());
|
|
|
|
|
|
+ .bodyToFlux(SchemaSubject.class))
|
|
|
|
+ .orElse(Flux.error(new NotFoundException()));
|
|
}
|
|
}
|
|
|
|
|
|
- public Mono<Object> deleteSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) {
|
|
|
|
|
|
+ public Mono<ResponseEntity<Void>> deleteSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) {
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
.map(cluster -> webClient.delete()
|
|
.map(cluster -> webClient.delete()
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, subjectName, version)
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, subjectName, version)
|
|
.retrieve()
|
|
.retrieve()
|
|
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
|
|
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
|
|
- .bodyToMono(Object.class))
|
|
|
|
- .orElse(Mono.empty());
|
|
|
|
|
|
+ .toBodilessEntity())
|
|
|
|
+ .orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
}
|
|
}
|
|
|
|
|
|
- public Mono<Object> deleteSchemaSubject(String clusterName, String subjectName) {
|
|
|
|
|
|
+ public Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String subjectName) {
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
.map(cluster -> webClient.delete()
|
|
.map(cluster -> webClient.delete()
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT, subjectName)
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT, subjectName)
|
|
.retrieve()
|
|
.retrieve()
|
|
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
|
|
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
|
|
- .bodyToMono(Object.class))
|
|
|
|
- .orElse(Mono.empty());
|
|
|
|
|
|
+ .toBodilessEntity())
|
|
|
|
+ .orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
}
|
|
}
|
|
|
|
|
|
- public Mono<ResponseEntity<SubjectSchema>> createNewSubject(String clusterName, String subjectSchema, Mono<NewSchemaSubject> newSchemaSubject) {
|
|
|
|
|
|
+ public Mono<ResponseEntity<SchemaSubject>> createNewSubject(String clusterName, String subjectSchema, Mono<NewSchemaSubject> newSchemaSubject) {
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
.map(cluster -> webClient.post()
|
|
.map(cluster -> webClient.post()
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, subjectSchema)
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, subjectSchema)
|
|
.contentType(MediaType.APPLICATION_JSON)
|
|
.contentType(MediaType.APPLICATION_JSON)
|
|
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
|
|
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
|
|
.retrieve()
|
|
.retrieve()
|
|
- .toEntity(SubjectSchema.class))
|
|
|
|
- .orElse(Mono.empty());
|
|
|
|
|
|
+ .toEntity(SchemaSubject.class))
|
|
|
|
+ .orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
}
|
|
}
|
|
}
|
|
}
|