GetBrokers endpoint fixed, now returns just brokers ids, also removed id param from Cluster and KafkaCluster (#46)

Co-authored-by: Roman Nedzvetskiy <roman@Romans-MacBook-Pro.local>
This commit is contained in:
Roman Nedzvetskiy 2020-05-12 14:12:50 +03:00 committed by GitHub
parent 42085b5f07
commit 5b4a9c5a03
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 27 additions and 20 deletions

View file

@ -26,7 +26,7 @@ public class ClustersMetricsScheduler {
.subscribeOn(Schedulers.parallel())
.map(Map.Entry::getValue)
.flatMap(metricsUpdateService::updateMetrics)
.doOnNext(s -> clustersStorage.setKafkaCluster(s.getId(), s))
.doOnNext(s -> clustersStorage.setKafkaCluster(s.getName(), s))
.subscribe();
}
}

View file

@ -10,7 +10,6 @@ import java.util.Map;
@Builder(toBuilder = true)
public class KafkaCluster {
private final String id = "";
private final String name;
private final String jmxHost;
private final String jmxPort;

View file

@ -3,11 +3,13 @@ package com.provectus.kafka.ui.cluster.service;
import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.kafka.KafkaService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collections;
@ -69,4 +71,15 @@ public class ClusterService {
.map(kafkaService::getConsumerGroups)
.orElse(Mono.empty());
}
public Flux<Broker> getBrokers (String clusterName) {
return kafkaService.getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow())
.flatMap(client -> ClusterUtil.toMono(client.describeCluster().nodes())
.map(n -> n.stream().map(node -> {
Broker broker = new Broker();
broker.setId(node.idString());
return broker;
}).collect(Collectors.toList())))
.flatMapMany(Flux::fromIterable);
}
}

View file

@ -28,7 +28,6 @@ public class ClusterUtil {
public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, KafkaCluster cluster) {
ConsumerGroup consumerGroup = new ConsumerGroup();
consumerGroup.setClusterId(cluster.getId());
consumerGroup.setConsumerGroupId(c.groupId());
consumerGroup.setNumConsumers(c.members().size());
int numTopics = c.members().stream().mapToInt( m -> m.assignment().topicPartitions().size()).sum();

View file

@ -179,7 +179,7 @@ public class KafkaService {
public Mono<AdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
AdminClient adminClient = adminClientCache.computeIfAbsent(
cluster.getId(),
cluster.getName(),
(id) -> createAdminClient(cluster)
);

View file

@ -12,7 +12,6 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.validation.Valid;
import java.util.ArrayList;
@RestController
@RequiredArgsConstructor
@ -26,32 +25,32 @@ public class MetricsRestController implements ApiClustersApi {
}
@Override
public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(String clusterId, ServerWebExchange exchange) {
public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(String clusterName, ServerWebExchange exchange) {
return Mono.just(
clusterService.getBrokersMetrics(clusterId)
clusterService.getBrokersMetrics(clusterName)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build())
);
}
@Override
public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterId, ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterId))));
public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterName, ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName))));
}
@Override
public Mono<ResponseEntity<TopicDetails>> getTopicDetails(String clusterId, String topicName, ServerWebExchange exchange) {
public Mono<ResponseEntity<TopicDetails>> getTopicDetails(String clusterName, String topicName, ServerWebExchange exchange) {
return Mono.just(
clusterService.getTopicDetails(clusterId, topicName)
clusterService.getTopicDetails(clusterName, topicName)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build())
);
}
@Override
public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(String clusterId, String topicName, ServerWebExchange exchange) {
public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(String clusterName, String topicName, ServerWebExchange exchange) {
return Mono.just(
clusterService.getTopicConfigs(clusterId, topicName)
clusterService.getTopicConfigs(clusterName, topicName)
.map(Flux::fromIterable)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build())
@ -59,16 +58,15 @@ public class MetricsRestController implements ApiClustersApi {
}
@Override
public Mono<ResponseEntity<Topic>> createTopic(String clusterId, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
return clusterService.createTopic(clusterId, topicFormData)
public Mono<ResponseEntity<Topic>> createTopic(String clusterName, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
return clusterService.createTopic(clusterName, topicFormData)
.map(s -> new ResponseEntity<>(s, HttpStatus.OK))
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
}
@Override
public Mono<ResponseEntity<Flux<Broker>>> getBrokers(String clusterId, ServerWebExchange exchange) {
//TODO: ????
return Mono.just(ResponseEntity.ok(Flux.fromIterable(new ArrayList<>())));
public Mono<ResponseEntity<Flux<Broker>>> getBrokers(String clusterName, ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(clusterService.getBrokers(clusterName)));
}
@Override

View file

@ -196,8 +196,6 @@ components:
Cluster:
type: object
properties:
id:
type: string
name:
type: string
defaultCluster: