From 18b90ce317de80c11f9d8372a0bbf6d48b2fb24d Mon Sep 17 00:00:00 2001 From: Ildar Almakaev Date: Fri, 22 Jan 2021 15:46:41 +0300 Subject: [PATCH] Add endpoint to retrieve subjects from schema registry by cluster name --- .../ui/cluster/service/ClusterService.java | 58 ++++++++++--------- .../service/SchemaRegistryService.java | 26 +++++++++ .../kafka/ui/rest/MetricsRestController.java | 10 +++- .../src/main/resources/application-local.yml | 4 +- .../main/resources/swagger/kafka-ui-api.yaml | 22 +++++++ 5 files changed, 89 insertions(+), 31 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index e18c632894..e3ea90990c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -31,6 +31,7 @@ public class ClusterService { private final ClusterMapper clusterMapper; private final KafkaService kafkaService; private final ConsumingService consumingService; + private final SchemaRegistryService schemaRegistryService; public List getClusters() { return clustersStorage.getKafkaClusters() @@ -41,8 +42,8 @@ public class ClusterService { public Mono getBrokerMetrics(String name, Integer id) { return Mono.justOrEmpty(clustersStorage.getClusterByName(name) - .map( c -> c.getMetrics().getInternalBrokerMetrics()) - .map( m -> m.get(id)) + .map(c -> c.getMetrics().getInternalBrokerMetrics()) + .map(m -> m.get(id)) .map(clusterMapper::toBrokerMetrics)); } @@ -74,17 +75,17 @@ public class ClusterService { public Optional getTopicDetails(String name, String topicName) { return clustersStorage.getClusterByName(name) - .flatMap( c -> + .flatMap(c -> Optional.ofNullable( - c.getTopics().get(topicName) + c.getTopics().get(topicName) ).map( - t -> t.toBuilder().partitions( - kafkaService.getTopicPartitions(c, t) - ).build() + t -> t.toBuilder().partitions( + kafkaService.getTopicPartitions(c, t) + ).build() ).map(t -> clusterMapper.toTopicDetails(t, c.getMetrics())) ); } - + public Optional> getTopicConfigs(String name, String topicName) { return clustersStorage.getClusterByName(name) .map(KafkaCluster::getTopics) @@ -105,17 +106,17 @@ public class ClusterService { var cluster = clustersStorage.getClusterByName(clusterName).orElseThrow(Throwable::new); return kafkaService.getOrCreateAdminClient(cluster).map(ac -> - ac.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).all() - ).flatMap(groups -> + ac.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).all() + ).flatMap(groups -> groupMetadata(cluster, consumerGroupId) - .flatMap(offsets -> { - Map endOffsets = topicPartitionsEndOffsets(cluster, offsets.keySet()); + .flatMap(offsets -> { + Map endOffsets = topicPartitionsEndOffsets(cluster, offsets.keySet()); return ClusterUtil.toMono(groups).map(s -> s.get(consumerGroupId).members().stream() - .flatMap(c -> Stream.of(ClusterUtil.convertToConsumerTopicPartitionDetails(c, offsets, endOffsets))) + .flatMap(c -> Stream.of(ClusterUtil.convertToConsumerTopicPartitionDetails(c, offsets, endOffsets))) .collect(Collectors.toList()).stream().flatMap(t -> t.stream().flatMap(Stream::of)).collect(Collectors.toList())); - }) - ) - .map(c -> new ConsumerGroupDetails().consumers(c).consumerGroupId(consumerGroupId)); + }) + ) + .map(c -> new ConsumerGroupDetails().consumers(c).consumerGroupId(consumerGroupId)); } @@ -140,21 +141,21 @@ public class ClusterService { } @SneakyThrows - public Mono> getConsumerGroups (String clusterName) { - return clustersStorage.getClusterByName(clusterName) - .map(kafkaService::getConsumerGroups) - .orElse(Mono.empty()); + public Mono> getConsumerGroups(String clusterName) { + return clustersStorage.getClusterByName(clusterName) + .map(kafkaService::getConsumerGroups) + .orElse(Mono.empty()); } - public Flux getBrokers (String clusterName) { + public Flux getBrokers(String clusterName) { return kafkaService.getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow()) .flatMap(client -> ClusterUtil.toMono(client.getAdminClient().describeCluster().nodes()) - .map(n -> n.stream().map(node -> { - Broker broker = new Broker(); - broker.setId(node.id()); - broker.setHost(node.host()); - return broker; - }).collect(Collectors.toList()))) + .map(n -> n.stream().map(node -> { + Broker broker = new Broker(); + broker.setId(node.id()); + broker.setHost(node.host()); + return broker; + }).collect(Collectors.toList()))) .flatMapMany(Flux::fromIterable); } @@ -180,4 +181,7 @@ public class ClusterService { .orElse(Flux.empty()); } + public Flux getSchemaSubjects(String clusterName) { + return schemaRegistryService.getAllSchemaSubjects(clusterName); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java new file mode 100644 index 0000000000..40bfc59319 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java @@ -0,0 +1,26 @@ +package com.provectus.kafka.ui.cluster.service; + +import com.provectus.kafka.ui.cluster.model.ClustersStorage; +import com.provectus.kafka.ui.cluster.model.KafkaCluster; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; + +@Service +@Log4j2 +@RequiredArgsConstructor +public class SchemaRegistryService { + private final ClustersStorage clustersStorage; + public static final String URL_SUBJECTS = "/subjects"; + + public Flux getAllSchemaSubjects(String clusterName) { + KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow(); + WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry()); + return webClient.get() + .uri(URL_SUBJECTS) + .retrieve() + .bodyToFlux(String.class); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index e5885cc79c..5858ad9321 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -32,8 +32,8 @@ public class MetricsRestController implements ApiClustersApi { @Override public Mono> getBrokersMetrics(String clusterName, Integer id, ServerWebExchange exchange) { return clusterService.getBrokerMetrics(clusterName, id) - .map(ResponseEntity::ok) - .onErrorReturn(ResponseEntity.notFound().build()); + .map(ResponseEntity::ok) + .onErrorReturn(ResponseEntity.notFound().build()); } @Override @@ -100,6 +100,12 @@ public class MetricsRestController implements ApiClustersApi { .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); // TODO: check behaviour on cluster not found and empty groups list } + @Override + public Mono>> getSchemaSubjects(String clusterName, ServerWebExchange exchange) { + Flux subjects = clusterService.getSchemaSubjects(clusterName); + return Mono.just(ResponseEntity.ok(subjects)); + } + @Override public Mono> getConsumerGroup(String clusterName, String consumerGroupId, ServerWebExchange exchange) { return clusterService.getConsumerGroupDetail(clusterName, consumerGroupId).map(ResponseEntity::ok); diff --git a/kafka-ui-api/src/main/resources/application-local.yml b/kafka-ui-api/src/main/resources/application-local.yml index e1f9351d51..0f771dc4b5 100644 --- a/kafka-ui-api/src/main/resources/application-local.yml +++ b/kafka-ui-api/src/main/resources/application-local.yml @@ -2,9 +2,9 @@ kafka: clusters: - name: local - bootstrapServers: localhost:9092 + bootstrapServers: localhost:9093 zookeeper: localhost:2181 - schemaRegistry: http://localhost:8085 + schemaRegistry: http://localhost:8081 # schemaNameTemplate: "%s-value" jmxPort: 9997 - diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 89acbb29ac..23062201c4 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -335,6 +335,28 @@ paths: items: $ref: '#/components/schemas/ConsumerGroup' + /api/clusters/{clusterName}/schema/subjects: + get: + tags: + - /api/clusters + summary: get all subjects from schema registry + operationId: getSchemaSubjects + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + type: string + components: schemas: Cluster: