Add endpoint to retrieve subjects from schema registry by cluster name

This commit is contained in:
Ildar Almakaev 2021-01-22 15:46:41 +03:00
parent 98baabd087
commit 18b90ce317
5 changed files with 89 additions and 31 deletions

View file

@ -31,6 +31,7 @@ public class ClusterService {
private final ClusterMapper clusterMapper;
private final KafkaService kafkaService;
private final ConsumingService consumingService;
private final SchemaRegistryService schemaRegistryService;
public List<Cluster> getClusters() {
return clustersStorage.getKafkaClusters()
@ -41,8 +42,8 @@ public class ClusterService {
public Mono<BrokerMetrics> getBrokerMetrics(String name, Integer id) {
return Mono.justOrEmpty(clustersStorage.getClusterByName(name)
.map( c -> c.getMetrics().getInternalBrokerMetrics())
.map( m -> m.get(id))
.map(c -> c.getMetrics().getInternalBrokerMetrics())
.map(m -> m.get(id))
.map(clusterMapper::toBrokerMetrics));
}
@ -74,17 +75,17 @@ public class ClusterService {
public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
return clustersStorage.getClusterByName(name)
.flatMap( c ->
.flatMap(c ->
Optional.ofNullable(
c.getTopics().get(topicName)
c.getTopics().get(topicName)
).map(
t -> t.toBuilder().partitions(
kafkaService.getTopicPartitions(c, t)
).build()
t -> t.toBuilder().partitions(
kafkaService.getTopicPartitions(c, t)
).build()
).map(t -> clusterMapper.toTopicDetails(t, c.getMetrics()))
);
}
public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
return clustersStorage.getClusterByName(name)
.map(KafkaCluster::getTopics)
@ -105,17 +106,17 @@ public class ClusterService {
var cluster = clustersStorage.getClusterByName(clusterName).orElseThrow(Throwable::new);
return kafkaService.getOrCreateAdminClient(cluster).map(ac ->
ac.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).all()
).flatMap(groups ->
ac.getAdminClient().describeConsumerGroups(Collections.singletonList(consumerGroupId)).all()
).flatMap(groups ->
groupMetadata(cluster, consumerGroupId)
.flatMap(offsets -> {
Map<TopicPartition, Long> endOffsets = topicPartitionsEndOffsets(cluster, offsets.keySet());
.flatMap(offsets -> {
Map<TopicPartition, Long> endOffsets = topicPartitionsEndOffsets(cluster, offsets.keySet());
return ClusterUtil.toMono(groups).map(s -> s.get(consumerGroupId).members().stream()
.flatMap(c -> Stream.of(ClusterUtil.convertToConsumerTopicPartitionDetails(c, offsets, endOffsets)))
.flatMap(c -> Stream.of(ClusterUtil.convertToConsumerTopicPartitionDetails(c, offsets, endOffsets)))
.collect(Collectors.toList()).stream().flatMap(t -> t.stream().flatMap(Stream::of)).collect(Collectors.toList()));
})
)
.map(c -> new ConsumerGroupDetails().consumers(c).consumerGroupId(consumerGroupId));
})
)
.map(c -> new ConsumerGroupDetails().consumers(c).consumerGroupId(consumerGroupId));
}
@ -140,21 +141,21 @@ public class ClusterService {
}
@SneakyThrows
public Mono<List<ConsumerGroup>> getConsumerGroups (String clusterName) {
return clustersStorage.getClusterByName(clusterName)
.map(kafkaService::getConsumerGroups)
.orElse(Mono.empty());
public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
return clustersStorage.getClusterByName(clusterName)
.map(kafkaService::getConsumerGroups)
.orElse(Mono.empty());
}
public Flux<Broker> getBrokers (String clusterName) {
public Flux<Broker> getBrokers(String clusterName) {
return kafkaService.getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow())
.flatMap(client -> ClusterUtil.toMono(client.getAdminClient().describeCluster().nodes())
.map(n -> n.stream().map(node -> {
Broker broker = new Broker();
broker.setId(node.id());
broker.setHost(node.host());
return broker;
}).collect(Collectors.toList())))
.map(n -> n.stream().map(node -> {
Broker broker = new Broker();
broker.setId(node.id());
broker.setHost(node.host());
return broker;
}).collect(Collectors.toList())))
.flatMapMany(Flux::fromIterable);
}
@ -180,4 +181,7 @@ public class ClusterService {
.orElse(Flux.empty());
}
public Flux<String> getSchemaSubjects(String clusterName) {
return schemaRegistryService.getAllSchemaSubjects(clusterName);
}
}

View file

@ -0,0 +1,26 @@
package com.provectus.kafka.ui.cluster.service;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
@Service
@Log4j2
@RequiredArgsConstructor
public class SchemaRegistryService {
private final ClustersStorage clustersStorage;
public static final String URL_SUBJECTS = "/subjects";
public Flux<String> getAllSchemaSubjects(String clusterName) {
KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow();
WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry());
return webClient.get()
.uri(URL_SUBJECTS)
.retrieve()
.bodyToFlux(String.class);
}
}

View file

@ -32,8 +32,8 @@ public class MetricsRestController implements ApiClustersApi {
@Override
public Mono<ResponseEntity<BrokerMetrics>> getBrokersMetrics(String clusterName, Integer id, ServerWebExchange exchange) {
return clusterService.getBrokerMetrics(clusterName, id)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build());
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build());
}
@Override
@ -100,6 +100,12 @@ public class MetricsRestController implements ApiClustersApi {
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); // TODO: check behaviour on cluster not found and empty groups list
}
@Override
public Mono<ResponseEntity<Flux<String>>> getSchemaSubjects(String clusterName, ServerWebExchange exchange) {
Flux<String> subjects = clusterService.getSchemaSubjects(clusterName);
return Mono.just(ResponseEntity.ok(subjects));
}
@Override
public Mono<ResponseEntity<ConsumerGroupDetails>> getConsumerGroup(String clusterName, String consumerGroupId, ServerWebExchange exchange) {
return clusterService.getConsumerGroupDetail(clusterName, consumerGroupId).map(ResponseEntity::ok);

View file

@ -2,9 +2,9 @@ kafka:
clusters:
-
name: local
bootstrapServers: localhost:9092
bootstrapServers: localhost:9093
zookeeper: localhost:2181
schemaRegistry: http://localhost:8085
schemaRegistry: http://localhost:8081
# schemaNameTemplate: "%s-value"
jmxPort: 9997
-

View file

@ -335,6 +335,28 @@ paths:
items:
$ref: '#/components/schemas/ConsumerGroup'
/api/clusters/{clusterName}/schema/subjects:
get:
tags:
- /api/clusters
summary: get all subjects from schema registry
operationId: getSchemaSubjects
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
type: array
items:
type: string
components:
schemas:
Cluster: