package com.provectus.kafka.ui.service;

import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.provectus.kafka.ui.exception.IllegalEntityStateException;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.util.MapUtil;
import com.provectus.kafka.ui.util.NumberUtil;
import com.provectus.kafka.ui.util.annotations.KafkaClientInternalsDependant;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

@Slf4j
@RequiredArgsConstructor
public class ReactiveAdminClient implements Closeable {

  private enum SupportedFeature {
    INCREMENTAL_ALTER_CONFIGS(2.3f),
    CONFIG_DOCUMENTATION_RETRIEVAL(2.6f);

    private final float sinceVersion;

    SupportedFeature(float sinceVersion) {
      this.sinceVersion = sinceVersion;
    }

    static Set<SupportedFeature> forVersion(float kafkaVersion) {
      return Arrays.stream(SupportedFeature.values())
          .filter(f -> kafkaVersion >= f.sinceVersion)
          .collect(Collectors.toSet());
    }

    static Set<SupportedFeature> defaultFeatures() {
      return Set.of();
    }
  }
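  // Illustrative sketch (not part of the original file): how the version gating above
  // resolves. The feature set grows monotonically with the broker version, and falls
  // back to the empty defaultFeatures() set when the version cannot be parsed.
  //
  //   SupportedFeature.forVersion(2.3f); // -> {INCREMENTAL_ALTER_CONFIGS}
  //   SupportedFeature.forVersion(2.6f); // -> {INCREMENTAL_ALTER_CONFIGS, CONFIG_DOCUMENTATION_RETRIEVAL}
  //   SupportedFeature.forVersion(2.0f); // -> {} (same as defaultFeatures())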
  @Value
  public static class ClusterDescription {
    @Nullable
    Node controller;
    String clusterId;
    Collection<Node> nodes;
    Set<AclOperation> authorizedOperations;
  }

  public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
    return getClusterVersion(adminClient)
        .map(ver ->
            new ReactiveAdminClient(
                adminClient,
                ver,
                getSupportedUpdateFeaturesForVersion(ver)));
  }

  private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String versionStr) {
    try {
      float version = NumberUtil.parserClusterVersion(versionStr);
      return SupportedFeature.forVersion(version);
    } catch (NumberFormatException e) {
      return SupportedFeature.defaultFeatures();
    }
  }

  //TODO: discuss - maybe we should map kafka-library's exceptions to our exceptions here
  private static <T> Mono<T> toMono(KafkaFuture<T> future) {
    return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
      if (ex != null) {
        // KafkaFuture docs are unclear about which exception wrapper will be used
        // (the docs say ExecutionException, but we actually see CompletionException), so we check both
        if (ex instanceof CompletionException || ex instanceof ExecutionException) {
          sink.error(ex.getCause()); // unwrapping exception
        } else {
          sink.error(ex);
        }
      } else {
        sink.success(res);
      }
    })).doOnCancel(() -> future.cancel(true))
        // AdminClient uses a single thread for kafka communication,
        // and by default all downstream operations (like map(..)) on the created Mono will be executed on that thread.
        // If some downstream operation is blocking (by mistake), this can leave
        // other AdminClient requests stuck, which can cause timeout exceptions.
        // So we explicitly set a Scheduler for downstream processing.
        .publishOn(Schedulers.parallel());
  }
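  // Illustrative sketch (not part of the original file): the unwrapping above means
  // subscribers see the Kafka cause directly, not the future's wrapper. If the underlying
  // future fails with CompletionException(cause), downstream error handlers receive
  // `cause` itself, so they can match on concrete Kafka exception types:
  //
  //   toMono(someKafkaFuture)
  //       .onErrorResume(UnknownTopicOrPartitionException.class, th -> Mono.empty())
  //       .subscribe();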
  //---------------------------------------------------------------------------------

  @Getter(AccessLevel.PACKAGE) // visible for testing
  private final AdminClient client;
  private final String version;
  private final Set<SupportedFeature> features;

  public Mono<Set<String>> listTopics(boolean listInternal) {
    return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
  }

  public Mono<Void> deleteTopic(String topicName) {
    return toMono(client.deleteTopics(List.of(topicName)).all());
  }

  public String getVersion() {
    return version;
  }

  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
    return listTopics(true).flatMap(topics -> getTopicsConfig(topics, false));
  }

  //NOTE: skips not-found topics (for which UnknownTopicOrPartitionException was thrown by AdminClient)
  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
    var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc;
    // we need to partition calls, because a large topic count can lead to AdminClient timeouts
    return partitionCalls(
        topicNames,
        200,
        part -> getTopicsConfigImpl(part, includeDocFixed),
        (m1, m2) -> ImmutableMap.<String, List<ConfigEntry>>builder().putAll(m1).putAll(m2).build()
    );
  }

  private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<String> topicNames, boolean includeDoc) {
    List<ConfigResource> resources = topicNames.stream()
        .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
        .collect(toList());

    return toMonoWithExceptionFilter(
        client.describeConfigs(
            resources,
            new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(),
        UnknownTopicOrPartitionException.class
    ).map(config -> config.entrySet().stream()
        .collect(toMap(
            c -> c.getKey().name(),
            c -> List.copyOf(c.getValue().entries()))));
  }

  private static Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(AdminClient client, List<Integer> brokerIds) {
    List<ConfigResource> resources = brokerIds.stream()
        .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
        .collect(toList());
    return toMono(client.describeConfigs(resources).all())
        .doOnError(InvalidRequestException.class,
            th -> log.trace("Error while getting broker {} configs", brokerIds, th))
        // some kafka backends (like MSK serverless) do not support broker config retrieval;
        // in that case an InvalidRequestException will be thrown
        .onErrorResume(InvalidRequestException.class, th -> Mono.just(Map.of()))
        .map(config -> config.entrySet().stream()
            .collect(toMap(
                c -> Integer.valueOf(c.getKey().name()),
                c -> new ArrayList<>(c.getValue().entries()))));
  }

  /**
   * Returns per-broker configs, or an empty map if broker config retrieval is not supported.
   */
  public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> brokerIds) {
    return loadBrokersConfig(client, brokerIds);
  }

  public Mono<Map<String, TopicDescription>> describeTopics() {
    return listTopics(true).flatMap(this::describeTopics);
  }

  public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> topics) {
    // we need to partition calls, because a large topic count can lead to AdminClient timeouts
    return partitionCalls(
        topics,
        200,
        this::describeTopicsImpl,
        (m1, m2) -> ImmutableMap.<String, TopicDescription>builder().putAll(m1).putAll(m2).build()
    );
  }

  private Mono<Map<String, TopicDescription>> describeTopicsImpl(Collection<String> topics) {
    return toMonoWithExceptionFilter(
        client.describeTopics(topics).values(),
        UnknownTopicOrPartitionException.class
    );
  }

  /**
   * Returns a TopicDescription mono, or an empty Mono if the topic is not found.
   */
  public Mono<TopicDescription> describeTopic(String topic) {
    return describeTopics(List.of(topic)).flatMap(m -> Mono.justOrEmpty(m.get(topic)));
  }
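  // Illustrative sketch (not part of the original file): typical read-path usage of the
  // topic-describe API above. The topic name is hypothetical.
  //
  //   reactiveAdminClient.describeTopic("orders")
  //       .map(td -> td.partitions().size())
  //       .switchIfEmpty(Mono.error(new NotFoundException("topic not found")))
  //       .subscribe(partitionCount -> log.info("partitions: {}", partitionCount));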
  /**
   * Kafka API often returns Map responses with KafkaFuture values. If we use allOf()
   * logic, the resulting Mono will fail if any of the Futures finished with an error.
   * In some situations this is not what we want: e.g. when we call describeTopics(List names)
   * and get an UnknownTopicOrPartitionException for unknown topics, we want to simply not put
   * such topics in the resulting map.
   *
   * <p>This method converts the input map into a Mono[Map], ignoring keys for which KafkaFutures
   * finished with a {@code clazz} exception.
   */
  static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values,
                                                          Class<? extends KafkaException> clazz) {
    if (values.isEmpty()) {
      return Mono.just(Map.of());
    }

    List<Mono<Tuple2<K, V>>> monos = values.entrySet().stream()
        .map(e -> toMono(e.getValue()).map(r -> Tuples.of(e.getKey(), r)))
        .collect(toList());

    return Mono.create(sink -> {
      var finishedCnt = new AtomicInteger();
      var results = new ConcurrentHashMap<K, V>();
      monos.forEach(mono -> mono.subscribe(
          r -> {
            results.put(r.getT1(), r.getT2());
            if (finishedCnt.incrementAndGet() == monos.size()) {
              sink.success(results);
            }
          },
          th -> {
            if (!th.getClass().isAssignableFrom(clazz)) {
              sink.error(th);
            } else if (finishedCnt.incrementAndGet() == monos.size()) {
              sink.success(results);
            }
          }
      ));
    });
  }
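  // Illustrative sketch (not part of the original file): how the filter above behaves.
  // Futures failing with the given class are dropped from the result instead of failing
  // the whole Mono; any other error is propagated to the subscriber.
  //
  //   Map<String, KafkaFuture<TopicDescription>> perTopic =
  //       client.describeTopics(List.of("existing", "missing")).values();
  //   toMonoWithExceptionFilter(perTopic, UnknownTopicOrPartitionException.class)
  //       .subscribe(m -> log.info("described: {}", m.keySet())); // only "existing"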
  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
    return describeCluster()
        .map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
        .flatMap(this::describeLogDirs);
  }

  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs(
      Collection<Integer> brokerIds) {
    return toMono(client.describeLogDirs(brokerIds).all());
  }

  public Mono<ClusterDescription> describeCluster() {
    var r = client.describeCluster();
    var all = KafkaFuture.allOf(r.nodes(), r.clusterId(), r.controller(), r.authorizedOperations());
    return Mono.create(sink -> all.whenComplete((res, ex) -> {
      if (ex != null) {
        sink.error(ex);
      } else {
        try {
          sink.success(
              new ClusterDescription(
                  getUninterruptibly(r.controller()),
                  getUninterruptibly(r.clusterId()),
                  getUninterruptibly(r.nodes()),
                  getUninterruptibly(r.authorizedOperations())
              )
          );
        } catch (ExecutionException e) {
          // can't be here, because all futures already completed
        }
      }
    }));
  }

  private static Mono<String> getClusterVersion(AdminClient client) {
    return toMono(client.describeCluster().controller())
        .flatMap(controller -> loadBrokersConfig(client, List.of(controller.id())))
        .map(configs -> configs.values().stream()
            .flatMap(Collection::stream)
            .filter(entry -> entry.name().contains("inter.broker.protocol.version"))
            .findFirst()
            .map(ConfigEntry::value)
            .orElse("1.0-UNKNOWN")
        );
  }

  public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
    return toMono(client.deleteConsumerGroups(groupIds).all())
        .onErrorResume(GroupIdNotFoundException.class,
            th -> Mono.error(new NotFoundException("The group id does not exist")))
        .onErrorResume(GroupNotEmptyException.class,
            th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
  }

  public Mono<Void> createTopic(String name,
                                int numPartitions,
                                @Nullable Integer replicationFactor,
                                Map<String, String> configs) {
    var newTopic = new NewTopic(
        name,
        Optional.of(numPartitions),
        Optional.ofNullable(replicationFactor).map(Integer::shortValue)
    ).configs(configs);
    return toMono(client.createTopics(List.of(newTopic)).all());
  }

  public Mono<Void> alterPartitionReassignments(
      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
    return toMono(client.alterPartitionReassignments(reassignments).all());
  }

  public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap) {
    return toMono(client.createPartitions(newPartitionsMap).all());
  }

  // NOTE: this replaces the whole current topic config with the new one. Entries that were
  // present in the old config but are missing in the new one will be reset to defaults.
  public Mono<Void> updateTopicConfig(String topicName, Map<String, String> configs) {
    if (features.contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
      return getTopicsConfigImpl(List.of(topicName), false)
          .map(conf -> conf.getOrDefault(topicName, List.of()))
          .flatMap(currentConfigs -> incrementalAlterConfig(topicName, currentConfigs, configs));
    } else {
      return alterConfig(topicName, configs);
    }
  }

  public Mono<List<String>> listConsumerGroups() {
    return toMono(client.listConsumerGroups().all())
        .map(lst -> lst.stream().map(ConsumerGroupListing::groupId).collect(toList()));
  }

  public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
    return toMono(client.describeConsumerGroups(groupIds).all());
  }

  public Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(String groupId) {
    return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
  }

  public Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(
      String groupId, List<TopicPartition> partitions) {
    return listConsumerGroupOffsets(groupId,
        new ListConsumerGroupOffsetsOptions().topicPartitions(partitions));
  }

  private Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(
      String groupId, ListConsumerGroupOffsetsOptions options) {
    return toMono(client.listConsumerGroupOffsets(groupId, options).partitionsToOffsetAndMetadata())
        .map(MapUtil::removeNullValues)
        .map(m -> m.entrySet().stream()
            .map(e -> Tuples.of(e.getKey(), e.getValue().offset()))
            .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)));
  }

  public Mono<Void> alterConsumerGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
    return toMono(client.alterConsumerGroupOffsets(
            groupId,
            offsets.entrySet().stream()
                .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))))
        .all());
  }

  /**
   * List offsets for the topic's partitions and OffsetSpec.
   *
   * @param failOnUnknownLeader true - throw an exception for partitions with no leader,
   *                            false - skip partitions with no leader
   */
  public Mono<Map<TopicPartition, Long>> listTopicOffsets(String topic,
                                                          OffsetSpec offsetSpec,
                                                          boolean failOnUnknownLeader) {
    return describeTopic(topic)
        .map(td -> filterPartitionsWithLeaderCheck(List.of(td), p -> true, failOnUnknownLeader))
        .flatMap(partitions -> listOffsetsUnsafe(partitions, offsetSpec));
  }
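  // Illustrative sketch (not part of the original file): computing per-partition consumer
  // lag by combining the two offset views above. Group/topic names are hypothetical.
  //
  //   Mono.zip(
  //           reactiveAdminClient.listTopicOffsets("orders", OffsetSpec.latest(), false),
  //           reactiveAdminClient.listConsumerGroupOffsets("orders-processor"))
  //       .map(t -> t.getT1().entrySet().stream()
  //           .collect(toMap(Map.Entry::getKey,
  //               e -> e.getValue() - t.getT2().getOrDefault(e.getKey(), 0L))))
  //       .subscribe(lagPerPartition -> log.info("lag: {}", lagPerPartition));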
  /**
   * List offsets for the specified partitions and OffsetSpec.
   *
   * @param failOnUnknownLeader true - throw an exception for partitions with no leader,
   *                            false - skip partitions with no leader
   */
  public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicPartition> partitions,
                                                     OffsetSpec offsetSpec,
                                                     boolean failOnUnknownLeader) {
    return filterPartitionsWithLeaderCheck(partitions, failOnUnknownLeader)
        .flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
  }

  private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
                                                                           boolean failOnUnknownLeader) {
    var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
    return describeTopicsImpl(targetTopics)
        .map(descriptions ->
            filterPartitionsWithLeaderCheck(
                descriptions.values(), partitions::contains, failOnUnknownLeader));
  }

  private Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
                                                              Predicate<TopicPartition> partitionPredicate,
                                                              boolean failOnUnknownLeader) {
    var goodPartitions = new HashSet<TopicPartition>();
    for (TopicDescription description : topicDescriptions) {
      for (TopicPartitionInfo partitionInfo : description.partitions()) {
        TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
        if (!partitionPredicate.test(topicPartition)) {
          continue;
        }
        if (partitionInfo.leader() != null) {
          goodPartitions.add(topicPartition);
        } else if (failOnUnknownLeader) {
          throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
        }
      }
    }
    return goodPartitions;
  }

  // 1. NOTE(!): should only be applied to partitions with an existing leader,
  //    otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
  // 2. NOTE(!): skips partitions that were not initialized yet
  //    (UnknownTopicOrPartitionException is thrown, ex. right after topic creation)
  // 3. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retries instead
  @KafkaClientInternalsDependant
  public Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions,
                                                           OffsetSpec offsetSpec) {
    Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
        parts -> {
          ListOffsetsResult r = client.listOffsets(parts.stream().collect(toMap(tp -> tp, tp -> offsetSpec)));
          Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> perPartitionResults = new HashMap<>();
          parts.forEach(p -> perPartitionResults.put(p, r.partitionResult(p)));

          return toMonoWithExceptionFilter(perPartitionResults, UnknownTopicOrPartitionException.class)
              .map(offsets -> offsets.entrySet().stream()
                  // filtering out partitions for which offsets were not found
                  .filter(e -> e.getValue().offset() >= 0)
                  .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
        };

    return partitionCalls(
        partitions,
        200,
        call,
        (m1, m2) -> ImmutableMap.<TopicPartition, Long>builder().putAll(m1).putAll(m2).build()
    );
  }
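  // Illustrative sketch (not part of the original file): the "unsafe" variant above is
  // normally reached through listOffsets(..), whose leader check strips partitions that
  // would otherwise make AdminClient retry until timeout. Topic name is hypothetical.
  //
  //   reactiveAdminClient.listOffsets(Set.of(new TopicPartition("orders", 0)),
  //           OffsetSpec.earliest(), /* failOnUnknownLeader */ true)
  //       .subscribe(offsets -> log.info("earliest offsets: {}", offsets));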
  public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
    AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
    return toMono(client.incrementalAlterConfigs(Map.of(cr, List.of(op))).all());
  }

  public Mono<Void> deleteRecords(Map<TopicPartition, Long> offsets) {
    var records = offsets.entrySet().stream()
        .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
        .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
    return toMono(client.deleteRecords(records).all());
  }

  public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
    return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
  }

  private Mono<Void> incrementalAlterConfig(String topicName,
                                            List<ConfigEntry> currentConfigs,
                                            Map<String, String> newConfigs) {
    var configsToDelete = currentConfigs.stream()
        .filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) //manually set configs only
        .filter(e -> !newConfigs.containsKey(e.name()))
        .map(e -> new AlterConfigOp(e, AlterConfigOp.OpType.DELETE));

    var configsToSet = newConfigs.entrySet().stream()
        .map(e -> new AlterConfigOp(new ConfigEntry(e.getKey(), e.getValue()), AlterConfigOp.OpType.SET));

    return toMono(client.incrementalAlterConfigs(
        Map.of(
            new ConfigResource(ConfigResource.Type.TOPIC, topicName),
            Stream.concat(configsToDelete, configsToSet).toList()
        )).all());
  }

  @SuppressWarnings("deprecation")
  private Mono<Void> alterConfig(String topicName, Map<String, String> configs) {
    List<ConfigEntry> configEntries = configs.entrySet().stream()
        .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue())))
        .collect(toList());
    Config config = new Config(configEntries);
    var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    return toMono(client.alterConfigs(Map.of(topicResource, config)).all());
  }

  /**
   * Splits the input collection into batches, applies each batch sequentially to the function
   * and merges the output Monos into one Mono.
   */
  private static <R, I> Mono<R> partitionCalls(Collection<I> items,
                                               int partitionSize,
                                               Function<Collection<I>, Mono<R>> call,
                                               BiFunction<R, R, R> merger) {
    if (items.isEmpty()) {
      return call.apply(items);
    }
    Iterator<List<I>> parts = Iterators.partition(items.iterator(), partitionSize);
    Mono<R> mono = call.apply(parts.next());
    while (parts.hasNext()) {
      var nextPart = parts.next();
      // calls will be executed sequentially
      mono = mono.flatMap(res1 -> call.apply(nextPart).map(res2 -> merger.apply(res1, res2)));
    }
    return mono;
  }

  @Override
  public void close() {
    client.close();
  }
}
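// Illustrative sketch (not part of the original file): how partitionCalls batches a large
// request. 450 topic names with partitionSize=200 produce three sequential describeConfigs
// calls (200 + 200 + 50), merged into a single map. The input collection is hypothetical.
//
//   partitionCalls(
//       fourHundredFiftyTopicNames,
//       200,
//       part -> getTopicsConfigImpl(part, false),
//       (m1, m2) -> ImmutableMap.<String, List<ConfigEntry>>builder().putAll(m1).putAll(m2).build());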