|
@@ -1,36 +1,43 @@
|
|
|
package com.provectus.kafka.ui.cluster.service;
|
|
|
|
|
|
+import com.provectus.kafka.ui.cluster.exception.DuplicateEntityException;
|
|
|
import com.provectus.kafka.ui.cluster.exception.NotFoundException;
|
|
|
+import com.provectus.kafka.ui.cluster.exception.UnprocessableEntityException;
|
|
|
import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
|
|
|
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
|
|
|
-import com.provectus.kafka.ui.cluster.model.InternalCompatibilityCheck;
|
|
|
-import com.provectus.kafka.ui.cluster.model.InternalCompatibilityLevel;
|
|
|
-import com.provectus.kafka.ui.model.CompatibilityCheckResponse;
|
|
|
-import com.provectus.kafka.ui.model.CompatibilityLevel;
|
|
|
-import com.provectus.kafka.ui.model.NewSchemaSubject;
|
|
|
-import com.provectus.kafka.ui.model.SchemaSubject;
|
|
|
-import java.util.Formatter;
|
|
|
+import com.provectus.kafka.ui.cluster.model.KafkaCluster;
|
|
|
+import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibilityCheck;
|
|
|
+import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalCompatibilityLevel;
|
|
|
+import com.provectus.kafka.ui.cluster.model.schemaregistry.InternalNewSchema;
|
|
|
+import com.provectus.kafka.ui.cluster.model.schemaregistry.SubjectIdResponse;
|
|
|
+import com.provectus.kafka.ui.model.*;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.log4j.Log4j2;
|
|
|
-import org.springframework.core.ParameterizedTypeReference;
|
|
|
-import org.springframework.http.HttpEntity;
|
|
|
-import org.springframework.http.HttpStatus;
|
|
|
+import org.jetbrains.annotations.NotNull;
|
|
|
import org.springframework.http.MediaType;
|
|
|
import org.springframework.http.ResponseEntity;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.web.reactive.function.BodyInserters;
|
|
|
+import org.springframework.web.reactive.function.client.ClientResponse;
|
|
|
import org.springframework.web.reactive.function.client.WebClient;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
-import java.util.Arrays;
|
|
|
-import java.util.List;
|
|
|
+import java.util.Formatter;
|
|
|
import java.util.Objects;
|
|
|
+import java.util.function.Function;
|
|
|
+
|
|
|
+import static org.springframework.http.HttpStatus.NOT_FOUND;
|
|
|
+import static org.springframework.http.HttpStatus.UNPROCESSABLE_ENTITY;
|
|
|
|
|
|
@Service
|
|
|
@Log4j2
|
|
|
@RequiredArgsConstructor
|
|
|
public class SchemaRegistryService {
|
|
|
+ public static final String NO_SUCH_SCHEMA_VERSION = "No such schema %s with version %s";
|
|
|
+ public static final String NO_SUCH_SCHEMA = "No such schema %s";
|
|
|
+ public static final String NO_SUCH_CLUSTER = "No such cluster";
|
|
|
+
|
|
|
private static final String URL_SUBJECTS = "/subjects";
|
|
|
private static final String URL_SUBJECT = "/subjects/{schemaName}";
|
|
|
private static final String URL_SUBJECT_VERSIONS = "/subjects/{schemaName}/versions";
|
|
@@ -45,7 +52,7 @@ public class SchemaRegistryService {
|
|
|
var allSubjectNames = getAllSubjectNames(clusterName);
|
|
|
return allSubjectNames
|
|
|
.flatMapMany(Flux::fromArray)
|
|
|
- .flatMap(subject -> getLatestSchemaSubject(clusterName, subject));
|
|
|
+ .flatMap(subject -> getLatestSchemaVersionBySubject(clusterName, subject));
|
|
|
}
|
|
|
|
|
|
public Mono<String[]> getAllSubjectNames(String clusterName) {
|
|
@@ -56,7 +63,7 @@ public class SchemaRegistryService {
|
|
|
.bodyToMono(String[].class)
|
|
|
.doOnError(log::error)
|
|
|
)
|
|
|
- .orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
|
+ .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER)));
|
|
|
}
|
|
|
|
|
|
public Flux<SchemaSubject> getAllVersionsBySubject(String clusterName, String subject) {
|
|
@@ -69,19 +76,17 @@ public class SchemaRegistryService {
|
|
|
.map(cluster -> webClient.get()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
- resp -> Mono.error(
|
|
|
- new NotFoundException(formatted("No such schema %s"))
|
|
|
- )
|
|
|
+ .onStatus(NOT_FOUND::equals,
|
|
|
+ throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA))
|
|
|
).bodyToFlux(Integer.class)
|
|
|
- ).orElse(Flux.error(new NotFoundException("No such cluster")));
|
|
|
+ ).orElse(Flux.error(new NotFoundException(NO_SUCH_CLUSTER)));
|
|
|
}
|
|
|
|
|
|
public Mono<SchemaSubject> getSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) {
|
|
|
return this.getSchemaSubject(clusterName, schemaName, String.valueOf(version));
|
|
|
}
|
|
|
|
|
|
- public Mono<SchemaSubject> getLatestSchemaSubject(String clusterName, String schemaName) {
|
|
|
+ public Mono<SchemaSubject> getLatestSchemaVersionBySubject(String clusterName, String schemaName) {
|
|
|
return this.getSchemaSubject(clusterName, schemaName, LATEST);
|
|
|
}
|
|
|
|
|
@@ -90,13 +95,10 @@ public class SchemaRegistryService {
|
|
|
.map(cluster -> webClient.get()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
- resp -> Mono.error(
|
|
|
- new NotFoundException(
|
|
|
- formatted("No such schema %s with version %s", schemaName, version)
|
|
|
- )
|
|
|
- )
|
|
|
+ .onStatus(NOT_FOUND::equals,
|
|
|
+ throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
|
|
|
).bodyToMono(SchemaSubject.class)
|
|
|
+ .map(this::withSchemaType)
|
|
|
.zipWith(getSchemaCompatibilityInfoOrGlobal(clusterName, schemaName))
|
|
|
.map(tuple -> {
|
|
|
SchemaSubject schema = tuple.getT1();
|
|
@@ -105,7 +107,21 @@ public class SchemaRegistryService {
|
|
|
return schema;
|
|
|
})
|
|
|
)
|
|
|
- .orElseThrow();
|
|
|
+ .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER)));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * If {@link SchemaSubject#getSchemaType()} is null, then AVRO, otherwise, adds the schema type as is.
|
|
|
+ */
|
|
|
+ @NotNull
|
|
|
+ private SchemaSubject withSchemaType(SchemaSubject s) {
|
|
|
+ SchemaType schemaType = Objects.nonNull(s.getSchemaType()) ? s.getSchemaType() : SchemaType.AVRO;
|
|
|
+ return new SchemaSubject()
|
|
|
+ .schema(s.getSchema())
|
|
|
+ .subject(s.getSubject())
|
|
|
+ .version(s.getVersion())
|
|
|
+ .id(s.getId())
|
|
|
+ .schemaType(schemaType);
|
|
|
}
|
|
|
|
|
|
public Mono<ResponseEntity<Void>> deleteSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) {
|
|
@@ -121,46 +137,71 @@ public class SchemaRegistryService {
|
|
|
.map(cluster -> webClient.delete()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
- resp -> Mono.error(
|
|
|
- new NotFoundException(
|
|
|
- formatted("No such schema %s with version %s", schemaName, version)
|
|
|
- )
|
|
|
- )
|
|
|
+ .onStatus(NOT_FOUND::equals,
|
|
|
+ throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA_VERSION, schemaName, version))
|
|
|
).toBodilessEntity()
|
|
|
- ).orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
|
+ ).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER)));
|
|
|
}
|
|
|
|
|
|
- public Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String schemaName) {
|
|
|
+ public Mono<ResponseEntity<Void>> deleteSchemaSubjectEntirely(String clusterName, String schemaName) {
|
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
|
.map(cluster -> webClient.delete()
|
|
|
.uri(cluster.getSchemaRegistry() + URL_SUBJECT, schemaName)
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
- resp -> Mono.error(
|
|
|
- new NotFoundException(
|
|
|
- formatted("No such schema %s", schemaName)
|
|
|
- )
|
|
|
- )
|
|
|
+ .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName))
|
|
|
)
|
|
|
.toBodilessEntity())
|
|
|
- .orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
|
+ .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER)));
|
|
|
}
|
|
|
|
|
|
- public Mono<ResponseEntity<SchemaSubject>> createNewSubject(String clusterName, String schemaName, Mono<NewSchemaSubject> newSchemaSubject) {
|
|
|
- return clustersStorage.getClusterByName(clusterName)
|
|
|
- .map(cluster -> webClient.post()
|
|
|
- .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
|
|
|
- .contentType(MediaType.APPLICATION_JSON)
|
|
|
- .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
|
|
|
- .retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
- resp -> Mono.error(
|
|
|
- new NotFoundException(formatted("No such schema %s", schemaName)))
|
|
|
- )
|
|
|
- .toEntity(SchemaSubject.class)
|
|
|
- .log())
|
|
|
- .orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
|
+ /**
|
|
|
+ * Checks whether the provided schema duplicates the previous or not, creates a new schema
|
|
|
+ * and then returns the whole content by requesting its latest version.
|
|
|
+ */
|
|
|
+ public Mono<SchemaSubject> registerNewSchema(String clusterName, Mono<NewSchemaSubject> newSchemaSubject) {
|
|
|
+ return newSchemaSubject
|
|
|
+ .flatMap(schema -> {
|
|
|
+ SchemaType schemaType = SchemaType.AVRO == schema.getSchemaType() ? null : schema.getSchemaType();
|
|
|
+ Mono<InternalNewSchema> newSchema = Mono.just(new InternalNewSchema(schema.getSchema(), schemaType));
|
|
|
+ String subject = schema.getSubject();
|
|
|
+ return clustersStorage.getClusterByName(clusterName)
|
|
|
+ .map(KafkaCluster::getSchemaRegistry)
|
|
|
+ .map(schemaRegistryUrl -> checkSchemaOnDuplicate(subject, newSchema, schemaRegistryUrl)
|
|
|
+ .flatMap(s -> submitNewSchema(subject, newSchema, schemaRegistryUrl))
|
|
|
+ .flatMap(resp -> getLatestSchemaVersionBySubject(clusterName, subject))
|
|
|
+ )
|
|
|
+ .orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER)));
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ @NotNull
|
|
|
+ private Mono<SubjectIdResponse> submitNewSchema(String subject, Mono<InternalNewSchema> newSchemaSubject, String schemaRegistryUrl) {
|
|
|
+ return webClient.post()
|
|
|
+ .uri(schemaRegistryUrl + URL_SUBJECT_VERSIONS, subject)
|
|
|
+ .contentType(MediaType.APPLICATION_JSON)
|
|
|
+ .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
|
|
|
+ .retrieve()
|
|
|
+ .onStatus(UNPROCESSABLE_ENTITY::equals, r -> Mono.error(new UnprocessableEntityException("Invalid params")))
|
|
|
+ .bodyToMono(SubjectIdResponse.class);
|
|
|
+ }
|
|
|
+
|
|
|
+ @NotNull
|
|
|
+ private Mono<SchemaSubject> checkSchemaOnDuplicate(String subject, Mono<InternalNewSchema> newSchemaSubject, String schemaRegistryUrl) {
|
|
|
+ return webClient.post()
|
|
|
+ .uri(schemaRegistryUrl + URL_SUBJECT, subject)
|
|
|
+ .contentType(MediaType.APPLICATION_JSON)
|
|
|
+ .body(BodyInserters.fromPublisher(newSchemaSubject, InternalNewSchema.class))
|
|
|
+ .retrieve()
|
|
|
+ .onStatus(NOT_FOUND::equals, res -> Mono.empty())
|
|
|
+ .onStatus(UNPROCESSABLE_ENTITY::equals, r -> Mono.error(new UnprocessableEntityException("Invalid params")))
|
|
|
+ .bodyToMono(SchemaSubject.class)
|
|
|
+ .filter(s -> Objects.isNull(s.getId()))
|
|
|
+ .switchIfEmpty(Mono.error(new DuplicateEntityException("Such schema already exists")));
|
|
|
+ }
|
|
|
+
|
|
|
+ @NotNull
|
|
|
+ private Function<ClientResponse, Mono<? extends Throwable>> throwIfNotFoundStatus(String formatted) {
|
|
|
+ return resp -> Mono.error(new NotFoundException(formatted));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -178,10 +219,10 @@ public class SchemaRegistryService {
|
|
|
.contentType(MediaType.APPLICATION_JSON)
|
|
|
.body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevel.class))
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
- resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName))))
|
|
|
+ .onStatus(NOT_FOUND::equals,
|
|
|
+ throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
|
|
|
.bodyToMono(Void.class);
|
|
|
- }).orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
|
+ }).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER)));
|
|
|
}
|
|
|
|
|
|
public Mono<Void> updateSchemaCompatibility(String clusterName, Mono<CompatibilityLevel> compatibilityLevel) {
|
|
@@ -217,12 +258,11 @@ public class SchemaRegistryService {
|
|
|
.contentType(MediaType.APPLICATION_JSON)
|
|
|
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
|
|
|
.retrieve()
|
|
|
- .onStatus(HttpStatus.NOT_FOUND::equals,
|
|
|
- resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName))))
|
|
|
+ .onStatus(NOT_FOUND::equals, throwIfNotFoundStatus(formatted(NO_SUCH_SCHEMA, schemaName)))
|
|
|
.bodyToMono(InternalCompatibilityCheck.class)
|
|
|
.map(mapper::toCompatibilityCheckResponse)
|
|
|
.log()
|
|
|
- ).orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
|
+ ).orElse(Mono.error(new NotFoundException(NO_SUCH_CLUSTER)));
|
|
|
}
|
|
|
|
|
|
public String formatted(String str, Object... args) {
|