|
@@ -4,6 +4,7 @@ import com.provectus.kafka.ui.cluster.model.*;
|
|
|
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
|
|
|
import com.provectus.kafka.ui.model.ConsumerGroup;
|
|
|
import com.provectus.kafka.ui.model.ServerStatus;
|
|
|
+import com.provectus.kafka.ui.model.Topic;
|
|
|
import com.provectus.kafka.ui.model.TopicFormData;
|
|
|
import com.provectus.kafka.ui.zookeeper.ZookeeperService;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
@@ -16,6 +17,7 @@ import org.apache.kafka.common.KafkaFuture;
|
|
|
import org.apache.kafka.common.Node;
|
|
|
import org.apache.kafka.common.TopicPartition;
|
|
|
import org.apache.kafka.common.config.ConfigResource;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.apache.kafka.common.serialization.BytesDeserializer;
|
|
|
import org.apache.kafka.common.utils.Bytes;
|
|
|
import org.springframework.stereotype.Service;
|
|
@@ -33,20 +35,23 @@ import java.util.stream.Stream;
|
|
|
@Log4j2
|
|
|
public class KafkaService {
|
|
|
|
|
|
+ @Value("${kafka.admin-client-timeout}")
|
|
|
+ private int clientTimeout;
|
|
|
+
|
|
|
private static final ListTopicsOptions LIST_TOPICS_OPTIONS = new ListTopicsOptions().listInternal(true);
|
|
|
|
|
|
private final ZookeeperService zookeeperService;
|
|
|
- private final Map<String, AdminClient> adminClientCache = new ConcurrentHashMap<>();
|
|
|
+ private final Map<String, ExtendedAdminClient> adminClientCache = new ConcurrentHashMap<>();
|
|
|
private final Map<AdminClient, Map<TopicPartition, Integer>> leadersCache = new ConcurrentHashMap<>();
|
|
|
|
|
|
@SneakyThrows
|
|
|
public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
|
|
|
return getOrCreateAdminClient(cluster).flatMap(
|
|
|
- ac -> getClusterMetrics(ac)
|
|
|
+ ac -> getClusterMetrics(ac.getAdminClient())
|
|
|
|
|
|
.flatMap( clusterMetrics ->
|
|
|
- getTopicsData(ac).flatMap( topics ->
|
|
|
- loadTopicsConfig(ac, topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
|
|
|
+ getTopicsData(ac.getAdminClient()).flatMap( topics ->
|
|
|
+ loadTopicsConfig(ac.getAdminClient(), topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
|
|
|
.map( configs -> mergeWithConfigs(topics, configs))
|
|
|
.flatMap(it -> updateSegmentMetrics(ac, clusterMetrics, it))
|
|
|
).map( segmentSizeDto -> buildFromData(cluster, segmentSizeDto))
|
|
@@ -170,8 +175,7 @@ public class KafkaService {
|
|
|
|
|
|
|
|
|
public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
|
|
|
- AdminClient adminClient = this.createAdminClient(cluster);
|
|
|
- return this.createTopic(adminClient, topicFormData);
|
|
|
+ return getOrCreateAdminClient(cluster).flatMap(ac -> createTopic(ac.getAdminClient(), topicFormData));
|
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
@@ -200,24 +204,18 @@ public class KafkaService {
|
|
|
}
|
|
|
|
|
|
|
|
|
- public Mono<AdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
|
|
|
- AdminClient adminClient = adminClientCache.computeIfAbsent(
|
|
|
- cluster.getName(),
|
|
|
- (id) -> createAdminClient(cluster)
|
|
|
- );
|
|
|
-
|
|
|
- return isAdminClientConnected(adminClient);
|
|
|
+ public Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
|
|
|
+ return Mono.justOrEmpty(adminClientCache.get(cluster.getName()))
|
|
|
+ .switchIfEmpty(createAdminClient(cluster))
|
|
|
+ .map(e -> adminClientCache.computeIfAbsent(cluster.getName(), key -> e));
|
|
|
}
|
|
|
|
|
|
- public AdminClient createAdminClient(KafkaCluster kafkaCluster) {
|
|
|
+ public Mono<ExtendedAdminClient> createAdminClient(KafkaCluster kafkaCluster) {
|
|
|
Properties properties = new Properties();
|
|
|
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
|
|
|
- properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
|
|
|
- return AdminClient.create(properties);
|
|
|
- }
|
|
|
-
|
|
|
- private Mono<AdminClient> isAdminClientConnected(AdminClient adminClient) {
|
|
|
- return getClusterId(adminClient).map( r -> adminClient);
|
|
|
+ properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
|
|
|
+ AdminClient adminClient = AdminClient.create(properties);
|
|
|
+ return ExtendedAdminClient.extendedAdminClient(adminClient);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -251,12 +249,11 @@ public class KafkaService {
|
|
|
}
|
|
|
|
|
|
public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) {
|
|
|
- var adminClient = this.createAdminClient(cluster);
|
|
|
- return ClusterUtil.toMono(adminClient.listConsumerGroups().all())
|
|
|
- .flatMap(s -> ClusterUtil.toMono(adminClient
|
|
|
+ return getOrCreateAdminClient(cluster).flatMap(ac -> ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all())
|
|
|
+ .flatMap(s -> ClusterUtil.toMono(ac.getAdminClient()
|
|
|
.describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
|
|
|
.map(s -> s.values().stream()
|
|
|
- .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()));
|
|
|
+ .map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList())));
|
|
|
}
|
|
|
|
|
|
public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
|
|
@@ -271,12 +268,47 @@ public class KafkaService {
|
|
|
|
|
|
|
|
|
@SneakyThrows
|
|
|
- private Mono<Void> createTopic(AdminClient adminClient, NewTopic newTopic) {
|
|
|
- return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic))
|
|
|
- .values()
|
|
|
- .values()
|
|
|
- .iterator()
|
|
|
- .next());
|
|
|
+ private Mono<String> createTopic(AdminClient adminClient, NewTopic newTopic) {
|
|
|
+ return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic)).all(), newTopic.name());
|
|
|
+ }
|
|
|
+
|
|
|
+ @SneakyThrows
|
|
|
+ public Mono<Topic> updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData) {
|
|
|
+ ConfigResource topicCR = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
|
|
|
+ return getOrCreateAdminClient(cluster)
|
|
|
+ .flatMap(ac -> {
|
|
|
+ if (ac.getSupportedFeatures().contains(ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
|
|
|
+ return incrementalAlterConfig(topicFormData, topicCR, ac)
|
|
|
+ .flatMap(c -> getUpdatedTopic(ac, topicName));
|
|
|
+ } else {
|
|
|
+ return alterConfig(topicFormData, topicCR, ac)
|
|
|
+ .flatMap(c -> getUpdatedTopic(ac, topicName));
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ private Mono<Topic> getUpdatedTopic (ExtendedAdminClient ac, String topicName) {
|
|
|
+ return getTopicsData(ac.getAdminClient())
|
|
|
+ .map(s -> s.stream()
|
|
|
+ .filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow())
|
|
|
+ .map(ClusterUtil::convertToTopic);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<String> incrementalAlterConfig(TopicFormData topicFormData, ConfigResource topicCR, ExtendedAdminClient ac) {
|
|
|
+ List<AlterConfigOp> listOp = topicFormData.getConfigs().entrySet().stream()
|
|
|
+ .flatMap(cfg -> Stream.of(new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()), AlterConfigOp.OpType.SET))).collect(Collectors.toList());
|
|
|
+ return ClusterUtil.toMono(ac.getAdminClient().incrementalAlterConfigs(Collections.singletonMap(topicCR, listOp)).all(), topicCR.name());
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<String> alterConfig(TopicFormData topicFormData, ConfigResource topicCR, ExtendedAdminClient ac) {
|
|
|
+ List<ConfigEntry> configEntries = topicFormData.getConfigs().entrySet().stream()
|
|
|
+ .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue()))).collect(Collectors.toList());
|
|
|
+ Config config = new Config(configEntries);
|
|
|
+ Map<ConfigResource, Config> map = Collections.singletonMap(topicCR, config);
|
|
|
+ return ClusterUtil.toMono(ac.getAdminClient().alterConfigs(map).all(), topicCR.name());
|
|
|
+
|
|
|
}
|
|
|
|
|
|
private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac, InternalClusterMetrics clusterMetrics, Map<String, InternalTopic> internalTopic) {
|