|
@@ -1,10 +1,13 @@
|
|
|
package com.provectus.kafka.ui.service;
|
|
|
|
|
|
+import com.provectus.kafka.ui.exception.IllegalEntityStateException;
|
|
|
+import com.provectus.kafka.ui.exception.NotFoundException;
|
|
|
import com.provectus.kafka.ui.exception.TopicMetadataException;
|
|
|
import com.provectus.kafka.ui.exception.ValidationException;
|
|
|
import com.provectus.kafka.ui.model.CleanupPolicy;
|
|
|
import com.provectus.kafka.ui.model.CreateTopicMessage;
|
|
|
import com.provectus.kafka.ui.model.ExtendedAdminClient;
|
|
|
+import com.provectus.kafka.ui.model.InternalBrokerConfig;
|
|
|
import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
|
|
|
import com.provectus.kafka.ui.model.InternalBrokerMetrics;
|
|
|
import com.provectus.kafka.ui.model.InternalClusterMetrics;
|
|
@@ -54,6 +57,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp;
|
|
|
import org.apache.kafka.clients.admin.Config;
|
|
|
import org.apache.kafka.clients.admin.ConfigEntry;
|
|
|
import org.apache.kafka.clients.admin.ConsumerGroupListing;
|
|
|
+import org.apache.kafka.clients.admin.DescribeConfigsOptions;
|
|
|
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
|
|
|
import org.apache.kafka.clients.admin.ListTopicsOptions;
|
|
|
import org.apache.kafka.clients.admin.NewPartitionReassignment;
|
|
@@ -328,17 +332,47 @@ public class KafkaService {
|
|
|
|
|
|
return ClusterUtil.toMono(adminClient.describeConfigs(resources).all())
|
|
|
.map(configs ->
|
|
|
- configs.entrySet().stream().map(
|
|
|
- c -> Tuples.of(
|
|
|
- c.getKey().name(),
|
|
|
- c.getValue().entries().stream().map(ClusterUtil::mapToInternalTopicConfig)
|
|
|
- .collect(Collectors.toList())
|
|
|
- )
|
|
|
- ).collect(Collectors.toMap(
|
|
|
- Tuple2::getT1,
|
|
|
- Tuple2::getT2
|
|
|
- ))
|
|
|
- );
|
|
|
+ configs.entrySet().stream().collect(Collectors.toMap(
|
|
|
+ c -> c.getKey().name(),
|
|
|
+ c -> c.getValue().entries().stream()
|
|
|
+ .map(ClusterUtil::mapToInternalTopicConfig)
|
|
|
+ .collect(Collectors.toList()))));
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<Map<String, List<InternalBrokerConfig>>> loadBrokersConfig(
|
|
|
+ AdminClient adminClient, List<Integer> brokersIds) {
|
|
|
+ List<ConfigResource> resources = brokersIds.stream()
|
|
|
+ .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ return ClusterUtil.toMono(adminClient.describeConfigs(resources,
|
|
|
+ new DescribeConfigsOptions().includeSynonyms(true)).all())
|
|
|
+ .map(configs ->
|
|
|
+ configs.entrySet().stream().collect(Collectors.toMap(
|
|
|
+ c -> c.getKey().name(),
|
|
|
+ c -> c.getValue().entries().stream()
|
|
|
+ .map(ClusterUtil::mapToInternalBrokerConfig)
|
|
|
+ .collect(Collectors.toList()))));
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<List<InternalBrokerConfig>> loadBrokersConfig(
|
|
|
+ AdminClient adminClient, Integer brokerId) {
|
|
|
+ return loadBrokersConfig(adminClient, Collections.singletonList(brokerId))
|
|
|
+ .map(map -> map.values().stream()
|
|
|
+ .findFirst()
|
|
|
+ .orElseThrow(() -> new IllegalEntityStateException(
|
|
|
+ String.format("Config for broker %s not found", brokerId))));
|
|
|
+ }
|
|
|
+
|
|
|
+ public Mono<List<InternalBrokerConfig>> getBrokerConfigs(KafkaCluster cluster, Integer brokerId) {
|
|
|
+ return getOrCreateAdminClient(cluster)
|
|
|
+ .flatMap(adminClient -> {
|
|
|
+ if (!cluster.getBrokers().contains(brokerId)) {
|
|
|
+ return Mono.error(
|
|
|
+ new NotFoundException(String.format("Broker with id %s not found", brokerId)));
|
|
|
+ }
|
|
|
+ return loadBrokersConfig(adminClient.getAdminClient(), brokerId);
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(
|