refs resolving

This commit is contained in:
iliax 2023-04-27 22:14:46 +04:00
parent 864bff081a
commit f25242db57
3 changed files with 33 additions and 29 deletions

View file

@ -113,7 +113,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
.build());
return validateAccess.then(
topicsService.deleteTopic(getCluster(clusterName), topicName).map(ResponseEntity::ok)
topicsService.deleteTopic(getCluster(clusterName), topicName).thenReturn(ResponseEntity.ok().build())
);
}

View file

@ -4,9 +4,12 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
import com.provectus.kafka.ui.sr.model.SchemaReference;
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Mono;
// logic copied from AbstractSchemaProvider:resolveReferences
@ -20,37 +23,38 @@ class SchemaReferencesResolver {
}
Mono<ImmutableMap<String, String>> resolve(List<SchemaReference> references) {
ImmutableMap<String, String> result = ImmutableMap.of();
ImmutableSet<String> visited = ImmutableSet.of();
return resolveReferences(references, result, visited);
return resolveReferences(references, new State(ImmutableMap.of(), ImmutableSet.of()))
.map(State::resolved);
}
private Mono<ImmutableMap<String, String>> resolveReferences(List<SchemaReference> references,
ImmutableMap<String, String> inputSchemas,
ImmutableSet<String> visited) {
if (references == null || references.isEmpty()) {
return Mono.just(inputSchemas);
private record State(ImmutableMap<String, String> resolved, ImmutableSet<String> visited) {
State visit(String name) {
return new State(resolved, ImmutableSet.<String>builder().addAll(visited).add(name).build());
}
Mono<ImmutableMap<String, String>> result = Mono.just(inputSchemas);
for (SchemaReference reference : references) {
if (!visited.contains(reference.getName())) {
visited = ImmutableSet.<String>builder().addAll(visited).add(reference.getName()).build();
final ImmutableSet<String> finalVisited = visited;
result = result.flatMap(registeredSchemas ->
client.getSubjectVersion(reference.getSubject(), String.valueOf(reference.getVersion()), true)
.flatMap(subj -> {
checkNotNull(subj.getSchema(), "Subject '%s' schema is null", subj.getSubject());
if (registeredSchemas.containsKey(reference.getName())) {
return Mono.just(registeredSchemas);
}
return resolveReferences(subj.getReferences(), registeredSchemas, finalVisited)
.map(updated -> ImmutableMap.<String, String>builder()
.putAll(updated)
.put(reference.getName(), subj.getSchema())
.build());
}));
}
State resolve(String ref, String schema) {
return new State(ImmutableMap.<String, String>builder().putAll(resolved).put(ref, schema).build(), visited);
}
}
private Mono<State> resolveReferences(List<SchemaReference> references,
State initState) {
Mono<State> result = Mono.just(initState);
for (var reference : Optional.ofNullable(references).orElse(List.of())) {
result = result.flatMap(state -> {
if (state.visited().contains(reference.getName())) {
return Mono.just(state);
} else {
final var newState = state.visit(reference.getName());
return client.getSubjectVersion(reference.getSubject(), String.valueOf(reference.getVersion()), true)
.flatMap(subj ->
resolveReferences(subj.getReferences(), newState)
.map(withNewRefs -> withNewRefs.resolve(reference.getName(), subj.getSchema()))
);
}
}
);
}
return result;
}

View file

@ -110,7 +110,7 @@ class TopicsExporter {
});
}
private Mono<Tuple2<SchemaSubject, ImmutableMap<String, String>>> getSubjWithResolvedRefs(KafkaCluster cluster,
private Mono<Tuple2<SchemaSubject, Map<String, String>>> getSubjWithResolvedRefs(KafkaCluster cluster,
String subjectName) {
return cluster.getSchemaRegistryClient()
.mono(client ->