Merge branch 'feature/schema_registry_views' of github.com:provectus/kafka-ui into feature/schema_registry_views

This commit is contained in:
Guzel Kafizova 2021-02-12 11:09:16 +03:00
commit 8b4e5f0157
3 changed files with 44 additions and 17 deletions

View file

@ -9,6 +9,7 @@ import com.provectus.kafka.ui.model.CompatibilityCheckResponse;
import com.provectus.kafka.ui.model.CompatibilityLevel;
import com.provectus.kafka.ui.model.NewSchemaSubject;
import com.provectus.kafka.ui.model.SchemaSubject;
import java.util.Formatter;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.http.HttpStatus;
@ -51,9 +52,12 @@ public class SchemaRegistryService {
.map(cluster -> webClient.get()
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, schemaName)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
.bodyToFlux(Integer.class))
.orElse(Flux.error(new NotFoundException("No such cluster")));
.onStatus(HttpStatus.NOT_FOUND::equals,
resp -> Mono.error(
new NotFoundException(formatted("No such schema %s"))
)
).bodyToFlux(Integer.class)
).orElse(Flux.error(new NotFoundException("No such cluster")));
}
public Mono<SchemaSubject> getSchemaSubjectByVersion(String clusterName, String schemaName, Integer version) {
@ -70,8 +74,12 @@ public class SchemaRegistryService {
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals,
resp -> Mono.error(new NotFoundException("No such schema %s with version %s".formatted(schemaName, version))))
.bodyToMono(SchemaSubject.class)
resp -> Mono.error(
new NotFoundException(
formatted("No such schema %s with version %s", schemaName, version)
)
)
).bodyToMono(SchemaSubject.class)
.zipWith(getSchemaCompatibilityInfoOrGlobal(clusterName, schemaName))
.map(tuple -> {
SchemaSubject schema = tuple.getT1();
@ -97,9 +105,13 @@ public class SchemaRegistryService {
.uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, schemaName, version)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals,
resp -> Mono.error(new NotFoundException("No such schema %s with version %s".formatted(schemaName, version))))
.toBodilessEntity())
.orElse(Mono.error(new NotFoundException("No such cluster")));
resp -> Mono.error(
new NotFoundException(
formatted("No such schema %s with version %s", schemaName, version)
)
)
).toBodilessEntity()
).orElse(Mono.error(new NotFoundException("No such cluster")));
}
public Mono<ResponseEntity<Void>> deleteSchemaSubject(String clusterName, String schemaName) {
@ -107,7 +119,13 @@ public class SchemaRegistryService {
.map(cluster -> webClient.delete()
.uri(cluster.getSchemaRegistry() + URL_SUBJECT, schemaName)
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
.onStatus(HttpStatus.NOT_FOUND::equals,
resp -> Mono.error(
new NotFoundException(
formatted("No such schema %s", schemaName)
)
)
)
.toBodilessEntity())
.orElse(Mono.error(new NotFoundException("No such cluster")));
}
@ -120,7 +138,9 @@ public class SchemaRegistryService {
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals,
resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
resp -> Mono.error(
new NotFoundException(formatted("No such schema %s", schemaName)))
)
.toEntity(SchemaSubject.class)
.log())
.orElse(Mono.error(new NotFoundException("No such cluster")));
@ -142,7 +162,7 @@ public class SchemaRegistryService {
.body(BodyInserters.fromPublisher(compatibilityLevel, CompatibilityLevel.class))
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals,
resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName))))
.bodyToMono(Void.class);
}).orElse(Mono.error(new NotFoundException("No such cluster")));
}
@ -181,10 +201,14 @@ public class SchemaRegistryService {
.body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
.retrieve()
.onStatus(HttpStatus.NOT_FOUND::equals,
resp -> Mono.error(new NotFoundException("No such schema %s".formatted(schemaName))))
resp -> Mono.error(new NotFoundException(formatted("No such schema %s", schemaName))))
.bodyToMono(InternalCompatibilityCheck.class)
.map(mapper::toCompatibilityCheckResponse)
.log()
).orElse(Mono.error(new NotFoundException("No such cluster")));
}
public String formatted(String str, Object... args) {
return new Formatter().format(str, args).toString();
}
}

View file

@ -1,6 +1,7 @@
package com.provectus.kafka.ui.zookeeper;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import java.util.concurrent.ConcurrentHashMap;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.I0Itec.zkclient.ZkClient;
@ -14,7 +15,7 @@ import java.util.Map;
@Log4j2
public class ZookeeperService {
private final Map<String, ZkClient> cachedZkClient = new HashMap<>();
private final Map<String, ZkClient> cachedZkClient = new ConcurrentHashMap<>();
public boolean isZookeeperOnline(KafkaCluster kafkaCluster) {
var isConnected = false;
@ -33,7 +34,10 @@ public class ZookeeperService {
private ZkClient getOrCreateZkClient (KafkaCluster cluster) {
try {
return cachedZkClient.getOrDefault(cluster.getName(), new ZkClient(cluster.getZookeeper(), 1000));
return cachedZkClient.computeIfAbsent(
cluster.getName(),
(n) -> new ZkClient(cluster.getZookeeper(), 1000)
);
} catch (Exception e) {
log.error("Error while creating zookeeper client for cluster {}", cluster.getName());
return null;

View file

@ -1,4 +1,3 @@
import { flatten } from 'lodash';
import {
ApiClustersApi,
Configuration,
@ -261,13 +260,13 @@ export const fetchSchemasByClusterName = (
const schemaNames = await apiClient.getSchemas({ clusterName });
// TODO: Remove me after API refactoring
const schemas: SchemaSubject[][] = await Promise.all(
const schemas: SchemaSubject[] = await Promise.all(
schemaNames.map((schemaName) =>
apiClient.getLatestSchema({ clusterName, schemaName })
)
);
dispatch(actions.fetchSchemasByClusterNameAction.success(flatten(schemas)));
dispatch(actions.fetchSchemasByClusterNameAction.success(schemas));
} catch (e) {
dispatch(actions.fetchSchemasByClusterNameAction.failure());
}