123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- package com.provectus.kafka.ui.service;
- import com.fasterxml.jackson.core.type.TypeReference;
- import com.fasterxml.jackson.databind.json.JsonMapper;
- import com.provectus.kafka.ui.exception.SchemaCompatibilityException;
- import com.provectus.kafka.ui.exception.SchemaNotFoundException;
- import com.provectus.kafka.ui.exception.ValidationException;
- import com.provectus.kafka.ui.model.KafkaCluster;
- import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
- import com.provectus.kafka.ui.sr.model.Compatibility;
- import com.provectus.kafka.ui.sr.model.CompatibilityCheckResponse;
- import com.provectus.kafka.ui.sr.model.CompatibilityConfig;
- import com.provectus.kafka.ui.sr.model.CompatibilityLevelChange;
- import com.provectus.kafka.ui.sr.model.NewSubject;
- import com.provectus.kafka.ui.sr.model.SchemaSubject;
- import com.provectus.kafka.ui.util.ReactiveFailover;
- import java.nio.charset.Charset;
- import java.util.List;
- import java.util.stream.Collectors;
- import lombok.AllArgsConstructor;
- import lombok.Getter;
- import lombok.RequiredArgsConstructor;
- import lombok.SneakyThrows;
- import lombok.experimental.Delegate;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Service;
- import org.springframework.web.reactive.function.client.WebClientResponseException;
- import reactor.core.publisher.Flux;
- import reactor.core.publisher.Mono;
- @Service
- @Slf4j
- @RequiredArgsConstructor
- public class SchemaRegistryService {
- private static final String LATEST = "latest";
- @AllArgsConstructor
- public static class SubjectWithCompatibilityLevel {
- @Delegate
- SchemaSubject subject;
- @Getter
- Compatibility compatibility;
- }
- private ReactiveFailover<KafkaSrClientApi> api(KafkaCluster cluster) {
- return cluster.getSchemaRegistryClient();
- }
- public Mono<List<SubjectWithCompatibilityLevel>> getAllLatestVersionSchemas(KafkaCluster cluster,
- List<String> subjects) {
- return Flux.fromIterable(subjects)
- .concatMap(subject -> getLatestSchemaVersionBySubject(cluster, subject))
- .collect(Collectors.toList());
- }
- public Mono<List<String>> getAllSubjectNames(KafkaCluster cluster) {
- return api(cluster)
- .mono(c -> c.getAllSubjectNames(null, false))
- .flatMapIterable(this::parseSubjectListString)
- .collectList();
- }
- @SneakyThrows
- private List<String> parseSubjectListString(String subjectNamesStr) {
- //workaround for https://github.com/spring-projects/spring-framework/issues/24734
- return new JsonMapper().readValue(subjectNamesStr, new TypeReference<List<String>>() {
- });
- }
- public Flux<SubjectWithCompatibilityLevel> getAllVersionsBySubject(KafkaCluster cluster, String subject) {
- Flux<Integer> versions = getSubjectVersions(cluster, subject);
- return versions.flatMap(version -> getSchemaSubjectByVersion(cluster, subject, version));
- }
- private Flux<Integer> getSubjectVersions(KafkaCluster cluster, String schemaName) {
- return api(cluster).flux(c -> c.getSubjectVersions(schemaName));
- }
- public Mono<SubjectWithCompatibilityLevel> getSchemaSubjectByVersion(KafkaCluster cluster,
- String schemaName,
- Integer version) {
- return getSchemaSubject(cluster, schemaName, String.valueOf(version));
- }
- public Mono<SubjectWithCompatibilityLevel> getLatestSchemaVersionBySubject(KafkaCluster cluster,
- String schemaName) {
- return getSchemaSubject(cluster, schemaName, LATEST);
- }
- private Mono<SubjectWithCompatibilityLevel> getSchemaSubject(KafkaCluster cluster, String schemaName,
- String version) {
- return api(cluster)
- .mono(c -> c.getSubjectVersion(schemaName, version))
- .zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
- .map(t -> new SubjectWithCompatibilityLevel(t.getT1(), t.getT2()))
- .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.error(new SchemaNotFoundException()));
- }
- public Mono<Void> deleteSchemaSubjectByVersion(KafkaCluster cluster, String schemaName, Integer version) {
- return deleteSchemaSubject(cluster, schemaName, String.valueOf(version));
- }
- public Mono<Void> deleteLatestSchemaSubject(KafkaCluster cluster, String schemaName) {
- return deleteSchemaSubject(cluster, schemaName, LATEST);
- }
- private Mono<Void> deleteSchemaSubject(KafkaCluster cluster, String schemaName, String version) {
- return api(cluster).mono(c -> c.deleteSubjectVersion(schemaName, version, false));
- }
- public Mono<Void> deleteSchemaSubjectEntirely(KafkaCluster cluster, String schemaName) {
- return api(cluster).mono(c -> c.deleteAllSubjectVersions(schemaName, false));
- }
- /**
- * Checks whether the provided schema duplicates the previous or not, creates a new schema
- * and then returns the whole content by requesting its latest version.
- */
- public Mono<SubjectWithCompatibilityLevel> registerNewSchema(KafkaCluster cluster,
- String subject,
- NewSubject newSchemaSubject) {
- return api(cluster)
- .mono(c -> c.registerNewSchema(subject, newSchemaSubject))
- .onErrorMap(WebClientResponseException.Conflict.class,
- th -> new SchemaCompatibilityException())
- .onErrorMap(WebClientResponseException.UnprocessableEntity.class,
- th -> new ValidationException("Invalid schema"))
- .then(getLatestSchemaVersionBySubject(cluster, subject));
- }
- public Mono<Void> updateSchemaCompatibility(KafkaCluster cluster,
- String schemaName,
- Compatibility compatibility) {
- return api(cluster)
- .mono(c -> c.updateSubjectCompatibilityLevel(
- schemaName, new CompatibilityLevelChange().compatibility(compatibility)))
- .then();
- }
- public Mono<Void> updateGlobalSchemaCompatibility(KafkaCluster cluster,
- Compatibility compatibility) {
- return api(cluster)
- .mono(c -> c.updateGlobalCompatibilityLevel(new CompatibilityLevelChange().compatibility(compatibility)))
- .then();
- }
- public Mono<Compatibility> getSchemaCompatibilityLevel(KafkaCluster cluster,
- String schemaName) {
- return api(cluster)
- .mono(c -> c.getSubjectCompatibilityLevel(schemaName, true))
- .map(CompatibilityConfig::getCompatibilityLevel)
- .onErrorResume(error -> Mono.empty());
- }
- public Mono<Compatibility> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
- return api(cluster)
- .mono(KafkaSrClientApi::getGlobalCompatibilityLevel)
- .map(CompatibilityConfig::getCompatibilityLevel);
- }
- private Mono<Compatibility> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
- String schemaName) {
- return getSchemaCompatibilityLevel(cluster, schemaName)
- .switchIfEmpty(this.getGlobalSchemaCompatibilityLevel(cluster));
- }
- public Mono<CompatibilityCheckResponse> checksSchemaCompatibility(KafkaCluster cluster,
- String schemaName,
- NewSubject newSchemaSubject) {
- return api(cluster).mono(c -> c.checkSchemaCompatibility(schemaName, LATEST, true, newSchemaSubject));
- }
- }
|