package com.provectus.kafka.ui.service;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;

import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Table;
import com.provectus.kafka.ui.exception.IllegalEntityStateException;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.util.KafkaVersion;
import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@Slf4j
@RequiredArgsConstructor
public class ReactiveAdminClient implements Closeable {

  private enum SupportedFeature {
    INCREMENTAL_ALTER_CONFIGS(2.3f),
    CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
    DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f);

    private final float sinceVersion;

    SupportedFeature(float sinceVersion) {
      this.sinceVersion = sinceVersion;
    }

    static Set<SupportedFeature> forVersion(float kafkaVersion) {
      return Arrays.stream(SupportedFeature.values())
          .filter(f -> kafkaVersion >= f.sinceVersion)
          .collect(Collectors.toSet());
    }

    static Set<SupportedFeature> defaultFeatures() {
      return Set.of();
    }
  }

  @Value
  public static class ClusterDescription {
    @Nullable
    Node controller;
    String clusterId;
    Collection<Node> nodes;
    Set<AclOperation> authorizedOperations;
  }

  public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
    return getClusterVersion(adminClient)
        .map(ver ->
            new ReactiveAdminClient(
                adminClient,
                ver,
                getSupportedUpdateFeaturesForVersion(ver)));
  }

  private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String versionStr) {
    try {
      float version = KafkaVersion.parse(versionStr);
      return SupportedFeature.forVersion(version);
    } catch (NumberFormatException e) {
      return SupportedFeature.defaultFeatures();
    }
  }

  // NOTE: if KafkaFuture returns null, that Mono will be empty(!), since Reactor does not support nullable results
  // (see MonoSink.success(..) javadoc for details)
  public static <T> Mono<T> toMono(KafkaFuture<T> future) {
    return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
      if (ex != null) {
        // KafkaFuture doc is unclear about what exception wrapper will be used
        // (from the docs it should be ExecutionException, but we actually see CompletionException, so checking both)
        if (ex instanceof CompletionException || ex instanceof ExecutionException) {
          sink.error(ex.getCause()); //unwrapping exception
        } else {
          sink.error(ex);
        }
      } else {
        sink.success(res);
      }
    })).doOnCancel(() -> future.cancel(true))
        // AdminClient uses a single thread for kafka communication,
        // and by default all downstream operations (like map(..)) on the created Mono will be executed on this thread.
        // If some downstream operation is blocking (by mistake), this can lead to
        // other AdminClient requests getting stuck, which can cause timeout exceptions.
        // So, we explicitly set the Scheduler for downstream processing.
        .publishOn(Schedulers.parallel());
  }
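  // Illustrative sketch (not part of the class contract): toMono(..) above bridges any KafkaFuture into a
  // Reactor Mono. Assuming an already configured AdminClient instance named `adminClient`, a caller could do:
  //
  //   Mono<String> clusterId = ReactiveAdminClient.toMono(adminClient.describeCluster().clusterId());
  //   clusterId.subscribe(id -> log.info("cluster id: {}", id));
  //
  // Note that a null KafkaFuture result produces an *empty* Mono (see the note above), so callers that must
  // handle "no value" should add defaultIfEmpty(..) / switchIfEmpty(..).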
  //---------------------------------------------------------------------------------

  @Getter(AccessLevel.PACKAGE) // visible for testing
  private final AdminClient client;
  private final String version;
  private final Set<SupportedFeature> features;

  public Mono<Set<String>> listTopics(boolean listInternal) {
    return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
  }

  public Mono<Void> deleteTopic(String topicName) {
    return toMono(client.deleteTopics(List.of(topicName)).all());
  }

  public String getVersion() {
    return version;
  }

  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
    return listTopics(true).flatMap(topics -> getTopicsConfig(topics, false));
  }

  //NOTE: skips not-found topics (for which UnknownTopicOrPartitionException was thrown by AdminClient)
  //and topics for which DESCRIBE_CONFIGS permission is not set (TopicAuthorizationException was thrown)
  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
    var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc;
    // we need to partition calls, because a large topics count can lead to AdminClient timeouts
    return partitionCalls(
        topicNames,
        200,
        part -> getTopicsConfigImpl(part, includeDocFixed),
        mapMerger()
    );
  }

  private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<String> topicNames, boolean includeDoc) {
    List<ConfigResource> resources = topicNames.stream()
        .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
        .collect(toList());

    return toMonoWithExceptionFilter(
        client.describeConfigs(
            resources,
            new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(),
        UnknownTopicOrPartitionException.class,
        TopicAuthorizationException.class
    ).map(config -> config.entrySet().stream()
        .collect(toMap(
            c -> c.getKey().name(),
            c -> List.copyOf(c.getValue().entries()))));
  }

  private static Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(AdminClient client, List<Integer> brokerIds) {
    List<ConfigResource> resources = brokerIds.stream()
        .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
        .collect(toList());
    return toMono(client.describeConfigs(resources).all())
        // some kafka backends (like MSK serverless) do not support broker's configs retrieval,
        // in that case InvalidRequestException will be thrown
        .onErrorResume(InvalidRequestException.class, th -> {
          log.trace("Error while getting broker {} configs", brokerIds, th);
          return Mono.just(Map.of());
        })
        // there are situations when the kafka-ui user has no DESCRIBE_CONFIGS permission on the cluster
        .onErrorResume(ClusterAuthorizationException.class, th -> {
          log.trace("AuthorizationException while getting configs for brokers {}", brokerIds, th);
          return Mono.just(Map.of());
        })
        .map(config -> config.entrySet().stream()
            .collect(toMap(
                c -> Integer.valueOf(c.getKey().name()),
                c -> new ArrayList<>(c.getValue().entries()))));
  }
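  // Illustrative usage sketch (assumes a created ReactiveAdminClient instance named `rac`; topic names are
  // hypothetical). getTopicsConfig(..) above batches describeConfigs calls (200 topics per request, via
  // partitionCalls below) and silently drops topics that are unknown or not authorized, so the resulting map
  // may contain fewer keys than were requested:
  //
  //   rac.getTopicsConfig(List.of("orders", "payments"), false)
  //      .subscribe(byTopic -> byTopic.forEach((topic, entries) ->
  //          log.info("{} -> {} config entries", topic, entries.size())));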
  /**
   * Return per-broker configs or an empty map if broker's configs retrieval is not supported.
   */
  public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> brokerIds) {
    return loadBrokersConfig(client, brokerIds);
  }

  public Mono<Map<String, TopicDescription>> describeTopics() {
    return listTopics(true).flatMap(this::describeTopics);
  }

  public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> topics) {
    // we need to partition calls, because a large topics count can lead to AdminClient timeouts
    return partitionCalls(
        topics,
        200,
        this::describeTopicsImpl,
        mapMerger()
    );
  }

  private Mono<Map<String, TopicDescription>> describeTopicsImpl(Collection<String> topics) {
    return toMonoWithExceptionFilter(
        client.describeTopics(topics).topicNameValues(),
        UnknownTopicOrPartitionException.class,
        // we only describe topics that we see from the listTopics() API, so we should have permission to do it,
        // but also adding this exception here for the rare case when access was restricted after we called listTopics()
        TopicAuthorizationException.class
    );
  }

  /**
   * Returns TopicDescription mono, or an empty Mono if the topic is not visible.
   */
  public Mono<TopicDescription> describeTopic(String topic) {
    return describeTopics(List.of(topic)).flatMap(m -> Mono.justOrEmpty(m.get(topic)));
  }
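  // Illustrative sketch: because describeTopic(..) above returns an empty Mono for topics that are not
  // visible (unknown or not authorized), callers typically need an explicit fallback. `rac` is an assumed
  // ReactiveAdminClient instance and "orders" a hypothetical topic:
  //
  //   rac.describeTopic("orders")
  //      .map(td -> td.partitions().size())
  //      .defaultIfEmpty(0) // handling for the "topic not visible" case
  //      .subscribe(partitions -> log.info("orders has {} partitions", partitions));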

  /**
   * Kafka API often returns Map responses with KafkaFuture values. If we do allOf()
   * logic, the resulting Mono will fail if any of the Futures finished with an error.
   * In some situations this is not what we want: ex. we call the describeTopics(List names) method and
   * get UnknownTopicOrPartitionException for unknown topics, and we want to just not put
   * such topics in the resulting map.
   * <p/>
   * This method converts the input map into a Mono[Map], ignoring keys for which the KafkaFutures
   * finished with one of the {@code classes} exceptions, as well as keys whose Monos completed empty.
   */
  @SafeVarargs
  static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values,
                                                          Class<? extends KafkaException>... classes) {
    if (values.isEmpty()) {
      return Mono.just(Map.of());
    }

    List<Mono<Tuple2<K, Optional<V>>>> monos = values.entrySet().stream()
        .map(e -> toMono(e.getValue())
            .map(r -> Tuples.of(e.getKey(), Optional.of(r)))
            .defaultIfEmpty(Tuples.of(e.getKey(), Optional.empty())) //tracking empty Monos
            .onErrorResume(
                // tracking Monos with suppressible error
                th -> Stream.of(classes).anyMatch(clazz -> th.getClass().isAssignableFrom(clazz)),
                th -> Mono.just(Tuples.of(e.getKey(), Optional.empty()))))
        .toList();

    return Mono.zip(
        monos,
        resultsArr -> Stream.of(resultsArr)
            .map(obj -> (Tuple2<K, Optional<V>>) obj)
            .filter(t -> t.getT2().isPresent()) //skipping empty & suppressible-errors
            .collect(Collectors.toMap(Tuple2::getT1, t -> t.getT2().get()))
    );
  }
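  // Illustrative sketch of toMonoWithExceptionFilter(..) above: keys whose futures fail with one of the
  // listed exception classes (or complete empty) are dropped instead of failing the whole Mono. Topic names
  // below are hypothetical:
  //
  //   Map<String, KafkaFuture<TopicDescription>> futures =
  //       client.describeTopics(List.of("known-topic", "missing-topic")).topicNameValues();
  //   toMonoWithExceptionFilter(futures, UnknownTopicOrPartitionException.class)
  //       .subscribe(m -> log.info("described: {}", m.keySet())); // "missing-topic" is simply absent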
  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
    return describeCluster()
        .map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
        .flatMap(this::describeLogDirs);
  }

  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs(
      Collection<Integer> brokerIds) {
    return toMono(client.describeLogDirs(brokerIds).all())
        .onErrorResume(UnsupportedVersionException.class, th -> Mono.just(Map.of()))
        .onErrorResume(ClusterAuthorizationException.class, th -> Mono.just(Map.of()))
        .onErrorResume(th -> true, th -> {
          log.warn("Error while calling describeLogDirs", th);
          return Mono.just(Map.of());
        });
  }

  public Mono<ClusterDescription> describeCluster() {
    return describeClusterImpl(client, features);
  }

  private static Mono<ClusterDescription> describeClusterImpl(AdminClient client, Set<SupportedFeature> features) {
    boolean includeAuthorizedOperations =
        features.contains(SupportedFeature.DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS);
    DescribeClusterResult result = client.describeCluster(
        new DescribeClusterOptions().includeAuthorizedOperations(includeAuthorizedOperations));
    var allOfFuture = KafkaFuture.allOf(
        result.controller(), result.clusterId(), result.nodes(), result.authorizedOperations());
    return toMono(allOfFuture).then(
        Mono.fromCallable(() ->
            new ClusterDescription(
                result.controller().get(),
                result.clusterId().get(),
                result.nodes().get(),
                result.authorizedOperations().get()
            )
        )
    );
  }

  private static Mono<String> getClusterVersion(AdminClient client) {
    return describeClusterImpl(client, Set.of())
        // choosing the node from which we will get configs (starting with the controller)
        .flatMap(descr -> descr.controller != null
            ? Mono.just(descr.controller)
            : Mono.justOrEmpty(descr.nodes.stream().findFirst())
        )
        .flatMap(node -> loadBrokersConfig(client, List.of(node.id())))
        .flatMap(configs -> configs.values().stream()
            .flatMap(Collection::stream)
            .filter(entry -> entry.name().contains("inter.broker.protocol.version"))
            .findFirst()
            .map(configEntry -> Mono.just(configEntry.value()))
            .orElse(Mono.empty()))
        .switchIfEmpty(Mono.just("1.0-UNKNOWN"));
  }

  public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
    return toMono(client.deleteConsumerGroups(groupIds).all())
        .onErrorResume(GroupIdNotFoundException.class,
            th -> Mono.error(new NotFoundException("The group id does not exist")))
        .onErrorResume(GroupNotEmptyException.class,
            th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
  }

  public Mono<Void> createTopic(String name,
                                int numPartitions,
                                @Nullable Integer replicationFactor,
                                Map<String, String> configs) {
    var newTopic = new NewTopic(
        name,
        Optional.of(numPartitions),
        Optional.ofNullable(replicationFactor).map(Integer::shortValue)
    ).configs(configs);
    return toMono(client.createTopics(List.of(newTopic)).all());
  }

  public Mono<Void> alterPartitionReassignments(
      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
    return toMono(client.alterPartitionReassignments(reassignments).all());
  }

  public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap) {
    return toMono(client.createPartitions(newPartitionsMap).all());
  }

  // NOTE: replaces the whole current topic config with the new one. Entries that were present in the old config,
  // but are missing in the new one, will be reset to defaults
  public Mono<Void> updateTopicConfig(String topicName, Map<String, String> configs) {
    if (features.contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
      return getTopicsConfigImpl(List.of(topicName), false)
          .map(conf -> conf.getOrDefault(topicName, List.of()))
          .flatMap(currentConfigs -> incrementalAlterConfig(topicName, currentConfigs, configs));
    } else {
      return alterConfig(topicName, configs);
    }
  }

  public Mono<List<String>> listConsumerGroupNames() {
    return listConsumerGroups().map(lst -> lst.stream().map(ConsumerGroupListing::groupId).toList());
  }

  public Mono<Collection<ConsumerGroupListing>> listConsumerGroups() {
    return toMono(client.listConsumerGroups().all());
  }

  public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
    return partitionCalls(
        groupIds,
        25,
        4,
        ids -> toMono(client.describeConsumerGroups(ids).all()),
        mapMerger()
    );
  }

  // group -> partition -> offset
  // NOTE: partitions with no committed offsets will be skipped
  public Mono<Table<String, TopicPartition, Long>> listConsumerGroupOffsets(List<String> consumerGroups,
                                                                            // all partitions if null passed
                                                                            @Nullable List<TopicPartition> partitions) {
    Function<Collection<String>, Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>>> call =
        groups -> toMono(
            client.listConsumerGroupOffsets(
                groups.stream()
                    .collect(Collectors.toMap(
                        g -> g,
                        g -> new ListConsumerGroupOffsetsSpec().topicPartitions(partitions)
                    ))).all()
        );

    Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>> merged = partitionCalls(
        consumerGroups,
        25,
        4,
        call,
        mapMerger()
    );

    return merged.map(map -> {
      var table = ImmutableTable.<String, TopicPartition, Long>builder();
      map.forEach((g, tpOffsets) -> tpOffsets.forEach((tp, offset) -> {
        if (offset != null) {
          // offset will be null for partitions that don't have a committed offset for this group
          table.put(g, tp, offset.offset());
        }
      }));
      return table.build();
    });
  }

  public Mono<Void> alterConsumerGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
    return toMono(client.alterConsumerGroupOffsets(
            groupId,
            offsets.entrySet().stream()
                .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))))
        .all());
  }
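  // Illustrative sketch: listConsumerGroupOffsets(..) above returns a Guava Table keyed as
  // group -> partition -> committed offset, skipping partitions without a committed offset.
  // Assuming a hypothetical group "group-1" and an existing `rac` instance:
  //
  //   rac.listConsumerGroupOffsets(List.of("group-1"), null) // null = all partitions
  //      .subscribe(table -> table.row("group-1")
  //          .forEach((tp, offset) -> log.info("{} committed at {}", tp, offset)));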
  /**
   * List offsets for the topic's partitions and OffsetSpec.
   *
   * @param failOnUnknownLeader true - throw exception in case of no-leader partitions,
   *                            false - skip partitions with no leader
   */
  public Mono<Map<TopicPartition, Long>> listTopicOffsets(String topic,
                                                          OffsetSpec offsetSpec,
                                                          boolean failOnUnknownLeader) {
    return describeTopic(topic)
        .map(td -> filterPartitionsWithLeaderCheck(List.of(td), p -> true, failOnUnknownLeader))
        .flatMap(partitions -> listOffsetsUnsafe(partitions, offsetSpec));
  }

  /**
   * List offsets for the specified partitions and OffsetSpec.
   *
   * @param failOnUnknownLeader true - throw exception in case of no-leader partitions,
   *                            false - skip partitions with no leader
   */
  public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicPartition> partitions,
                                                     OffsetSpec offsetSpec,
                                                     boolean failOnUnknownLeader) {
    return filterPartitionsWithLeaderCheck(partitions, failOnUnknownLeader)
        .flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
  }

  private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
                                                                           boolean failOnUnknownLeader) {
    var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
    return describeTopicsImpl(targetTopics)
        .map(descriptions ->
            filterPartitionsWithLeaderCheck(
                descriptions.values(), partitions::contains, failOnUnknownLeader));
  }

  private Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
                                                              Predicate<TopicPartition> partitionPredicate,
                                                              boolean failOnUnknownLeader) {
    var goodPartitions = new HashSet<TopicPartition>();
    for (TopicDescription description : topicDescriptions) {
      for (TopicPartitionInfo partitionInfo : description.partitions()) {
        TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
        if (!partitionPredicate.test(topicPartition)) {
          continue;
        }
        if (partitionInfo.leader() != null) {
          goodPartitions.add(topicPartition);
        } else if (failOnUnknownLeader) {
          throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
        }
      }
    }
    return goodPartitions;
  }
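  // Illustrative sketch: the leader check above is what makes listOffsets(..) safe to call.
  // With failOnUnknownLeader = false, leaderless partitions are silently dropped; with true,
  // a ValidationException is raised instead. A typical "latest offsets" lookup (hypothetical topic name)
  // might look like:
  //
  //   rac.listTopicOffsets("orders", OffsetSpec.latest(), false)
  //      .subscribe(offsets -> offsets.forEach((tp, offset) -> log.info("{} end offset {}", tp, offset)));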
  // 1. NOTE(!): should only be applied for partitions with an existing leader,
  // otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
  // 2. NOTE(!): skips partitions that were not initialized yet
  // (UnknownTopicOrPartitionException thrown, ex. after topic creation)
  // 3. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retries instead
  @KafkaClientInternalsDependant
  public Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions,
                                                           OffsetSpec offsetSpec) {

    Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
        parts -> {
          ListOffsetsResult r = client.listOffsets(parts.stream().collect(toMap(tp -> tp, tp -> offsetSpec)));
          Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> perPartitionResults = new HashMap<>();
          parts.forEach(p -> perPartitionResults.put(p, r.partitionResult(p)));

          return toMonoWithExceptionFilter(perPartitionResults, UnknownTopicOrPartitionException.class)
              .map(offsets -> offsets.entrySet().stream()
                  // filtering partitions for which offsets were not found
                  .filter(e -> e.getValue().offset() >= 0)
                  .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
        };

    return partitionCalls(
        partitions,
        200,
        call,
        mapMerger()
    );
  }

  public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
    AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
    return toMono(client.incrementalAlterConfigs(Map.of(cr, List.of(op))).all());
  }

  public Mono<Void> deleteRecords(Map<TopicPartition, Long> offsets) {
    var records = offsets.entrySet().stream()
        .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
        .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
    return toMono(client.deleteRecords(records).all());
  }

  public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
    return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
  }

  private Mono<Void> incrementalAlterConfig(String topicName,
                                            List<ConfigEntry> currentConfigs,
                                            Map<String, String> newConfigs) {
    var configsToDelete = currentConfigs.stream()
        .filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) //manually set configs only
        .filter(e -> !newConfigs.containsKey(e.name()))
        .map(e -> new AlterConfigOp(e, AlterConfigOp.OpType.DELETE));

    var configsToSet = newConfigs.entrySet().stream()
        .map(e -> new AlterConfigOp(new ConfigEntry(e.getKey(), e.getValue()), AlterConfigOp.OpType.SET));

    return toMono(client.incrementalAlterConfigs(
        Map.of(
            new ConfigResource(ConfigResource.Type.TOPIC, topicName),
            Stream.concat(configsToDelete, configsToSet).toList()
        )).all());
  }

  @SuppressWarnings("deprecation")
  private Mono<Void> alterConfig(String topicName, Map<String, String> configs) {
    List<ConfigEntry> configEntries = configs.entrySet().stream()
        .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue())))
        .collect(toList());
    Config config = new Config(configEntries);
    var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    return toMono(client.alterConfigs(Map.of(topicResource, config)).all());
  }

  /**
   * Splits the input collection into batches, converts each batch into a Mono, sequentially subscribes to them
   * and merges the output Monos into one Mono.
   */
  private static <I, R> Mono<R> partitionCalls(Collection<I> items,
                                               int partitionSize,
                                               Function<Collection<I>, Mono<R>> call,
                                               BiFunction<R, R, R> merger) {
    if (items.isEmpty()) {
      return call.apply(items);
    }
    Iterable<List<I>> parts = Iterables.partition(items, partitionSize);
    return Flux.fromIterable(parts)
        .concatMap(call)
        .reduce(merger);
  }
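  // Illustrative sketch of partitionCalls(..) above: a large request is split into fixed-size batches,
  // each batch becomes its own AdminClient call, and the partial maps are merged via mapMerger().
  // Conceptually (hypothetical numbers and variable name), describing 450 topics with partitionSize = 200
  // issues three describeTopics requests of sizes 200, 200 and 50, subscribed one after another:
  //
  //   Mono<Map<String, TopicDescription>> result =
  //       partitionCalls(allTopicNames, 200, this::describeTopicsImpl, mapMerger());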
  /**
   * Splits the input collection into batches, converts each batch into a Mono, subscribes to them (concurrently,
   * with the specified concurrency level) and merges the output Monos into one Mono.
   */
  private static <I, R> Mono<R> partitionCalls(Collection<I> items,
                                               int partitionSize,
                                               int concurrency,
                                               Function<Collection<I>, Mono<R>> call,
                                               BiFunction<R, R, R> merger) {
    if (items.isEmpty()) {
      return call.apply(items);
    }
    Iterable<List<I>> parts = Iterables.partition(items, partitionSize);
    return Flux.fromIterable(parts)
        .flatMap(call, concurrency)
        .reduce(merger);
  }

  private static <K, V> BiFunction<Map<K, V>, Map<K, V>, Map<K, V>> mapMerger() {
    return (m1, m2) -> {
      var merged = new HashMap<K, V>();
      merged.putAll(m1);
      merged.putAll(m2);
      return merged;
    };
  }

  @Override
  public void close() {
    client.close();
  }
}
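// Illustrative end-to-end sketch (assumptions: an already configured org.apache.kafka.clients.admin.AdminClient
// named `adminClient`; error handling and closing omitted). create(..) first resolves the broker version to decide
// which SupportedFeatures to enable, and only then hands out the wrapper:
//
//   ReactiveAdminClient.create(adminClient)
//       .flatMap(rac -> rac.listTopics(false))
//       .subscribe(topics -> System.out.println("found " + topics.size() + " topics"));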