|
@@ -5,6 +5,7 @@ import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
|
|
|
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
|
|
|
import com.provectus.kafka.ui.cluster.model.InternalCompatibilityCheck;
|
|
|
import com.provectus.kafka.ui.cluster.model.InternalCompatibilityLevel;
|
|
|
+import com.provectus.kafka.ui.cluster.model.schemaregistry.SubjectIdResponse;
|
|
|
import com.provectus.kafka.ui.model.CompatibilityCheckResponse;
|
|
|
import com.provectus.kafka.ui.model.CompatibilityLevel;
|
|
|
import com.provectus.kafka.ui.model.NewSchemaSubject;
|
|
@@ -12,20 +13,20 @@ import com.provectus.kafka.ui.model.SchemaSubject;
|
|
|
import java.util.Formatter;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.log4j.Log4j2;
|
|
|
-import org.springframework.core.ParameterizedTypeReference;
|
|
|
-import org.springframework.http.HttpEntity;
|
|
|
-import org.springframework.http.HttpStatus;
|
|
|
+import org.jetbrains.annotations.NotNull;
|
|
|
import org.springframework.http.MediaType;
|
|
|
import org.springframework.http.ResponseEntity;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.web.reactive.function.BodyInserters;
|
|
|
+import org.springframework.web.reactive.function.client.ClientResponse;
|
|
|
import org.springframework.web.reactive.function.client.WebClient;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
-import java.util.Arrays;
|
|
|
-import java.util.List;
|
|
|
import java.util.Objects;
|
|
|
+import java.util.function.Function;
|
|
|
+
|
|
|
+import static org.springframework.http.HttpStatus.NOT_FOUND;
|
|
|
|
|
|
@Service
|
|
|
@Log4j2
|
|
@@ -69,10 +70,8 @@ public class SchemaRegistryService {
|
|
|
.map(cluster -> webClient.get()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
- resp -> Mono.error(
|
|
|
- new NotFoundException(formatted("No such schema %s"))
|
|
|
- )
|
|
|
+ .onStatus(NOT_FOUND::equals,
|
|
|
+ throwIfNotFoundStatus(formatted("No such schema %s"))
|
|
|
).bodyToFlux(Integer.class)
|
|
|
).orElse(Flux.error(new NotFoundException("No such cluster")));
|
|
|
}
|
|
@@ -90,12 +89,8 @@ public class SchemaRegistryService {
|
|
|
.map(cluster -> webClient.get()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
- resp -> Mono.error(
|
|
|
- new NotFoundException(
|
|
|
- formatted("No such schema %s with version %s", schemaName, version)
|
|
|
- )
|
|
|
- )
|
|
|
+ .onStatus(NOT_FOUND::equals,
|
|
|
+ throwIfNotFoundStatus(formatted("No such schema %s with version %s", schemaName, version))
|
|
|
).bodyToMono(SchemaSubject.class)
|
|
|
.zipWith(getSchemaCompatibilityInfoOrGlobal(clusterName, schemaName))
|
|
|
.map(tuple -> {
|
|
@@ -105,7 +100,7 @@ public class SchemaRegistryService {
|
|
|
return schema;
|
|
|
})
|
|
|
)
|
|
|
- .orElseThrow();
|
|
|
+ .orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
|
}
|
|
|
|
|
|
public Mono<ResponseEntity<Void>> deleteSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) {
|
|
@@ -121,46 +116,40 @@ public class SchemaRegistryService {
|
|
|
.map(cluster -> webClient.delete()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
- resp -> Mono.error(
|
|
|
- new NotFoundException(
|
|
|
- formatted("No such schema %s with version %s", schemaName, version)
|
|
|
- )
|
|
|
- )
|
|
|
+ .onStatus(NOT_FOUND::equals,
|
|
|
+ throwIfNotFoundStatus(formatted("No such schema %s with version %s", schemaName, version))
|
|
|
).toBodilessEntity()
|
|
|
).orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
|
}
|
|
|
|
|
|
- public Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String schemaName) {
|
|
|
+ public Mono<ResponseEntity<Void>> deleteSchemaSubjectEntirely(String clusterName, String schemaName) {
|
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
|
.map(cluster -> webClient.delete()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT, schemaName)
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
- resp -> Mono.error(
|
|
|
- new NotFoundException(
|
|
|
- formatted("No such schema %s", schemaName)
|
|
|
- )
|
|
|
- )
|
|
|
+ .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted("No such schema %s", schemaName))
|
|
|
)
|
|
|
.toBodilessEntity())
|
|
|
.orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
|
}
|
|
|
|
|
|
- public Mono<ResponseEntity<SchemaSubject>> createNewSubject(String clusterName, String schemaName, Mono<NewSchemaSubject> newSchemaSubject) {
|
|
|
- return clustersStorage.getClusterByName(clusterName)
|
|
|
+ public Mono<SchemaSubject> createNewSchema(String clusterName, String schemaName, Mono<NewSchemaSubject> newSchemaSubject) {
|
|
|
+ var response = clustersStorage.getClusterByName(clusterName)
|
|
|
.map(cluster -> webClient.post()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
|
|
|
.contentType(MediaType.APPLICATION_JSON)
|
|
|
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
- resp -> Mono.error(
|
|
|
- new NotFoundException(formatted("No such schema %s", schemaName)))
|
|
|
- )
|
|
|
- .toEntity(SchemaSubject.class)
|
|
|
+ .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted("No such schema %s", schemaName)))
|
|
|
+ .bodyToMono(SubjectIdResponse.class)
|
|
|
.log())
|
|
|
.orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
|
+ return response.then(getLatestSchemaSubject(clusterName, schemaName));
|
|
|
+ }
|
|
|
+
|
|
|
+ @NotNull
|
|
|
+ private Function<ClientResponse, Mono<? extends Throwable>> throwIfNotFoundStatus(String formatted) {
|
|
|
+ return resp -> Mono.error(new NotFoundException(formatted));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -178,8 +167,8 @@ public class SchemaRegistryService {
|
|
|
.contentType(MediaType.APPLICATION_JSON)
|
|
|
.body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevel.class))
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
- resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName))))
|
|
|
+ .onStatus(NOT_FOUND::equals,
|
|
|
+ throwIfNotFoundStatus(formatted("No such schema %s", schemaName)))
|
|
|
.bodyToMono(Void.class);
|
|
|
}).orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
|
}
|
|
@@ -217,8 +206,7 @@ public class SchemaRegistryService {
|
|
|
.contentType(MediaType.APPLICATION_JSON)
|
|
|
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
- resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName))))
|
|
|
+ .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted("No such schema %s", schemaName)))
|
|
|
.bodyToMono(InternalCompatibilityCheck.class)
|
|
|
.map(mapper::toCompatibilityCheckResponse)
|
|
|
.log()
|