|
@@ -47,15 +47,12 @@ import java.util.Optional;
|
|
|
import java.util.Properties;
|
|
|
import java.util.UUID;
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.stream.Collectors;
|
|
|
import java.util.stream.Stream;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
-import lombok.Setter;
|
|
|
import lombok.SneakyThrows;
|
|
|
import lombok.extern.log4j.Log4j2;
|
|
|
import org.apache.kafka.clients.admin.AdminClient;
|
|
|
-import org.apache.kafka.clients.admin.AdminClientConfig;
|
|
|
import org.apache.kafka.clients.admin.AlterConfigOp;
|
|
|
import org.apache.kafka.clients.admin.Config;
|
|
|
import org.apache.kafka.clients.admin.ConfigEntry;
|
|
@@ -88,7 +85,6 @@ import org.apache.kafka.common.requests.DescribeLogDirsResponse;
|
|
|
import org.apache.kafka.common.serialization.ByteArraySerializer;
|
|
|
import org.apache.kafka.common.serialization.BytesDeserializer;
|
|
|
import org.apache.kafka.common.utils.Bytes;
|
|
|
-import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
@@ -104,13 +100,10 @@ public class KafkaService {
|
|
|
private static final ListTopicsOptions LIST_TOPICS_OPTIONS =
|
|
|
new ListTopicsOptions().listInternal(true);
|
|
|
private final ZookeeperService zookeeperService;
|
|
|
- private final Map<String, ExtendedAdminClient> adminClientCache = new ConcurrentHashMap<>();
|
|
|
private final JmxClusterUtil jmxClusterUtil;
|
|
|
private final ClustersStorage clustersStorage;
|
|
|
private final DeserializationService deserializationService;
|
|
|
- @Setter // used in tests
|
|
|
- @Value("${kafka.admin-client-timeout}")
|
|
|
- private int clientTimeout;
|
|
|
+ private final AdminClientService adminClientService;
|
|
|
|
|
|
public KafkaCluster getUpdatedCluster(KafkaCluster cluster, InternalTopic updatedTopic) {
|
|
|
final Map<String, InternalTopic> topics =
|
|
@@ -129,7 +122,7 @@ public class KafkaService {
|
|
|
|
|
|
@SneakyThrows
|
|
|
public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
|
|
|
- return getOrCreateAdminClient(cluster)
|
|
|
+ return adminClientService.getOrCreateAdminClient(cluster)
|
|
|
.flatMap(
|
|
|
ac -> ClusterUtil.getClusterVersion(ac.getAdminClient()).flatMap(
|
|
|
version ->
|
|
@@ -306,36 +299,17 @@ public class KafkaService {
|
|
|
}
|
|
|
|
|
|
public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicCreation> topicCreation) {
|
|
|
- return getOrCreateAdminClient(cluster)
|
|
|
+ return adminClientService.getOrCreateAdminClient(cluster)
|
|
|
.flatMap(ac -> createTopic(ac.getAdminClient(), topicCreation));
|
|
|
}
|
|
|
|
|
|
public Mono<Void> deleteTopic(KafkaCluster cluster, String topicName) {
|
|
|
- return getOrCreateAdminClient(cluster)
|
|
|
+ return adminClientService.getOrCreateAdminClient(cluster)
|
|
|
.map(ExtendedAdminClient::getAdminClient)
|
|
|
.map(adminClient -> adminClient.deleteTopics(List.of(topicName)))
|
|
|
.then();
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- @SneakyThrows
|
|
|
- public Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
|
|
|
- return Mono.justOrEmpty(adminClientCache.get(cluster.getName()))
|
|
|
- .switchIfEmpty(createAdminClient(cluster))
|
|
|
- .map(e -> adminClientCache.computeIfAbsent(cluster.getName(), key -> e));
|
|
|
- }
|
|
|
-
|
|
|
- public Mono<ExtendedAdminClient> createAdminClient(KafkaCluster kafkaCluster) {
|
|
|
- return Mono.fromSupplier(() -> {
|
|
|
- Properties properties = new Properties();
|
|
|
- properties.putAll(kafkaCluster.getProperties());
|
|
|
- properties
|
|
|
- .put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
|
|
|
- properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
|
|
|
- return AdminClient.create(properties);
|
|
|
- }).flatMap(ExtendedAdminClient::extendedAdminClient);
|
|
|
- }
|
|
|
-
|
|
|
@SneakyThrows
|
|
|
private Mono<Map<String, List<InternalTopicConfig>>> loadTopicsConfig(
|
|
|
AdminClient adminClient, Collection<String> topicNames) {
|
|
@@ -353,45 +327,9 @@ public class KafkaService {
|
|
|
.collect(Collectors.toList()))));
|
|
|
}
|
|
|
|
|
|
- private Mono<Map<String, List<InternalBrokerConfig>>> loadBrokersConfig(
|
|
|
- AdminClient adminClient, List<Integer> brokersIds) {
|
|
|
- List<ConfigResource> resources = brokersIds.stream()
|
|
|
- .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
|
|
|
- .collect(Collectors.toList());
|
|
|
-
|
|
|
- return ClusterUtil.toMono(adminClient.describeConfigs(resources,
|
|
|
- new DescribeConfigsOptions().includeSynonyms(true)).all())
|
|
|
- .map(configs ->
|
|
|
- configs.entrySet().stream().collect(Collectors.toMap(
|
|
|
- c -> c.getKey().name(),
|
|
|
- c -> c.getValue().entries().stream()
|
|
|
- .map(ClusterUtil::mapToInternalBrokerConfig)
|
|
|
- .collect(Collectors.toList()))));
|
|
|
- }
|
|
|
-
|
|
|
- private Mono<List<InternalBrokerConfig>> loadBrokersConfig(
|
|
|
- AdminClient adminClient, Integer brokerId) {
|
|
|
- return loadBrokersConfig(adminClient, Collections.singletonList(brokerId))
|
|
|
- .map(map -> map.values().stream()
|
|
|
- .findFirst()
|
|
|
- .orElseThrow(() -> new IllegalEntityStateException(
|
|
|
- String.format("Config for broker %s not found", brokerId))));
|
|
|
- }
|
|
|
-
|
|
|
- public Mono<List<InternalBrokerConfig>> getBrokerConfigs(KafkaCluster cluster, Integer brokerId) {
|
|
|
- return getOrCreateAdminClient(cluster)
|
|
|
- .flatMap(adminClient -> {
|
|
|
- if (!cluster.getBrokers().contains(brokerId)) {
|
|
|
- return Mono.error(
|
|
|
- new NotFoundException(String.format("Broker with id %s not found", brokerId)));
|
|
|
- }
|
|
|
- return loadBrokersConfig(adminClient.getAdminClient(), brokerId);
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(
|
|
|
KafkaCluster cluster) {
|
|
|
- return getOrCreateAdminClient(cluster).flatMap(ac ->
|
|
|
+ return adminClientService.getOrCreateAdminClient(cluster).flatMap(ac ->
|
|
|
ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all())
|
|
|
.flatMap(s ->
|
|
|
getConsumerGroupsInternal(
|
|
@@ -404,7 +342,7 @@ public class KafkaService {
|
|
|
public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(
|
|
|
KafkaCluster cluster, List<String> groupIds) {
|
|
|
|
|
|
- return getOrCreateAdminClient(cluster).flatMap(ac ->
|
|
|
+ return adminClientService.getOrCreateAdminClient(cluster).flatMap(ac ->
|
|
|
ClusterUtil.toMono(
|
|
|
ac.getAdminClient().describeConsumerGroups(groupIds).all()
|
|
|
).map(Map::values)
|
|
@@ -446,7 +384,7 @@ public class KafkaService {
|
|
|
|
|
|
public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster cluster,
|
|
|
String consumerGroupId) {
|
|
|
- return getOrCreateAdminClient(cluster).map(ac ->
|
|
|
+ return adminClientService.getOrCreateAdminClient(cluster).map(ac ->
|
|
|
ac.getAdminClient()
|
|
|
.listConsumerGroupOffsets(consumerGroupId)
|
|
|
.partitionsToOffsetAndMetadata()
|
|
@@ -482,7 +420,7 @@ public class KafkaService {
|
|
|
public Mono<InternalTopic> updateTopic(KafkaCluster cluster, String topicName,
|
|
|
TopicUpdate topicUpdate) {
|
|
|
ConfigResource topicCr = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
|
|
|
- return getOrCreateAdminClient(cluster)
|
|
|
+ return adminClientService.getOrCreateAdminClient(cluster)
|
|
|
.flatMap(ac -> {
|
|
|
if (ac.getSupportedFeatures()
|
|
|
.contains(ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
|
|
@@ -727,7 +665,8 @@ public class KafkaService {
|
|
|
var records = offsets.entrySet().stream()
|
|
|
.map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
|
|
|
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
|
|
- return getOrCreateAdminClient(cluster).map(ExtendedAdminClient::getAdminClient)
|
|
|
+ return adminClientService.getOrCreateAdminClient(cluster)
|
|
|
+ .map(ExtendedAdminClient::getAdminClient)
|
|
|
.map(ac -> ac.deleteRecords(records)).then();
|
|
|
}
|
|
|
|
|
@@ -788,7 +727,7 @@ public class KafkaService {
|
|
|
KafkaCluster cluster,
|
|
|
String topicName,
|
|
|
PartitionsIncrease partitionsIncrease) {
|
|
|
- return getOrCreateAdminClient(cluster)
|
|
|
+ return adminClientService.getOrCreateAdminClient(cluster)
|
|
|
.flatMap(ac -> {
|
|
|
Integer actualCount = cluster.getTopics().get(topicName).getPartitionCount();
|
|
|
Integer requestedCount = partitionsIncrease.getTotalPartitionsCount();
|
|
@@ -830,7 +769,7 @@ public class KafkaService {
|
|
|
KafkaCluster cluster,
|
|
|
String topicName,
|
|
|
ReplicationFactorChange replicationFactorChange) {
|
|
|
- return getOrCreateAdminClient(cluster)
|
|
|
+ return adminClientService.getOrCreateAdminClient(cluster)
|
|
|
.flatMap(ac -> {
|
|
|
Integer actual = cluster.getTopics().get(topicName).getReplicationFactor();
|
|
|
Integer requested = replicationFactorChange.getTotalReplicationFactor();
|
|
@@ -855,7 +794,7 @@ public class KafkaService {
|
|
|
|
|
|
public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> getClusterLogDirs(
|
|
|
KafkaCluster cluster, List<Integer> reqBrokers) {
|
|
|
- return getOrCreateAdminClient(cluster)
|
|
|
+ return adminClientService.getOrCreateAdminClient(cluster)
|
|
|
.map(admin -> {
|
|
|
List<Integer> brokers = new ArrayList<>(cluster.getBrokers());
|
|
|
if (reqBrokers != null && !reqBrokers.isEmpty()) {
|
|
@@ -971,7 +910,7 @@ public class KafkaService {
|
|
|
|
|
|
public Mono<Void> updateBrokerLogDir(KafkaCluster cluster, Integer broker,
|
|
|
BrokerLogdirUpdate brokerLogDir) {
|
|
|
- return getOrCreateAdminClient(cluster)
|
|
|
+ return adminClientService.getOrCreateAdminClient(cluster)
|
|
|
.flatMap(ac -> updateBrokerLogDir(ac, brokerLogDir, broker));
|
|
|
}
|
|
|
|
|
@@ -996,7 +935,7 @@ public class KafkaService {
|
|
|
Integer broker,
|
|
|
String name,
|
|
|
String value) {
|
|
|
- return getOrCreateAdminClient(cluster)
|
|
|
+ return adminClientService.getOrCreateAdminClient(cluster)
|
|
|
.flatMap(ac -> updateBrokerConfigByName(ac, broker, name, value));
|
|
|
}
|
|
|
|