diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/ClustersMetricsScheduler.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/ClustersMetricsScheduler.java index 3d923ad07b..f6038d01e2 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/ClustersMetricsScheduler.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/ClustersMetricsScheduler.java @@ -26,7 +26,7 @@ public class ClustersMetricsScheduler { .subscribeOn(Schedulers.parallel()) .map(Map.Entry::getValue) .flatMap(metricsUpdateService::updateMetrics) - .doOnNext(s -> clustersStorage.setKafkaCluster(s.getId(), s)) + .doOnNext(s -> clustersStorage.setKafkaCluster(s.getName(), s)) .subscribe(); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java index f51815d945..427b6ae31a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java @@ -10,7 +10,6 @@ import java.util.Map; @Builder(toBuilder = true) public class KafkaCluster { - private final String id = ""; private final String name; private final String jmxHost; private final String jmxPort; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java index e3a8b8edeb..88f1c672dd 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java @@ -3,11 +3,13 @@ package com.provectus.kafka.ui.cluster.service; import com.provectus.kafka.ui.cluster.mapper.ClusterMapper; import com.provectus.kafka.ui.cluster.model.ClustersStorage; import com.provectus.kafka.ui.cluster.model.KafkaCluster; +import com.provectus.kafka.ui.cluster.util.ClusterUtil; import com.provectus.kafka.ui.kafka.KafkaService; import com.provectus.kafka.ui.model.*; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.Collections; @@ -69,4 +71,15 @@ public class ClusterService { .map(kafkaService::getConsumerGroups) .orElse(Mono.empty()); } + + public Flux getBrokers (String clusterName) { + return kafkaService.getOrCreateAdminClient(clustersStorage.getClusterByName(clusterName).orElseThrow()) + .flatMap(client -> ClusterUtil.toMono(client.describeCluster().nodes()) + .map(n -> n.stream().map(node -> { + Broker broker = new Broker(); + broker.setId(node.idString()); + return broker; + }).collect(Collectors.toList()))) + .flatMapMany(Flux::fromIterable); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java index e343e8eee4..adf1caa480 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java @@ -28,7 +28,6 @@ public class ClusterUtil { public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, KafkaCluster cluster) { ConsumerGroup consumerGroup = new ConsumerGroup(); - consumerGroup.setClusterId(cluster.getId()); consumerGroup.setConsumerGroupId(c.groupId()); consumerGroup.setNumConsumers(c.members().size()); int numTopics = c.members().stream().mapToInt( m -> m.assignment().topicPartitions().size()).sum(); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java index 4df4d56d78..2941c51d72 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java @@ -179,7 +179,7 @@ public class KafkaService { public Mono getOrCreateAdminClient(KafkaCluster cluster) { AdminClient adminClient = adminClientCache.computeIfAbsent( - cluster.getId(), + cluster.getName(), (id) -> createAdminClient(cluster) ); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java index be3e18ceea..82348aa4a1 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java @@ -12,7 +12,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import javax.validation.Valid; -import java.util.ArrayList; @RestController @RequiredArgsConstructor @@ -26,32 +25,32 @@ public class MetricsRestController implements ApiClustersApi { } @Override - public Mono> getBrokersMetrics(String clusterId, ServerWebExchange exchange) { + public Mono> getBrokersMetrics(String clusterName, ServerWebExchange exchange) { return Mono.just( - clusterService.getBrokersMetrics(clusterId) + clusterService.getBrokersMetrics(clusterName) .map(ResponseEntity::ok) .orElse(ResponseEntity.notFound().build()) ); } @Override - public Mono>> getTopics(String clusterId, ServerWebExchange exchange) { - return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterId)))); + public Mono>> getTopics(String clusterName, ServerWebExchange exchange) { + return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName)))); } @Override - public Mono> getTopicDetails(String clusterId, String topicName, ServerWebExchange exchange) { + public Mono> getTopicDetails(String clusterName, String topicName, ServerWebExchange exchange) { return Mono.just( - clusterService.getTopicDetails(clusterId, topicName) + clusterService.getTopicDetails(clusterName, topicName) .map(ResponseEntity::ok) .orElse(ResponseEntity.notFound().build()) ); } @Override - public Mono>> getTopicConfigs(String clusterId, String topicName, ServerWebExchange exchange) { + public Mono>> getTopicConfigs(String clusterName, String topicName, ServerWebExchange exchange) { return Mono.just( - clusterService.getTopicConfigs(clusterId, topicName) + clusterService.getTopicConfigs(clusterName, topicName) .map(Flux::fromIterable) .map(ResponseEntity::ok) .orElse(ResponseEntity.notFound().build()) @@ -59,16 +58,15 @@ public class MetricsRestController implements ApiClustersApi { } @Override - public Mono> createTopic(String clusterId, @Valid Mono topicFormData, ServerWebExchange exchange) { - return clusterService.createTopic(clusterId, topicFormData) + public Mono> createTopic(String clusterName, @Valid Mono topicFormData, ServerWebExchange exchange) { + return clusterService.createTopic(clusterName, topicFormData) .map(s -> new ResponseEntity<>(s, HttpStatus.OK)) .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); } @Override - public Mono>> getBrokers(String clusterId, ServerWebExchange exchange) { - //TODO: ???? - return Mono.just(ResponseEntity.ok(Flux.fromIterable(new ArrayList<>()))); + public Mono>> getBrokers(String clusterName, ServerWebExchange exchange) { + return Mono.just(ResponseEntity.ok(clusterService.getBrokers(clusterName))); } @Override diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index e79ec16102..acb19f1981 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -196,8 +196,6 @@ components: Cluster: type: object properties: - id: - type: string name: type: string defaultCluster: