|
@@ -1,12 +1,10 @@
|
|
|
package com.provectus.kafka.ui.kafka;
|
|
|
|
|
|
-import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
|
|
|
-import com.provectus.kafka.ui.cluster.model.InternalTopic;
|
|
|
-import com.provectus.kafka.ui.cluster.model.InternalTopicConfig;
|
|
|
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
|
|
|
+import com.provectus.kafka.ui.cluster.model.*;
|
|
|
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
|
|
|
import com.provectus.kafka.ui.model.ConsumerGroup;
|
|
|
import com.provectus.kafka.ui.model.ServerStatus;
|
|
|
+import com.provectus.kafka.ui.model.Topic;
|
|
|
import com.provectus.kafka.ui.model.TopicFormData;
|
|
|
import com.provectus.kafka.ui.zookeeper.ZookeeperService;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
@@ -16,6 +14,7 @@ import org.apache.kafka.clients.admin.*;
|
|
|
import org.apache.kafka.common.KafkaFuture;
|
|
|
import org.apache.kafka.common.Node;
|
|
|
import org.apache.kafka.common.config.ConfigResource;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
import reactor.util.function.Tuple2;
|
|
@@ -27,23 +26,27 @@ import java.util.Map;
|
|
|
import java.util.Properties;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.stream.Collectors;
|
|
|
+import java.util.stream.Stream;
|
|
|
|
|
|
@Service
|
|
|
@RequiredArgsConstructor
|
|
|
@Log4j2
|
|
|
public class KafkaService {
|
|
|
|
|
|
+ @Value("${kafka.admin-client-timeout}")
|
|
|
+ private int clientTimeout;
|
|
|
+
|
|
|
private static final ListTopicsOptions LIST_TOPICS_OPTIONS = new ListTopicsOptions().listInternal(true);
|
|
|
|
|
|
private final ZookeeperService zookeeperService;
|
|
|
- private final Map<String, AdminClient> adminClientCache = new ConcurrentHashMap<>();
|
|
|
+ private final Map<String, Mono<ExtendedAdminClient>> adminClientCache = new ConcurrentHashMap<>();
|
|
|
|
|
|
@SneakyThrows
|
|
|
public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
|
|
|
return getOrCreateAdminClient(cluster).flatMap(
|
|
|
- ac -> getClusterMetrics(ac).flatMap( clusterMetrics ->
|
|
|
- getTopicsData(ac).flatMap( topics ->
|
|
|
- loadTopicsConfig(ac, topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
|
|
|
+ ac -> getClusterMetrics(ac.getAdminClient()).flatMap( clusterMetrics ->
|
|
|
+ getTopicsData(ac.getAdminClient()).flatMap( topics ->
|
|
|
+ loadTopicsConfig(ac.getAdminClient(), topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
|
|
|
.map( configs -> mergeWithConfigs(topics, configs) )
|
|
|
).map( topics -> buildFromData(cluster, clusterMetrics, topics))
|
|
|
)
|
|
@@ -150,8 +153,7 @@ public class KafkaService {
|
|
|
|
|
|
|
|
|
public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
|
|
|
- AdminClient adminClient = this.createAdminClient(cluster);
|
|
|
- return this.createTopic(adminClient, topicFormData);
|
|
|
+ return getOrCreateAdminClient(cluster).flatMap(ac -> createTopic(ac.getAdminClient(), topicFormData));
|
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
@@ -180,24 +182,23 @@ public class KafkaService {
|
|
|
}
|
|
|
|
|
|
|
|
|
- public Mono<AdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
|
|
|
- AdminClient adminClient = adminClientCache.computeIfAbsent(
|
|
|
+ public Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
|
|
|
+ return adminClientCache.computeIfAbsent(
|
|
|
cluster.getName(),
|
|
|
(id) -> createAdminClient(cluster)
|
|
|
- );
|
|
|
-
|
|
|
- return isAdminClientConnected(adminClient);
|
|
|
+ ).flatMap(this::isAdminClientConnected);
|
|
|
}
|
|
|
|
|
|
- public AdminClient createAdminClient(KafkaCluster kafkaCluster) {
|
|
|
+ public Mono<ExtendedAdminClient> createAdminClient(KafkaCluster kafkaCluster) {
|
|
|
Properties properties = new Properties();
|
|
|
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
|
|
|
- properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
|
|
|
- return AdminClient.create(properties);
|
|
|
+ properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
|
|
|
+ AdminClient adminClient = AdminClient.create(properties);
|
|
|
+ return ExtendedAdminClient.extendedAdminClient(adminClient);
|
|
|
}
|
|
|
|
|
|
- private Mono<AdminClient> isAdminClientConnected(AdminClient adminClient) {
|
|
|
- return getClusterId(adminClient).map( r -> adminClient);
|
|
|
+ private Mono<ExtendedAdminClient> isAdminClientConnected(ExtendedAdminClient adminClient) {
|
|
|
+ return getClusterId(adminClient.getAdminClient()).map( r -> adminClient);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -231,22 +232,55 @@ public class KafkaService {
|
|
|
}
|
|
|
|
|
|
public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) {
|
|
|
- var adminClient = this.createAdminClient(cluster);
|
|
|
-
|
|
|
- return ClusterUtil.toMono(adminClient.listConsumerGroups().all())
|
|
|
- .flatMap(s -> ClusterUtil.toMono(adminClient
|
|
|
+ return getOrCreateAdminClient(cluster).flatMap(ac -> ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all())
|
|
|
+ .flatMap(s -> ClusterUtil.toMono(ac.getAdminClient()
|
|
|
.describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
|
|
|
.map(s -> s.values().stream()
|
|
|
- .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()));
|
|
|
+ .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList())));
|
|
|
}
|
|
|
|
|
|
|
|
|
@SneakyThrows
|
|
|
- private Mono<Void> createTopic(AdminClient adminClient, NewTopic newTopic) {
|
|
|
- return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic))
|
|
|
- .values()
|
|
|
- .values()
|
|
|
- .iterator()
|
|
|
- .next());
|
|
|
+ private Mono<String> createTopic(AdminClient adminClient, NewTopic newTopic) {
|
|
|
+ return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic)).all(), newTopic.name());
|
|
|
+ }
|
|
|
+
|
|
|
+ @SneakyThrows
|
|
|
+ public Mono<Topic> updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData) {
|
|
|
+ ConfigResource topicCR = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
|
|
|
+ return getOrCreateAdminClient(cluster)
|
|
|
+ .flatMap(ac -> {
|
|
|
+ if (ac.getSupportedFeatures().contains(ExtendedAdminClient.SupportedFeatures.INCREMENTAL_ALTER_CONFIGS)) {
|
|
|
+ return incrementalAlterConfig(topicFormData, topicCR, ac)
|
|
|
+ .flatMap(c -> getUpdatedTopic(ac, topicName));
|
|
|
+ } else {
|
|
|
+ return alterConfig(topicFormData, topicCR, ac)
|
|
|
+ .flatMap(c -> getUpdatedTopic(ac, topicName));
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ private Mono<Topic> getUpdatedTopic (ExtendedAdminClient ac, String topicName) {
|
|
|
+ return getTopicsData(ac.getAdminClient())
|
|
|
+ .map(s -> s.stream()
|
|
|
+ .filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow())
|
|
|
+ .map(ClusterUtil::convertToTopic);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<String> incrementalAlterConfig(TopicFormData topicFormData, ConfigResource topicCR, ExtendedAdminClient ac) {
|
|
|
+ List<AlterConfigOp> listOp = topicFormData.getConfigs().entrySet().stream()
|
|
|
+ .flatMap(cfg -> Stream.of(new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()), AlterConfigOp.OpType.SET))).collect(Collectors.toList());
|
|
|
+ return ClusterUtil.toMono(ac.getAdminClient().incrementalAlterConfigs(Collections.singletonMap(topicCR, listOp)).all(), topicCR.name());
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<String> alterConfig(TopicFormData topicFormData, ConfigResource topicCR, ExtendedAdminClient ac) {
|
|
|
+ List<ConfigEntry> configEntries = topicFormData.getConfigs().entrySet().stream()
|
|
|
+ .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue()))).collect(Collectors.toList());
|
|
|
+ Config config = new Config(configEntries);
|
|
|
+ Map<ConfigResource, Config> map = Collections.singletonMap(topicCR, config);
|
|
|
+ return ClusterUtil.toMono(ac.getAdminClient().alterConfigs(map).all(), topicCR.name());
|
|
|
+
|
|
|
}
|
|
|
}
|