diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java
index af2603ed10..9565ff9990 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java
@@ -108,7 +108,7 @@ public class ConsumerGroupsController implements ConsumerGroupsApi {
             new ValidationException("Unknown resetType " + reset.getResetType())
         );
       }
-    }).map(o -> ResponseEntity.ok().build());
+    }).thenReturn(ResponseEntity.ok().build());
   }
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ExtendedAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ExtendedAdminClient.java
deleted file mode 100644
index 63507baafa..0000000000
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ExtendedAdminClient.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package com.provectus.kafka.ui.model;
-
-import com.provectus.kafka.ui.util.ClusterUtil;
-import java.util.Set;
-import lombok.Data;
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.clients.admin.AdminClient;
-import reactor.core.publisher.Mono;
-
-@Data
-@RequiredArgsConstructor
-public class ExtendedAdminClient {
-
-  private final AdminClient adminClient;
-  private final Set<SupportedFeature> supportedFeatures;
-
-  public static Mono<ExtendedAdminClient> extendedAdminClient(AdminClient adminClient) {
-
-    return ClusterUtil.getSupportedFeatures(adminClient)
-        .map(s -> new ExtendedAdminClient(adminClient, s));
-  }
-
-  public enum SupportedFeature {
-    INCREMENTAL_ALTER_CONFIGS,
-    ALTER_CONFIGS
-  }
-}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientService.java
index fc611daed0..074f2ce88f 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientService.java
@@ -1,23 +1,10 @@
 package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.model.ExtendedAdminClient;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import reactor.core.publisher.Mono;
 
 public interface AdminClientService {
-  /**
-   * Get ExtendedAdminClient from cache if exists or create new if not.
-   *
-   * @param cluster - cluster
-   * @return The Mono of ExtendedAdminClient
-   */
-  Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster);
-  /**
-   * Create new ExtendedAdminClient.
-   *
-   * @param cluster - cluster
-   * @return The Mono of ExtendedAdminClient
-   */
-  Mono<ExtendedAdminClient> createAdminClient(KafkaCluster cluster);
+  Mono<ReactiveAdminClient> get(KafkaCluster cluster);
+
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java
index 822436f5cc..89fa06484e 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientServiceImpl.java
@@ -1,7 +1,7 @@
 package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.model.ExtendedAdminClient;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import java.io.Closeable;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
@@ -17,21 +17,20 @@ import reactor.core.publisher.Mono;
 @Service
 @RequiredArgsConstructor
 @Log4j2
-public class AdminClientServiceImpl implements AdminClientService {
-  private final Map<String, ExtendedAdminClient> adminClientCache = new ConcurrentHashMap<>();
+public class AdminClientServiceImpl implements AdminClientService, Closeable {
+  private final Map<String, ReactiveAdminClient> adminClientCache = new ConcurrentHashMap<>();
   @Setter // used in tests
   @Value("${kafka.admin-client-timeout}")
   private int clientTimeout;
 
   @Override
-  public Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
+  public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
     return Mono.justOrEmpty(adminClientCache.get(cluster.getName()))
         .switchIfEmpty(createAdminClient(cluster))
         .map(e -> adminClientCache.computeIfAbsent(cluster.getName(), key -> e));
   }
 
-  @Override
-  public Mono<ExtendedAdminClient> createAdminClient(KafkaCluster cluster) {
+  private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
     return Mono.fromSupplier(() -> {
       Properties properties = new Properties();
       properties.putAll(cluster.getProperties());
@@ -39,6 +38,11 @@ public class AdminClientServiceImpl implements AdminClientService {
           .put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
       properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
       return AdminClient.create(properties);
-    }).flatMap(ExtendedAdminClient::extendedAdminClient);
+    }).flatMap(ReactiveAdminClient::create);
+  }
+
+  @Override
+  public void close() {
+    adminClientCache.values().forEach(ReactiveAdminClient::close);
   }
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerServiceImpl.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerServiceImpl.java
index 4e25be8d63..1d6c3a3fcb 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerServiceImpl.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerServiceImpl.java
@@ -3,22 +3,17 @@ package com.provectus.kafka.ui.service;
 import com.provectus.kafka.ui.exception.IllegalEntityStateException;
 import com.provectus.kafka.ui.exception.NotFoundException;
 import com.provectus.kafka.ui.model.BrokerDTO;
-import com.provectus.kafka.ui.model.ExtendedAdminClient;
 import com.provectus.kafka.ui.model.InternalBrokerConfig;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.util.ClusterUtil;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.clients.admin.ConfigEntry;
-import org.apache.kafka.clients.admin.DescribeConfigsOptions;
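
// Reviewer note on the ConsumerGroupsController change above: the reset operations now
// complete as Mono<Void>, and map() never fires on an empty Mono, so the old
// .map(o -> ResponseEntity.ok().build()) would leave the response empty; thenReturn()
// emits its value once the upstream completes. Minimal Reactor sketch (not project code):
Mono<Void> done = Mono.empty();
done.map(v -> "mapped").subscribe(System.out::println);        // prints nothing
done.thenReturn("returned").subscribe(System.out::println);    // prints "returned"

 import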
org.apache.kafka.common.Node; -import org.apache.kafka.common.config.ConfigResource; -import org.apache.kafka.common.errors.UnsupportedVersionException; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -32,23 +27,8 @@ public class BrokerServiceImpl implements BrokerService { private Mono>> loadBrokersConfig( KafkaCluster cluster, List brokersIds) { - List resources = brokersIds.stream() - .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId))) - .collect(Collectors.toList()); - - return adminClientService.getOrCreateAdminClient(cluster) - .map(ExtendedAdminClient::getAdminClient) - .flatMap(adminClient -> - ClusterUtil.toMono(adminClient.describeConfigs(resources, - new DescribeConfigsOptions().includeSynonyms(true)).all()) - .map(config -> config.entrySet() - .stream() - .collect(Collectors.toMap( - c -> Integer.valueOf(c.getKey().name()), - c -> List.copyOf(c.getValue().entries()) - )) - )) - .onErrorResume(UnsupportedVersionException.class, (e) -> Mono.just(new HashMap<>())); + return adminClientService.get(cluster) + .flatMap(ac -> ac.loadBrokersConfig(brokersIds)); } private Mono> loadBrokersConfig( @@ -87,22 +67,23 @@ public class BrokerServiceImpl implements BrokerService { @Override public Flux getBrokers(KafkaCluster cluster) { return adminClientService - .getOrCreateAdminClient(cluster) - .flatMap(client -> ClusterUtil.toMono(client.getAdminClient().describeCluster().nodes()) - .map(n -> n.stream().map(node -> { + .get(cluster) + .flatMap(ReactiveAdminClient::describeCluster) + .map(description -> description.getNodes().stream() + .map(node -> { BrokerDTO broker = new BrokerDTO(); broker.setId(node.id()); broker.setHost(node.host()); return broker; - }).collect(Collectors.toList()))) + }).collect(Collectors.toList())) .flatMapMany(Flux::fromIterable); } @Override public Mono getController(KafkaCluster cluster) { return adminClientService - .getOrCreateAdminClient(cluster) - .map(ExtendedAdminClient::getAdminClient) - .flatMap(adminClient -> ClusterUtil.toMono(adminClient.describeCluster().controller())); + .get(cluster) + .flatMap(ReactiveAdminClient::describeCluster) + .map(ReactiveAdminClient.ClusterDescription::getController); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java index cdc9d0d320..de4e5928b5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java @@ -19,7 +19,6 @@ import com.provectus.kafka.ui.model.ConsumerGroupDTO; import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO; import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.CreateTopicMessageDTO; -import com.provectus.kafka.ui.model.ExtendedAdminClient; import com.provectus.kafka.ui.model.Feature; import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.KafkaCluster; @@ -310,13 +309,8 @@ public class ClusterService { public Mono deleteConsumerGroupById(String clusterName, String groupId) { return clustersStorage.getClusterByName(clusterName) - .map(cluster -> adminClientService.getOrCreateAdminClient(cluster) - .map(ExtendedAdminClient::getAdminClient) - .flatMap(adminClient -> - ClusterUtil.toMono( - adminClient.deleteConsumerGroups(List.of(groupId)).all() - ) - ) + .map(cluster -> 
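
// Reviewer note: getBrokers() and getController() above now share one describeCluster()
// call that materializes nodes, controller and clusterId together as a ClusterDescription
// value object. Hedged usage sketch (the log message is illustrative):
adminClientService.get(cluster)
    .flatMap(ReactiveAdminClient::describeCluster)
    .subscribe(d -> log.info("cluster {}: {} nodes, controller {}",
        d.getClusterId(), d.getNodes().size(), d.getController().id()));
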
adminClientService.get(cluster) + .flatMap(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId))) .onErrorResume(this::reThrowCustomException) ) .orElse(Mono.empty()); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java index 1aec152c82..5b9cee3c85 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java @@ -8,7 +8,6 @@ import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO; import com.provectus.kafka.ui.model.CleanupPolicy; import com.provectus.kafka.ui.model.CreateTopicMessageDTO; -import com.provectus.kafka.ui.model.ExtendedAdminClient; import com.provectus.kafka.ui.model.InternalBrokerDiskUsage; import com.provectus.kafka.ui.model.InternalBrokerMetrics; import com.provectus.kafka.ui.model.InternalClusterMetrics; @@ -31,7 +30,6 @@ import com.provectus.kafka.ui.util.ClusterUtil; import com.provectus.kafka.ui.util.JmxClusterUtil; import com.provectus.kafka.ui.util.JmxMetricsName; import com.provectus.kafka.ui.util.JmxMetricsValueName; -import com.provectus.kafka.ui.util.MapUtil; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Collection; @@ -46,24 +44,13 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; -import java.util.stream.Stream; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.AlterConfigOp; -import org.apache.kafka.clients.admin.Config; -import org.apache.kafka.clients.admin.ConfigEntry; -import org.apache.kafka.clients.admin.ConsumerGroupListing; -import org.apache.kafka.clients.admin.DescribeConfigsOptions; -import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.NewPartitionReassignment; import org.apache.kafka.clients.admin.NewPartitions; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.admin.RecordsToDelete; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -71,7 +58,6 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionReplica; -import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.LogDirNotFoundException; import org.apache.kafka.common.errors.TimeoutException; @@ -95,8 +81,6 @@ import reactor.util.function.Tuples; @Log4j2 public class KafkaService { - private static final ListTopicsOptions LIST_TOPICS_OPTIONS = - new ListTopicsOptions().listInternal(true); private final ZookeeperService zookeeperService; private final JmxClusterUtil jmxClusterUtil; private final ClustersStorage clustersStorage; @@ -122,18 +106,17 @@ public class KafkaService { @SneakyThrows public Mono getUpdatedCluster(KafkaCluster cluster) { - return 
adminClientService.getOrCreateAdminClient(cluster) + return adminClientService.get(cluster) .flatMap( - ac -> ClusterUtil.getClusterVersion(ac.getAdminClient()).flatMap( + ac -> ac.getClusterVersion().flatMap( version -> - getClusterMetrics(ac.getAdminClient()) - .flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient())) + getClusterMetrics(ac) + .flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac)) .flatMap(clusterMetrics -> - getTopicsData(ac.getAdminClient()).flatMap(it -> { + getTopicsData(ac).flatMap(it -> { if (cluster.getDisableLogDirsCollection() == null || !cluster.getDisableLogDirsCollection()) { - return updateSegmentMetrics( - ac.getAdminClient(), clusterMetrics, it + return updateSegmentMetrics(ac, clusterMetrics, it ); } else { return emptySegmentMetrics(clusterMetrics, it); @@ -250,16 +233,16 @@ public class KafkaService { } @SneakyThrows - private Mono> getTopicsData(AdminClient adminClient) { - return ClusterUtil.toMono(adminClient.listTopics(LIST_TOPICS_OPTIONS).names()) - .flatMap(topics -> getTopicsData(adminClient, topics).collectList()); + private Mono> getTopicsData(ReactiveAdminClient client) { + return client.listTopics(true) + .flatMap(topics -> getTopicsData(client, topics).collectList()); } - private Flux getTopicsData(AdminClient adminClient, Collection topics) { + private Flux getTopicsData(ReactiveAdminClient client, Collection topics) { final Mono>> configsMono = - loadTopicsConfig(adminClient, topics); + loadTopicsConfig(client, topics); - return ClusterUtil.toMono(adminClient.describeTopics(topics).all()) + return client.describeTopics(topics) .map(m -> m.values().stream() .map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList())) .flatMap(internalTopics -> configsMono @@ -268,104 +251,72 @@ public class KafkaService { } - private Mono getClusterMetrics(AdminClient client) { - return ClusterUtil.toMono(client.describeCluster().nodes()) - .flatMap(brokers -> - ClusterUtil.toMono(client.describeCluster().controller()).map( - c -> { - InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = - InternalClusterMetrics.builder(); - metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0); - return metricsBuilder.build(); - } - ) - ); + private Mono getClusterMetrics(ReactiveAdminClient client) { + return client.describeCluster().map(desc -> + InternalClusterMetrics.builder() + .brokerCount(desc.getNodes().size()) + .activeControllers(desc.getController() != null ? 
1 : 0) + .build() + ); } @SneakyThrows - private Mono createTopic(AdminClient adminClient, NewTopic newTopic) { - return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic)).all(), - newTopic.name()); - } - - @SneakyThrows - public Mono createTopic(AdminClient adminClient, + public Mono createTopic(ReactiveAdminClient adminClient, Mono topicCreation) { - return topicCreation.flatMap( - topicData -> { - NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(), - topicData.getReplicationFactor().shortValue()); - newTopic.configs(topicData.getConfigs()); - return createTopic(adminClient, newTopic).map(v -> topicData); - }) + return topicCreation.flatMap(topicData -> + adminClient.createTopic( + topicData.getName(), + topicData.getPartitions(), + topicData.getReplicationFactor().shortValue(), + topicData.getConfigs() + ).thenReturn(topicData) + ) .onErrorResume(t -> Mono.error(new TopicMetadataException(t.getMessage()))) - .flatMap( - topicData -> - getTopicsData(adminClient, Collections.singleton(topicData.getName())) - .next() - ).switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic"))); + .flatMap(topicData -> getUpdatedTopic(adminClient, topicData.getName())) + .switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic"))); } public Mono createTopic( KafkaCluster cluster, Mono topicCreation) { - return adminClientService.getOrCreateAdminClient(cluster) - .flatMap(ac -> createTopic(ac.getAdminClient(), topicCreation)); + return adminClientService.get(cluster).flatMap(ac -> createTopic(ac, topicCreation)); } public Mono deleteTopic(KafkaCluster cluster, String topicName) { - return adminClientService.getOrCreateAdminClient(cluster) - .map(ExtendedAdminClient::getAdminClient) - .flatMap(adminClient -> - ClusterUtil.toMono(adminClient.deleteTopics(List.of(topicName)).all()) - ); + return adminClientService.get(cluster).flatMap(c -> c.deleteTopic(topicName)); } @SneakyThrows private Mono>> loadTopicsConfig( - AdminClient adminClient, Collection topicNames) { - List resources = topicNames.stream() - .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName)) - .collect(Collectors.toList()); - - return ClusterUtil.toMono(adminClient.describeConfigs(resources, - new DescribeConfigsOptions().includeSynonyms(true)).all()) + ReactiveAdminClient client, Collection topicNames) { + return client.getTopicsConfig(topicNames) .map(configs -> configs.entrySet().stream().collect(Collectors.toMap( - c -> c.getKey().name(), - c -> c.getValue().entries().stream() + Map.Entry::getKey, + c -> c.getValue().stream() .map(ClusterUtil::mapToInternalTopicConfig) .collect(Collectors.toList())))); } - public Mono> getConsumerGroupsInternal( - KafkaCluster cluster) { - return adminClientService.getOrCreateAdminClient(cluster).flatMap(ac -> - ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all()) - .flatMap(s -> - getConsumerGroupsInternal( - cluster, - s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())) - ) - ); + public Mono> getConsumerGroupsInternal(KafkaCluster cluster) { + return adminClientService.get(cluster).flatMap(ac -> + ac.listConsumerGroups() + .flatMap(groupIds -> getConsumerGroupsInternal(cluster, groupIds))); } - public Mono> getConsumerGroupsInternal( - KafkaCluster cluster, List groupIds) { - - return adminClientService.getOrCreateAdminClient(cluster).flatMap(ac -> - ClusterUtil.toMono( - ac.getAdminClient().describeConsumerGroups(groupIds).all() - ).map(Map::values) - 
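
// Reviewer note: createTopic() now chains create -> thenReturn(topicData) -> re-describe,
// so callers get fresh metadata for the topic they just created. Hypothetical usage,
// assuming the generated DTO's fluent setters:
kafkaService.createTopic(cluster, Mono.just(new TopicCreationDTO()
        .name("audit-log")
        .partitions(3)
        .replicationFactor(1)
        .configs(Map.of("cleanup.policy", "compact"))))
    .subscribe(created -> log.info("created topic {}", created.getName()));
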
).flatMap(descriptions -> - Flux.fromIterable(descriptions) - .parallel() - .flatMap(d -> - groupMetadata(cluster, d.groupId()) - .map(offsets -> ClusterUtil.convertToInternalConsumerGroup(d, offsets)) - ) - .sequential() - .collectList() - ); + public Mono> getConsumerGroupsInternal(KafkaCluster cluster, + List groupIds) { + return adminClientService.get(cluster).flatMap(ac -> + ac.describeConsumerGroups(groupIds) + .map(Map::values) + .flatMap(descriptions -> + Flux.fromIterable(descriptions) + .parallel() + .flatMap(d -> + ac.listConsumerGroupOffsets(d.groupId()) + .map(offsets -> ClusterUtil.convertToInternalConsumerGroup(d, offsets)) + ) + .sequential() + .collectList())); } public Mono> getConsumerGroups( @@ -392,15 +343,6 @@ public class KafkaService { ); } - public Mono> groupMetadata(KafkaCluster cluster, - String consumerGroupId) { - return adminClientService.getOrCreateAdminClient(cluster).map(ac -> - ac.getAdminClient() - .listConsumerGroupOffsets(consumerGroupId) - .partitionsToOffsetAndMetadata() - ).flatMap(ClusterUtil::toMono).map(MapUtil::removeNullValues); - } - public Map topicPartitionsEndOffsets( KafkaCluster cluster, Collection topicPartitions) { try (KafkaConsumer consumer = createConsumer(cluster)) { @@ -427,47 +369,17 @@ public class KafkaService { } @SneakyThrows - public Mono updateTopic(KafkaCluster cluster, String topicName, + public Mono updateTopic(KafkaCluster cluster, + String topicName, TopicUpdateDTO topicUpdate) { - ConfigResource topicCr = new ConfigResource(ConfigResource.Type.TOPIC, topicName); - return adminClientService.getOrCreateAdminClient(cluster) - .flatMap(ac -> { - if (ac.getSupportedFeatures() - .contains(ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) { - return incrementalAlterConfig(topicUpdate, topicCr, ac) - .flatMap(c -> getUpdatedTopic(ac, topicName)); - } else { - return alterConfig(topicUpdate, topicCr, ac) - .flatMap(c -> getUpdatedTopic(ac, topicName)); - } - }); + return adminClientService.get(cluster) + .flatMap(ac -> + ac.updateTopicConfig(topicName, + topicUpdate.getConfigs()).then(getUpdatedTopic(ac, topicName))); } - private Mono getUpdatedTopic(ExtendedAdminClient ac, String topicName) { - return getTopicsData(ac.getAdminClient()) - .map(s -> s.stream() - .filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow()); - } - - private Mono incrementalAlterConfig(TopicUpdateDTO topicUpdate, ConfigResource topicCr, - ExtendedAdminClient ac) { - List listOp = topicUpdate.getConfigs().entrySet().stream() - .flatMap(cfg -> Stream.of(new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()), - AlterConfigOp.OpType.SET))).collect(Collectors.toList()); - return ClusterUtil.toMono( - ac.getAdminClient().incrementalAlterConfigs(Collections.singletonMap(topicCr, listOp)) - .all(), topicCr.name()); - } - - @SuppressWarnings("deprecation") - private Mono alterConfig(TopicUpdateDTO topicUpdate, ConfigResource topicCr, - ExtendedAdminClient ac) { - List configEntries = topicUpdate.getConfigs().entrySet().stream() - .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue()))) - .collect(Collectors.toList()); - Config config = new Config(configEntries); - Map map = Collections.singletonMap(topicCr, config); - return ClusterUtil.toMono(ac.getAdminClient().alterConfigs(map).all(), topicCr.name()); + private Mono getUpdatedTopic(ReactiveAdminClient ac, String topicName) { + return getTopicsData(ac, List.of(topicName)).next(); } private InternalTopic mergeWithStats(InternalTopic topic, @@ -520,18 
+432,12 @@ public class KafkaService { ); } - private Mono updateSegmentMetrics(AdminClient ac, + private Mono updateSegmentMetrics(ReactiveAdminClient ac, InternalClusterMetrics clusterMetrics, List internalTopics) { - List names = - internalTopics.stream().map(InternalTopic::getName).collect(Collectors.toList()); - return ClusterUtil.toMono(ac.describeTopics(names).all()).flatMap(topic -> - ClusterUtil.toMono(ac.describeCluster().nodes()).flatMap(nodes -> - - ClusterUtil.toMono( - ac.describeLogDirs( - nodes.stream().map(Node::id).collect(Collectors.toList())).all() - ).map(log -> { + return ac.describeCluster().flatMap( + clusterDescription -> + ac.describeLogDirs().map(log -> { final List> topicPartitions = log.entrySet().stream().flatMap(b -> b.getValue().entrySet().stream().flatMap(topicMap -> @@ -598,7 +504,6 @@ public class KafkaService { ) .internalTopicWithSegmentSize(resultTopics).build(); }) - ) ); } @@ -612,15 +517,15 @@ public class KafkaService { } private Mono fillJmxMetrics(InternalClusterMetrics internalClusterMetrics, - String clusterName, AdminClient ac) { + String clusterName, ReactiveAdminClient ac) { return fillBrokerMetrics(internalClusterMetrics, clusterName, ac) .map(this::calculateClusterMetrics); } private Mono fillBrokerMetrics( - InternalClusterMetrics internalClusterMetrics, String clusterName, AdminClient ac) { - return ClusterUtil.toMono(ac.describeCluster().nodes()) - .flatMapIterable(nodes -> nodes) + InternalClusterMetrics internalClusterMetrics, String clusterName, ReactiveAdminClient ac) { + return ac.describeCluster() + .flatMapIterable(desc -> desc.getNodes()) .map(broker -> Map.of(broker.id(), InternalBrokerMetrics.builder() .metrics(getJmxMetric(clusterName, broker)).build()) @@ -699,14 +604,7 @@ public class KafkaService { } public Mono deleteTopicMessages(KafkaCluster cluster, Map offsets) { - var records = offsets.entrySet().stream() - .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue()))) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - return adminClientService.getOrCreateAdminClient(cluster) - .map(ExtendedAdminClient::getAdminClient) - .flatMap(ac -> - ClusterUtil.toMono(ac.deleteRecords(records).all()) - ); + return adminClientService.get(cluster).flatMap(ac -> ac.deleteRecords(offsets)); } public Mono sendMessage(KafkaCluster cluster, String topic, @@ -754,19 +652,11 @@ public class KafkaService { return headers; } - private Mono increaseTopicPartitions(AdminClient adminClient, - String topicName, - Map newPartitionsMap - ) { - return ClusterUtil.toMono(adminClient.createPartitions(newPartitionsMap).all(), topicName) - .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next()); - } - public Mono increaseTopicPartitions( KafkaCluster cluster, String topicName, PartitionsIncreaseDTO partitionsIncrease) { - return adminClientService.getOrCreateAdminClient(cluster) + return adminClientService.get(cluster) .flatMap(ac -> { Integer actualCount = cluster.getTopics().get(topicName).getPartitionCount(); Integer requestedCount = partitionsIncrease.getTotalPartitionsCount(); @@ -787,18 +677,18 @@ public class KafkaService { topicName, NewPartitions.increaseTo(partitionsIncrease.getTotalPartitionsCount()) ); - return increaseTopicPartitions(ac.getAdminClient(), topicName, newPartitionsMap); + return ac.createPartitions(newPartitionsMap) + .then(getUpdatedTopic(ac, topicName)); }); } private Mono changeReplicationFactor( - AdminClient adminClient, + ReactiveAdminClient 
adminClient, String topicName, Map> reassignments ) { - return ClusterUtil.toMono(adminClient - .alterPartitionReassignments(reassignments).all(), topicName) - .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next()); + return adminClient.alterPartitionReassignments(reassignments) + .then(getUpdatedTopic(adminClient, topicName)); } /** @@ -808,7 +698,7 @@ public class KafkaService { KafkaCluster cluster, String topicName, ReplicationFactorChangeDTO replicationFactorChange) { - return adminClientService.getOrCreateAdminClient(cluster) + return adminClientService.get(cluster) .flatMap(ac -> { Integer actual = cluster.getTopics().get(topicName).getReplicationFactor(); Integer requested = replicationFactorChange.getTotalReplicationFactor(); @@ -825,7 +715,7 @@ public class KafkaService { String.format("Requested replication factor %s more than brokers count %s.", requested, brokersCount))); } - return changeReplicationFactor(ac.getAdminClient(), topicName, + return changeReplicationFactor(ac, topicName, getPartitionsReassignments(cluster, topicName, replicationFactorChange)); }); @@ -833,15 +723,14 @@ public class KafkaService { public Mono>> getClusterLogDirs( KafkaCluster cluster, List reqBrokers) { - return adminClientService.getOrCreateAdminClient(cluster) - .map(admin -> { + return adminClientService.get(cluster) + .flatMap(admin -> { List brokers = new ArrayList<>(cluster.getBrokers()); if (reqBrokers != null && !reqBrokers.isEmpty()) { brokers.retainAll(reqBrokers); } - return admin.getAdminClient().describeLogDirs(brokers); + return admin.describeLogDirs(brokers); }) - .flatMap(result -> ClusterUtil.toMono(result.all())) .onErrorResume(TimeoutException.class, (TimeoutException e) -> { log.error("Error during fetching log dirs", e); return Mono.just(new HashMap<>()); @@ -949,20 +838,18 @@ public class KafkaService { public Mono updateBrokerLogDir(KafkaCluster cluster, Integer broker, BrokerLogdirUpdateDTO brokerLogDir) { - return adminClientService.getOrCreateAdminClient(cluster) + return adminClientService.get(cluster) .flatMap(ac -> updateBrokerLogDir(ac, brokerLogDir, broker)); } - private Mono updateBrokerLogDir(ExtendedAdminClient adminMono, + private Mono updateBrokerLogDir(ReactiveAdminClient admin, BrokerLogdirUpdateDTO b, Integer broker) { Map req = Map.of( new TopicPartitionReplica(b.getTopic(), b.getPartition(), broker), b.getLogDir()); - return Mono.just(adminMono) - .map(admin -> admin.getAdminClient().alterReplicaLogDirs(req)) - .flatMap(result -> ClusterUtil.toMono(result.all())) + return admin.alterReplicaLogDirs(req) .onErrorResume(UnknownTopicOrPartitionException.class, e -> Mono.error(new TopicOrPartitionNotFoundException())) .onErrorResume(LogDirNotFoundException.class, @@ -974,20 +861,8 @@ public class KafkaService { Integer broker, String name, String value) { - return adminClientService.getOrCreateAdminClient(cluster) - .flatMap(ac -> updateBrokerConfigByName(ac, broker, name, value)); - } - - private Mono updateBrokerConfigByName(ExtendedAdminClient admin, - Integer broker, - String name, - String value) { - ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(broker)); - AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET); - - return Mono.just(admin) - .map(a -> a.getAdminClient().incrementalAlterConfigs(Map.of(cr, List.of(op)))) - .flatMap(result -> ClusterUtil.toMono(result.all())) + return adminClientService.get(cluster) + .flatMap(ac -> 
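
// Reviewer note: shape of the reassignment map changeReplicationFactor() builds and the
// wrapper forwards unchanged. Hypothetical example -- move partition 0 of "orders" onto
// brokers 1, 2, 3 and then re-read the topic:
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = Map.of(
    new TopicPartition("orders", 0),
    Optional.of(new NewPartitionReassignment(List.of(1, 2, 3))));
adminClient.alterPartitionReassignments(reassignments)
    .then(getUpdatedTopic(adminClient, "orders"));
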
ac.updateBrokerConfigByName(broker, name, value)) .onErrorResume(InvalidRequestException.class, e -> Mono.error(new InvalidRequestApiException(e.getMessage()))) .doOnError(log::error); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java index 6420e6d7ec..190b13add6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/OffsetsResetService.java @@ -1,28 +1,23 @@ package com.provectus.kafka.ui.service; -import static com.google.common.base.Preconditions.checkArgument; -import static com.provectus.kafka.ui.util.ClusterUtil.toMono; import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toSet; import static org.apache.kafka.common.ConsumerGroupState.DEAD; import static org.apache.kafka.common.ConsumerGroupState.EMPTY; +import com.google.common.base.Preconditions; import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.KafkaCluster; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; -import org.apache.kafka.clients.admin.ConsumerGroupDescription; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.common.TopicPartition; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; @@ -37,71 +32,74 @@ import reactor.core.publisher.Mono; @RequiredArgsConstructor public class OffsetsResetService { - private final KafkaService kafkaService; private final AdminClientService adminClientService; - public Mono> resetToEarliest( + public Mono resetToEarliest( KafkaCluster cluster, String group, String topic, Collection partitions) { return checkGroupCondition(cluster, group) - .flatMap(g -> { - try (var consumer = getConsumer(cluster, group)) { - var targetPartitions = getTargetPartitions(consumer, topic, partitions); - var offsets = consumer.beginningOffsets(targetPartitions); - return commitOffsets(consumer, offsets); - } - }); + .flatMap(ac -> + offsets(ac, topic, partitions, OffsetSpec.earliest()) + .flatMap(offsets -> resetOffsets(ac, group, offsets))); } - public Mono> resetToLatest( - KafkaCluster cluster, String group, String topic, Collection partitions) { - return checkGroupCondition(cluster, group).flatMap( - g -> { - try (var consumer = getConsumer(cluster, group)) { - var targetPartitions = getTargetPartitions(consumer, topic, partitions); - var offsets = consumer.endOffsets(targetPartitions); - return commitOffsets(consumer, offsets); - } - } + private Mono> offsets(ReactiveAdminClient client, + String topic, + @Nullable Collection partitions, + OffsetSpec spec) { + if (partitions == null) { + return client.listOffsets(topic, spec); + } + return client.listOffsets( + partitions.stream().map(idx -> new TopicPartition(topic, idx)).collect(toSet()), + spec ); } - public Mono> resetToTimestamp( + public Mono resetToLatest( + KafkaCluster cluster, String 
group, String topic, Collection partitions) { + return checkGroupCondition(cluster, group) + .flatMap(ac -> + offsets(ac, topic, partitions, OffsetSpec.latest()) + .flatMap(offsets -> resetOffsets(ac, group, offsets))); + } + + public Mono resetToTimestamp( KafkaCluster cluster, String group, String topic, Collection partitions, long targetTimestamp) { - return checkGroupCondition(cluster, group).flatMap( - g -> { - try (var consumer = getConsumer(cluster, group)) { - var targetPartitions = getTargetPartitions(consumer, topic, partitions); - var offsets = offsetsByTimestamp(consumer, targetPartitions, targetTimestamp); - return commitOffsets(consumer, offsets); - } - } - ); + return checkGroupCondition(cluster, group) + .flatMap(ac -> + offsets(ac, topic, partitions, OffsetSpec.forTimestamp(targetTimestamp)) + .flatMap( + foundOffsets -> offsets(ac, topic, partitions, OffsetSpec.latest()) + .map(endOffsets -> editTsOffsets(foundOffsets, endOffsets)) + ) + .flatMap(offsets -> resetOffsets(ac, group, offsets)) + ); } - public Mono> resetToOffsets( + public Mono resetToOffsets( KafkaCluster cluster, String group, String topic, Map targetOffsets) { + Preconditions.checkNotNull(targetOffsets); + var partitionOffsets = targetOffsets.entrySet().stream() + .collect(toMap(e -> new TopicPartition(topic, e.getKey()), Map.Entry::getValue)); return checkGroupCondition(cluster, group).flatMap( - g -> { - try (var consumer = getConsumer(cluster, group)) { - var offsets = targetOffsets.entrySet().stream() - .collect(toMap(e -> new TopicPartition(topic, e.getKey()), Map.Entry::getValue)); - offsets = editOffsetsIfNeeded(consumer, offsets); - return commitOffsets(consumer, offsets); - } - } + ac -> + ac.listOffsets(partitionOffsets.keySet(), OffsetSpec.earliest()) + .flatMap(earliest -> + ac.listOffsets(partitionOffsets.keySet(), OffsetSpec.latest()) + .map(latest -> editOffsetsBounds(partitionOffsets, earliest, latest)) + .flatMap(offsetsToCommit -> resetOffsets(ac, group, offsetsToCommit))) ); } - private Mono checkGroupCondition(KafkaCluster cluster, String groupId) { - return adminClientService.getOrCreateAdminClient(cluster) + private Mono checkGroupCondition(KafkaCluster cluster, String groupId) { + return adminClientService.get(cluster) .flatMap(ac -> // we need to call listConsumerGroups() to check group existence, because // describeConsumerGroups() will return consumer group even if it doesn't exist - toMono(ac.getAdminClient().listConsumerGroups().all()) - .filter(cgs -> cgs.stream().anyMatch(g -> g.groupId().equals(groupId))) - .flatMap(cgs -> toMono( - ac.getAdminClient().describeConsumerGroups(List.of(groupId)).all())) + ac.listConsumerGroups() + .filter(cgs -> cgs.stream().anyMatch(g -> g.equals(groupId))) + .flatMap(cgs -> ac.describeConsumerGroups(List.of(groupId))) .filter(cgs -> cgs.containsKey(groupId)) .map(cgs -> cgs.get(groupId)) .flatMap(cg -> { @@ -116,47 +114,18 @@ public class OffsetsResetService { ) ); } - return Mono.just(cg); + return Mono.just(ac); }) .switchIfEmpty(Mono.error(new NotFoundException("Consumer group not found"))) ); } - private Map offsetsByTimestamp(Consumer consumer, - Set partitions, - long timestamp) { - Map timestampedOffsets = consumer - .offsetsForTimes(partitions.stream().collect(toMap(p -> p, p -> timestamp))); - - var foundOffsets = timestampedOffsets.entrySet().stream() - .filter(e -> e.getValue() != null) - .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())); - + private Map editTsOffsets(Map foundTsOffsets, + Map endOffsets) { // for 
partitions where we didn't find offset by timestamp, we use end offsets
-    Set<TopicPartition> endOffsets = new HashSet<>(partitions);
-    endOffsets.removeAll(foundOffsets.keySet());
-    foundOffsets.putAll(consumer.endOffsets(endOffsets));
-
-    return foundOffsets;
-  }
-
-  private Set<TopicPartition> getTargetPartitions(Consumer<?, ?> consumer, String topic,
-                                                  Collection<Integer> partitions) {
-    var allPartitions = allTopicPartitions(consumer, topic);
-    if (partitions == null || partitions.isEmpty()) {
-      return allPartitions;
-    } else {
-      return partitions.stream()
-          .map(idx -> new TopicPartition(topic, idx))
-          .peek(tp -> checkArgument(allPartitions.contains(tp), "Invalid partition %s", tp))
-          .collect(toSet());
-    }
-  }
-
-  private Set<TopicPartition> allTopicPartitions(Consumer<?, ?> consumer, String topic) {
-    return consumer.partitionsFor(topic).stream()
-        .map(info -> new TopicPartition(topic, info.partition()))
-        .collect(toSet());
+    Map<TopicPartition, Long> result = new HashMap<>(endOffsets);
+    result.putAll(foundTsOffsets);
+    return result;
   }
 
   /**
@@ -164,10 +133,9 @@ public class OffsetsResetService {
    * fail we reset offset to either earliest or latest offsets (To follow logic from
    * kafka.admin.ConsumerGroupCommand.scala)
    */
-  private Map<TopicPartition, Long> editOffsetsIfNeeded(Consumer<?, ?> consumer,
-                                                        Map<TopicPartition, Long> offsetsToCheck) {
-    var earliestOffsets = consumer.beginningOffsets(offsetsToCheck.keySet());
-    var latestOffsets = consumer.endOffsets(offsetsToCheck.keySet());
+  private Map<TopicPartition, Long> editOffsetsBounds(Map<TopicPartition, Long> offsetsToCheck,
+                                                      Map<TopicPartition, Long> earliestOffsets,
+                                                      Map<TopicPartition, Long> latestOffsets) {
     var result = new HashMap<TopicPartition, Long>();
     offsetsToCheck.forEach((tp, offset) -> {
       if (earliestOffsets.get(tp) > offset) {
@@ -184,17 +152,10 @@ public class OffsetsResetService {
     return result;
   }
 
-  private Mono<Map<TopicPartition, OffsetAndMetadata>> commitOffsets(
-      Consumer<?, ?> consumer, Map<TopicPartition, Long> offsets
-  ) {
-    var toCommit = offsets.entrySet().stream()
-        .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-    consumer.commitSync(toCommit);
-    return Mono.just(toCommit);
-  }
-
-  private Consumer<?, ?> getConsumer(KafkaCluster cluster, String groupId) {
-    return kafkaService.createConsumer(cluster, Map.of(ConsumerConfig.GROUP_ID_CONFIG, groupId));
+  private Mono<Void> resetOffsets(ReactiveAdminClient adminClient,
+                                  String groupId,
+                                  Map<TopicPartition, Long> offsets) {
+    return adminClient.alterConsumerGroupOffsets(groupId, offsets);
   }
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
new file mode 100644
index 0000000000..644da58cd2
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
@@ -0,0 +1,311 @@
+package com.provectus.kafka.ui.service;
+
+import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import com.provectus.kafka.ui.util.MapUtil;
+import com.provectus.kafka.ui.util.NumberUtil;
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import lombok.RequiredArgsConstructor;
+import lombok.Value;
+import lombok.extern.log4j.Log4j2;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
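
// Reviewer note: editOffsetsBounds() above mirrors ConsumerGroupCommand's clamping.
// Worked example for one partition with earliest = 10 and latest = 50:
//   requested   5 -> committed 10 (below range, clamped up to earliest)
//   requested 100 -> committed 50 (above range, clamped down to latest)
//   requested  25 -> committed 25 (within range, unchanged)

+import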
org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.clients.admin.DescribeClusterOptions; +import org.apache.kafka.clients.admin.DescribeConfigsOptions; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.NewPartitionReassignment; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.requests.DescribeLogDirsResponse; +import reactor.core.publisher.Mono; + + +@Log4j2 +@RequiredArgsConstructor +public class ReactiveAdminClient implements Closeable { + + private enum SupportedFeature { + INCREMENTAL_ALTER_CONFIGS, + ALTER_CONFIGS + } + + @Value + public static class ClusterDescription { + Node controller; + String clusterId; + Collection nodes; + Set authorizedOperations; + } + + public static Mono create(AdminClient adminClient) { + return getClusterVersionImpl(adminClient) + .map(ver -> + new ReactiveAdminClient( + adminClient, + Set.of(getSupportedUpdateFeatureForVersion(ver)))); + } + + private static SupportedFeature getSupportedUpdateFeatureForVersion(String versionStr) { + float version = NumberUtil.parserClusterVersion(versionStr); + return version <= 2.3f + ? 
SupportedFeature.ALTER_CONFIGS + : SupportedFeature.INCREMENTAL_ALTER_CONFIGS; + } + + //TODO: discuss - maybe we should map kafka-library's exceptions to our exceptions here + private static Mono toMono(KafkaFuture future) { + return Mono.create(sink -> future.whenComplete((res, ex) -> { + if (ex != null) { + sink.error(ex); + } else { + sink.success(res); + } + })); + } + + //--------------------------------------------------------------------------------- + + private final AdminClient client; + private final Set features; + + public Mono> listTopics(boolean listInternal) { + return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names()); + } + + public Mono deleteTopic(String topicName) { + return toMono(client.deleteTopics(List.of(topicName)).all()); + } + + public Mono>> getTopicsConfig(Collection topicNames) { + List resources = topicNames.stream() + .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName)) + .collect(toList()); + + return toMono( + client.describeConfigs( + resources, + new DescribeConfigsOptions().includeSynonyms(true) + ).all()) + .map(config -> config.entrySet().stream() + .collect(toMap( + c -> c.getKey().name(), + c -> new ArrayList<>(c.getValue().entries())))); + } + + public Mono>> loadBrokersConfig(List brokerIds) { + List resources = brokerIds.stream() + .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId))) + .collect(toList()); + return toMono(client.describeConfigs(resources).all()) + .map(config -> config.entrySet().stream() + .collect(toMap( + c -> Integer.valueOf(c.getKey().name()), + c -> new ArrayList<>(c.getValue().entries())))); + } + + public Mono> describeTopics(Collection topics) { + return toMono(client.describeTopics(topics).all()); + } + + public Mono>> describeLogDirs() { + return describeCluster() + .map(d -> d.getNodes().stream().map(Node::id).collect(toList())) + .flatMap(this::describeLogDirs); + } + + public Mono>> describeLogDirs( + Collection brokerIds) { + return toMono(client.describeLogDirs(brokerIds).all()); + } + + public Mono describeCluster() { + var r = client.describeCluster(new DescribeClusterOptions().includeAuthorizedOperations(true)); + var all = KafkaFuture.allOf(r.nodes(), r.clusterId(), r.controller(), r.authorizedOperations()); + return Mono.create(sink -> all.whenComplete((res, ex) -> { + if (ex != null) { + sink.error(ex); + } else { + try { + sink.success( + new ClusterDescription( + getUninterruptibly(r.controller()), + getUninterruptibly(r.clusterId()), + getUninterruptibly(r.nodes()), + getUninterruptibly(r.authorizedOperations()) + ) + ); + } catch (ExecutionException e) { + // can't be here, because all futures already completed + } + } + })); + } + + private static Mono getClusterVersionImpl(AdminClient client) { + return toMono(client.describeCluster().controller()).flatMap(controller -> + toMono(client.describeConfigs( + List.of(new ConfigResource( + ConfigResource.Type.BROKER, String.valueOf(controller.id())))) + .all() + .thenApply(configs -> + configs.values().stream() + .map(Config::entries) + .flatMap(Collection::stream) + .filter(entry -> entry.name().contains("inter.broker.protocol.version")) + .findFirst().map(ConfigEntry::value) + .orElse("1.0-UNKNOWN") + ))); + } + + public Mono getClusterVersion() { + return getClusterVersionImpl(client); + } + + public Mono deleteConsumerGroups(Collection groupIds) { + return toMono(client.deleteConsumerGroups(groupIds).all()); + } + + public Mono createTopic(String name, + 
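
// Reviewer note: one subtlety of the toMono(KafkaFuture) bridge above: the admin request
// is already in flight once the KafkaFuture exists, so the resulting Mono is not lazy --
// the call fires when the wrapper method is invoked, not at subscribe time. If deferred
// execution were ever needed, a sketch (not part of this PR) would be:
public Mono<Set<String>> listTopicsLazily(boolean listInternal) {
  return Mono.defer(() ->
      toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names()));
}
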
int numPartitions, + short replicationFactor, + Map configs) { + return toMono(client.createTopics( + List.of(new NewTopic(name, numPartitions, replicationFactor).configs(configs))).all()); + } + + public Mono alterPartitionReassignments( + Map> reassignments) { + return toMono(client.alterPartitionReassignments(reassignments).all()); + } + + public Mono createPartitions(Map newPartitionsMap) { + return toMono(client.createPartitions(newPartitionsMap).all()); + } + + public Mono updateTopicConfig(String topicName, Map configs) { + if (features.contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) { + return incrementalAlterConfig(topicName, configs); + } else { + return alterConfig(topicName, configs); + } + } + + public Mono> listConsumerGroups() { + return toMono(client.listConsumerGroups().all()) + .map(lst -> lst.stream().map(ConsumerGroupListing::groupId).collect(toList())); + } + + public Mono> describeConsumerGroups(List groupIds) { + return toMono(client.describeConsumerGroups(groupIds).all()); + } + + public Mono> listConsumerGroupOffsets(String groupId) { + return toMono(client.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata()) + .map(MapUtil::removeNullValues); + } + + public Mono alterConsumerGroupOffsets(String groupId, Map offsets) { + return toMono(client.alterConsumerGroupOffsets( + groupId, + offsets.entrySet().stream() + .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())))) + .all()); + } + + public Mono> listOffsets(String topic, + OffsetSpec offsetSpec) { + return topicPartitions(topic).flatMap(tps -> listOffsets(tps, offsetSpec)); + } + + public Mono> listOffsets(Set partitions, + OffsetSpec offsetSpec) { + return toMono( + client.listOffsets(partitions.stream().collect(toMap(tp -> tp, tp -> offsetSpec))).all()) + .map(offsets -> offsets.entrySet() + .stream() + // filtering partitions for which offsets were not found + .filter(e -> e.getValue().offset() >= 0) + .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset()))); + } + + private Mono> topicPartitions(String topic) { + return toMono(client.describeTopics(List.of(topic)).all()) + .map(r -> r.values().stream() + .findFirst() + .stream() + .flatMap(d -> d.partitions().stream()) + .map(p -> new TopicPartition(topic, p.partition())) + .collect(Collectors.toSet()) + ); + } + + public Mono updateBrokerConfigByName(Integer brokerId, String name, String value) { + ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)); + AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET); + return toMono(client.incrementalAlterConfigs(Map.of(cr, List.of(op))).all()); + } + + public Mono deleteRecords(Map offsets) { + var records = offsets.entrySet().stream() + .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue()))) + .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); + return toMono(client.deleteRecords(records).all()); + } + + public Mono alterReplicaLogDirs(Map replicaAssignment) { + return toMono(client.alterReplicaLogDirs(replicaAssignment).all()); + } + + private Mono incrementalAlterConfig(String topicName, Map configs) { + var config = configs.entrySet().stream() + .flatMap(cfg -> Stream.of( + new AlterConfigOp( + new ConfigEntry( + cfg.getKey(), + cfg.getValue()), + AlterConfigOp.OpType.SET))) + .collect(toList()); + var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName); + return toMono(client.incrementalAlterConfigs(Map.of(topicResource, 
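
// Reviewer note: the offset() >= 0 filter in listOffsets() above drops partitions for
// which no offset was found; with OffsetSpec.forTimestamp() the broker reports -1 when
// no record exists at or after the timestamp, and resetToTimestamp() then falls back to
// end offsets for those partitions. Hedged usage sketch:
adminClient.listOffsets("orders", OffsetSpec.forTimestamp(targetTimestamp))
    .subscribe(found -> log.info("offsets found by timestamp: {}", found));
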
config)).all()); + } + + @SuppressWarnings("deprecation") + private Mono alterConfig(String topicName, Map configs) { + List configEntries = configs.entrySet().stream() + .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue()))) + .collect(toList()); + Config config = new Config(configEntries); + var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName); + return toMono(client.alterConfigs(Map.of(topicResource, config)).all()); + } + + @Override + public void close() { + client.close(); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java index ee77461381..84fcd3543c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java @@ -8,7 +8,6 @@ import com.provectus.kafka.ui.model.ConsumerGroupDTO; import com.provectus.kafka.ui.model.ConsumerGroupDetailsDTO; import com.provectus.kafka.ui.model.ConsumerGroupStateDTO; import com.provectus.kafka.ui.model.ConsumerGroupTopicPartitionDTO; -import com.provectus.kafka.ui.model.ExtendedAdminClient; import com.provectus.kafka.ui.model.InternalBrokerConfig; import com.provectus.kafka.ui.model.InternalConsumerGroup; import com.provectus.kafka.ui.model.InternalPartition; @@ -24,7 +23,6 @@ import java.time.OffsetDateTime; import java.time.ZoneId; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -33,48 +31,21 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.extern.log4j.Log4j2; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.Bytes; -import reactor.core.publisher.Mono; @Log4j2 public class ClusterUtil { - private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version"; - private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); - public static Mono toMono(KafkaFuture future) { - return Mono.create(sink -> future.whenComplete((res, ex) -> { - if (ex != null) { - sink.error(ex); - } else { - sink.success(res); - } - })); - } - - public static Mono toMono(KafkaFuture future, String topicName) { - return Mono.create(sink -> future.whenComplete((res, ex) -> { - if (ex != null) { - sink.error(ex); - } else { - sink.success(topicName); - } - })); - } - public static InternalConsumerGroup convertToInternalConsumerGroup( ConsumerGroupDescription description, Map offsets) { @@ -334,46 +305,6 @@ public class ClusterUtil { } } - public static Mono> getSupportedFeatures( - AdminClient adminClient) { - return getClusterVersion(adminClient) - .map(ClusterUtil::getSupportedUpdateFeature) - .map(Collections::singleton); - } - - private static ExtendedAdminClient.SupportedFeature getSupportedUpdateFeature(String version) { - try { - 
final String[] parts = version.split("\\."); - if (parts.length > 2) { - version = parts[0] + "." + parts[1]; - } - return Float.parseFloat(version.split("-")[0]) <= 2.3f - ? ExtendedAdminClient.SupportedFeature.ALTER_CONFIGS : - ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS; - } catch (Exception e) { - log.error("Conversion clusterVersion {} to float value failed", version); - throw e; - } - } - - public static Mono getClusterVersion(AdminClient adminClient) { - return ClusterUtil.toMono(adminClient.describeCluster().controller()) - .map(Node::id) - .map(id -> Collections - .singletonList(new ConfigResource(ConfigResource.Type.BROKER, id.toString()))) - .map(brokerCR -> adminClient.describeConfigs(brokerCR).all()) - .flatMap(ClusterUtil::toMono) - .map(ClusterUtil::getClusterVersion); - } - - public static String getClusterVersion(Map configs) { - return configs.values().stream() - .map(Config::entries) - .flatMap(Collection::stream) - .filter(entry -> entry.name().contains(CLUSTER_VERSION_PARAM_KEY)) - .findFirst().map(ConfigEntry::value).orElse("1.0-UNKNOWN"); - } - public static Map toSingleMap(Stream> streamOfMaps) { return streamOfMaps diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/NumberUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/NumberUtil.java index a729f16c47..d01dfafb38 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/NumberUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/NumberUtil.java @@ -1,7 +1,9 @@ package com.provectus.kafka.ui.util; +import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.math.NumberUtils; +@Log4j2 public class NumberUtil { private NumberUtil() { @@ -10,4 +12,17 @@ public class NumberUtil { public static boolean isNumeric(Object value) { return value != null && NumberUtils.isCreatable(value.toString()); } + + public static float parserClusterVersion(String version) { + try { + final String[] parts = version.split("\\."); + if (parts.length > 2) { + version = parts[0] + "." 
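
// Reviewer note: worked examples of the version normalization being added here, which
// feeds ReactiveAdminClient's choice between the two config-update APIs:
assert NumberUtil.parserClusterVersion("2.2-IV1") == 2.2f;     // <= 2.3 -> ALTER_CONFIGS
assert NumberUtil.parserClusterVersion("2.7.0") == 2.7f;       // > 2.3 -> INCREMENTAL_ALTER_CONFIGS
assert NumberUtil.parserClusterVersion("1.0-UNKNOWN") == 1.0f; // fallback version string
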
+ parts[1]; + } + return Float.parseFloat(version.split("-")[0]); + } catch (Exception e) { + log.error("Conversion clusterVersion {} to float value failed", version); + throw e; + } + } } \ No newline at end of file diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/ReadOnlyModeTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/ReadOnlyModeTests.java index fd74ccc114..f0f1c00c7d 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/ReadOnlyModeTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/ReadOnlyModeTests.java @@ -61,19 +61,21 @@ public class ReadOnlyModeTests extends AbstractBaseTest { .name(topicName) .partitions(1) .replicationFactor(1) - .configs(Map.of()) ) .exchange() .expectStatus() .isOk(); + webTestClient.patch() .uri("/api/clusters/{clusterName}/topics/{topicName}", LOCAL, topicName) .bodyValue(new TopicUpdateDTO() - .configs(Map.of()) + .configs(Map.of("cleanup.policy", "compact")) ) .exchange() .expectStatus() - .isOk(); + .isOk() + .expectBody() + .jsonPath("$.cleanUpPolicy").isEqualTo("COMPACT"); } @Test diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java index 290cfec9d7..0efef1d205 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java @@ -19,11 +19,13 @@ import java.util.stream.Stream; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.BytesDeserializer; import org.apache.kafka.common.serialization.BytesSerializer; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.AfterEach; @@ -46,17 +48,13 @@ public class OffsetsResetServiceTest extends AbstractBaseTest { private final String groupId = "OffsetsResetServiceTestGroup-" + UUID.randomUUID(); private final String topic = "OffsetsResetServiceTestTopic-" + UUID.randomUUID(); - private KafkaService kafkaService; private OffsetsResetService offsetsResetService; @BeforeEach void init() { AdminClientServiceImpl adminClientService = new AdminClientServiceImpl(); - BrokerService brokerService = new BrokerServiceImpl(adminClientService); - FeatureService featureService = new FeatureServiceImpl(brokerService); adminClientService.setClientTimeout(5_000); - kafkaService = new KafkaService(null, null, null, null, adminClientService, featureService); - offsetsResetService = new OffsetsResetService(kafkaService, adminClientService); + offsetsResetService = new OffsetsResetService(adminClientService); createTopic(new NewTopic(topic, PARTITIONS, (short) 1)); createConsumerGroup(); @@ -228,7 +226,14 @@ public class OffsetsResetServiceTest extends AbstractBaseTest { } private Consumer groupConsumer() { - return kafkaService.createConsumer(CLUSTER, Map.of(ConsumerConfig.GROUP_ID_CONFIG, groupId)); + Properties props = new Properties(); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID()); + 
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.getBootstrapServers()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + return new KafkaConsumer<>(props); } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java index d69f2f8cff..a642364d91 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java @@ -200,7 +200,7 @@ public class SendAndReadTests extends AbstractBaseTest { .withKeySchema(AVRO_SCHEMA_PRIMITIVE_STRING) .withValueSchema(AVRO_SCHEMA_PRIMITIVE_INT) .withMsgToSend( - new CreateTopicMessage() + new CreateTopicMessageDTO() .key("\"some string\"") .content("123") )
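
// Reviewer note: overall lifecycle of the new wrapper, as a hedged end-to-end sketch
// (blocking only for illustration; AdminClientServiceImpl manages this reactively and
// the bootstrap address is a placeholder):
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient raw = AdminClient.create(props);
ReactiveAdminClient ac = ReactiveAdminClient.create(raw).block(); // probes broker version for feature flags
ac.listTopics(true).subscribe(names -> log.info("topics: {}", names));
ac.close(); // also closes the wrapped AdminClient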