ReactiveAdminClient.java

package com.provectus.kafka.ui.service;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;

import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Table;
import com.provectus.kafka.ui.exception.IllegalEntityStateException;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.util.KafkaVersion;
import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

@Slf4j
@RequiredArgsConstructor
public class ReactiveAdminClient implements Closeable {

  private enum SupportedFeature {
    INCREMENTAL_ALTER_CONFIGS(2.3f),
    CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
    DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f);

    private final float sinceVersion;

    SupportedFeature(float sinceVersion) {
      this.sinceVersion = sinceVersion;
    }

    static Set<SupportedFeature> forVersion(float kafkaVersion) {
      return Arrays.stream(SupportedFeature.values())
          .filter(f -> kafkaVersion >= f.sinceVersion)
          .collect(Collectors.toSet());
    }

    static Set<SupportedFeature> defaultFeatures() {
      return Set.of();
    }
  }
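
  // Worked example: forVersion(2.6f) yields all three features above, while forVersion(2.0f)
  // yields none. If the broker version cannot be parsed, defaultFeatures() (the empty set)
  // is used instead; see getSupportedUpdateFeaturesForVersion below.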

  @Value
  public static class ClusterDescription {
    @Nullable
    Node controller;
    String clusterId;
    Collection<Node> nodes;
    Set<AclOperation> authorizedOperations;
  }

  public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
    return getClusterVersion(adminClient)
        .map(ver ->
            new ReactiveAdminClient(
                adminClient,
                ver,
                getSupportedUpdateFeaturesForVersion(ver)));
  }

  private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String versionStr) {
    try {
      float version = KafkaVersion.parse(versionStr);
      return SupportedFeature.forVersion(version);
    } catch (NumberFormatException e) {
      return SupportedFeature.defaultFeatures();
    }
  }

  // NOTE: if a KafkaFuture completes with null, the resulting Mono will be empty(!), since
  // Reactor does not support nullable results (see MonoSink.success(..) javadoc for details).
  public static <T> Mono<T> toMono(KafkaFuture<T> future) {
    return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
      if (ex != null) {
        // KafkaFuture docs are unclear about which exception wrapper will be used
        // (per the docs it should be ExecutionException, but we actually see
        // CompletionException, so we check for both).
        if (ex instanceof CompletionException || ex instanceof ExecutionException) {
          sink.error(ex.getCause()); // unwrapping exception
        } else {
          sink.error(ex);
        }
      } else {
        sink.success(res);
      }
    })).doOnCancel(() -> future.cancel(true))
        // AdminClient uses a single thread for kafka communication, and by default all
        // downstream operations (like map(..)) on the created Mono will be executed on that
        // thread. If some downstream operation is (mistakenly) blocking, this can leave other
        // AdminClient requests stuck, which can cause timeout exceptions. So we explicitly set
        // a Scheduler for downstream processing.
        .publishOn(Schedulers.parallel());
  }
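
  // Usage sketch (illustrative; `adminClient` stands for any initialized AdminClient):
  //
  //   Mono<Set<String>> names = toMono(adminClient.listTopics().names());
  //
  // Cancelling the subscription cancels the underlying KafkaFuture (see doOnCancel above).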

  //---------------------------------------------------------------------------------

  @Getter(AccessLevel.PACKAGE) // visible for testing
  private final AdminClient client;
  private final String version;
  private final Set<SupportedFeature> features;

  public Mono<Set<String>> listTopics(boolean listInternal) {
    return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
  }

  public Mono<Void> deleteTopic(String topicName) {
    return toMono(client.deleteTopics(List.of(topicName)).all());
  }

  public String getVersion() {
    return version;
  }

  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
    return listTopics(true).flatMap(topics -> getTopicsConfig(topics, false));
  }

  // NOTE: skips non-existing topics (for which UnknownTopicOrPartitionException was thrown by
  // AdminClient) and topics for which the DESCRIBE_CONFIGS permission is not granted
  // (TopicAuthorizationException was thrown).
  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
    var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc;
    // we need to partition calls, because a single call for a large number of topics can lead
    // to AdminClient timeouts
    return partitionCalls(
        topicNames,
        200,
        part -> getTopicsConfigImpl(part, includeDocFixed),
        mapMerger()
    );
  }

  private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<String> topicNames, boolean includeDoc) {
    List<ConfigResource> resources = topicNames.stream()
        .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
        .collect(toList());

    return toMonoWithExceptionFilter(
        client.describeConfigs(
            resources,
            new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(),
        UnknownTopicOrPartitionException.class,
        TopicAuthorizationException.class
    ).map(config -> config.entrySet().stream()
        .collect(toMap(
            c -> c.getKey().name(),
            c -> List.copyOf(c.getValue().entries()))));
  }

  private static Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(AdminClient client, List<Integer> brokerIds) {
    List<ConfigResource> resources = brokerIds.stream()
        .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
        .collect(toList());
    return toMono(client.describeConfigs(resources).all())
        // some kafka backends (like MSK Serverless) do not support broker config retrieval,
        // in which case InvalidRequestException is thrown
        .onErrorResume(InvalidRequestException.class, th -> {
          log.trace("Error while getting broker {} configs", brokerIds, th);
          return Mono.just(Map.of());
        })
        // there are situations when the kafka-ui user has no DESCRIBE_CONFIGS permission
        // on the cluster
        .onErrorResume(ClusterAuthorizationException.class, th -> {
          log.trace("AuthorizationException while getting configs for brokers {}", brokerIds, th);
          return Mono.just(Map.of());
        })
        .map(config -> config.entrySet().stream()
            .collect(toMap(
                c -> Integer.valueOf(c.getKey().name()),
                c -> new ArrayList<>(c.getValue().entries()))));
  }

  /**
   * Returns per-broker configs, or an empty map if broker config retrieval is not supported.
   */
  public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> brokerIds) {
    return loadBrokersConfig(client, brokerIds);
  }

  public Mono<Map<String, TopicDescription>> describeTopics() {
    return listTopics(true).flatMap(this::describeTopics);
  }

  public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> topics) {
    // we need to partition calls, because a single call for a large number of topics can lead
    // to AdminClient timeouts
    return partitionCalls(
        topics,
        200,
        this::describeTopicsImpl,
        mapMerger()
    );
  }

  private Mono<Map<String, TopicDescription>> describeTopicsImpl(Collection<String> topics) {
    return toMonoWithExceptionFilter(
        client.describeTopics(topics).topicNameValues(),
        UnknownTopicOrPartitionException.class,
        // we only describe topics that we see via the listTopics() API, so we should have
        // permission to do it, but we also add this exception here for the rare case when
        // access was restricted after we called listTopics()
        TopicAuthorizationException.class
    );
  }

  /**
   * Returns a TopicDescription mono, or an empty Mono if the topic is not visible.
   */
  public Mono<TopicDescription> describeTopic(String topic) {
    return describeTopics(List.of(topic)).flatMap(m -> Mono.justOrEmpty(m.get(topic)));
  }

  /**
   * The Kafka API often returns Map responses with KafkaFuture values. If we apply allOf()
   * logic, the resulting Mono will fail if any of the futures finished with an error.
   * In some situations that is not what we want: e.g. when we call describeTopics(List names)
   * and get UnknownTopicOrPartitionException for unknown topics, we want to simply leave such
   * topics out of the resulting map.
   * <p/>
   * This method converts the input map into a Mono[Map], dropping keys whose KafkaFutures
   * finished with one of the <code>classes</code> exceptions, as well as keys whose Monos
   * completed empty.
   */
  @SafeVarargs
  static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values,
                                                          Class<? extends KafkaException>... classes) {
    if (values.isEmpty()) {
      return Mono.just(Map.of());
    }

    List<Mono<Tuple2<K, Optional<V>>>> monos = values.entrySet().stream()
        .map(e ->
            toMono(e.getValue())
                .map(r -> Tuples.of(e.getKey(), Optional.of(r)))
                .defaultIfEmpty(Tuples.of(e.getKey(), Optional.empty())) // tracking empty Monos
                .onErrorResume(
                    // tracking Monos with a suppressible error; isInstance() also matches
                    // subclasses of the passed exception classes
                    th -> Stream.of(classes).anyMatch(clazz -> clazz.isInstance(th)),
                    th -> Mono.just(Tuples.of(e.getKey(), Optional.empty()))))
        .toList();

    return Mono.zip(
        monos,
        resultsArr -> Stream.of(resultsArr)
            .map(obj -> (Tuple2<K, Optional<V>>) obj)
            .filter(t -> t.getT2().isPresent()) // skipping empty & suppressed-error entries
            .collect(Collectors.toMap(Tuple2::getT1, t -> t.getT2().get()))
    );
  }
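
  // Usage sketch (illustrative; `futuresByTopic` is a hypothetical
  // Map<String, KafkaFuture<TopicDescription>>):
  //
  //   Mono<Map<String, TopicDescription>> described =
  //       toMonoWithExceptionFilter(futuresByTopic, UnknownTopicOrPartitionException.class);
  //
  // Topics whose futures failed with UnknownTopicOrPartitionException are silently dropped
  // from the resulting map instead of failing the whole Mono.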

  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
    return describeCluster()
        .map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
        .flatMap(this::describeLogDirs);
  }

  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs(
      Collection<Integer> brokerIds) {
    return toMono(client.describeLogDirs(brokerIds).all())
        .onErrorResume(UnsupportedVersionException.class, th -> Mono.just(Map.of()))
        .onErrorResume(ClusterAuthorizationException.class, th -> Mono.just(Map.of()))
        .onErrorResume(th -> true, th -> {
          log.warn("Error while calling describeLogDirs", th);
          return Mono.just(Map.of());
        });
  }

  public Mono<ClusterDescription> describeCluster() {
    return describeClusterImpl(client, features);
  }

  private static Mono<ClusterDescription> describeClusterImpl(AdminClient client, Set<SupportedFeature> features) {
    boolean includeAuthorizedOperations =
        features.contains(SupportedFeature.DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS);
    DescribeClusterResult result = client.describeCluster(
        new DescribeClusterOptions().includeAuthorizedOperations(includeAuthorizedOperations));
    var allOfFuture = KafkaFuture.allOf(
        result.controller(), result.clusterId(), result.nodes(), result.authorizedOperations());
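    // by the time allOfFuture completes, each underlying future is already done,
    // so the blocking get() calls inside the callable below return immediately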
    return toMono(allOfFuture).then(
        Mono.fromCallable(() ->
            new ClusterDescription(
                result.controller().get(),
                result.clusterId().get(),
                result.nodes().get(),
                result.authorizedOperations().get()
            )
        )
    );
  }

  private static Mono<String> getClusterVersion(AdminClient client) {
    return describeClusterImpl(client, Set.of())
        // choosing a node from which we will get configs (starting with the controller)
        .flatMap(descr -> descr.controller != null
            ? Mono.just(descr.controller)
            : Mono.justOrEmpty(descr.nodes.stream().findFirst())
        )
        .flatMap(node -> loadBrokersConfig(client, List.of(node.id())))
        .flatMap(configs -> configs.values().stream()
            .flatMap(Collection::stream)
            .filter(entry -> entry.name().contains("inter.broker.protocol.version"))
            .findFirst()
            .map(configEntry -> Mono.just(configEntry.value()))
            .orElse(Mono.empty()))
        .switchIfEmpty(Mono.just("1.0-UNKNOWN"));
  }

  public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
    return toMono(client.deleteConsumerGroups(groupIds).all())
        .onErrorResume(GroupIdNotFoundException.class,
            th -> Mono.error(new NotFoundException("The group id does not exist")))
        .onErrorResume(GroupNotEmptyException.class,
            th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
  }

  public Mono<Void> createTopic(String name,
                                int numPartitions,
                                @Nullable Integer replicationFactor,
                                Map<String, String> configs) {
    var newTopic = new NewTopic(
        name,
        Optional.of(numPartitions),
        Optional.ofNullable(replicationFactor).map(Integer::shortValue)
    ).configs(configs);
    return toMono(client.createTopics(List.of(newTopic)).all());
  }

  public Mono<Void> alterPartitionReassignments(
      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
    return toMono(client.alterPartitionReassignments(reassignments).all());
  }

  public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap) {
    return toMono(client.createPartitions(newPartitionsMap).all());
  }

  // NOTE: replaces the topic's whole current config with the new one. Entries that were present
  // in the old config but are missing in the new one will be reset to their defaults.
  public Mono<Void> updateTopicConfig(String topicName, Map<String, String> configs) {
    if (features.contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
      return getTopicsConfigImpl(List.of(topicName), false)
          .map(conf -> conf.getOrDefault(topicName, List.of()))
          .flatMap(currentConfigs -> incrementalAlterConfig(topicName, currentConfigs, configs));
    } else {
      return alterConfig(topicName, configs);
    }
  }
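
  // Worked example of the replace semantics above (hypothetical values): if the topic currently
  // has dynamic overrides {retention.ms=100, cleanup.policy=compact} and updateTopicConfig is
  // called with {retention.ms=200}, then retention.ms is set to 200 and cleanup.policy reverts
  // to its default, since it is absent from the new config.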

  public Mono<List<String>> listConsumerGroupNames() {
    return listConsumerGroups().map(lst -> lst.stream().map(ConsumerGroupListing::groupId).toList());
  }

  public Mono<Collection<ConsumerGroupListing>> listConsumerGroups() {
    return toMono(client.listConsumerGroups().all());
  }

  public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
    return partitionCalls(
        groupIds,
        25,
        4,
        ids -> toMono(client.describeConsumerGroups(ids).all()),
        mapMerger()
    );
  }

  // group -> partition -> offset
  // NOTE: partitions with no committed offsets will be skipped
  public Mono<Table<String, TopicPartition, Long>> listConsumerGroupOffsets(
      List<String> consumerGroups,
      @Nullable List<TopicPartition> partitions) { // all partitions if null is passed
    Function<Collection<String>, Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>>> call =
        groups -> toMono(
            client.listConsumerGroupOffsets(
                groups.stream()
                    .collect(Collectors.toMap(
                        g -> g,
                        g -> new ListConsumerGroupOffsetsSpec().topicPartitions(partitions)
                    ))).all()
        );

    Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>> merged = partitionCalls(
        consumerGroups,
        25,
        4,
        call,
        mapMerger()
    );

    return merged.map(map -> {
      var table = ImmutableTable.<String, TopicPartition, Long>builder();
      map.forEach((g, tpOffsets) -> tpOffsets.forEach((tp, offset) -> {
        if (offset != null) {
          // offset will be null for partitions that have no committed offset for this group
          table.put(g, tp, offset.offset());
        }
      }));
      return table.build();
    });
  }
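
  // Usage sketch (illustrative; group and topic names are hypothetical):
  //
  //   reactiveAdminClient.listConsumerGroupOffsets(List.of("group-a"), null) // null = all partitions
  //       .subscribe(table -> table.row("group-a")
  //           .forEach((tp, offset) -> log.info("{} -> {}", tp, offset)));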

  public Mono<Void> alterConsumerGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
    return toMono(client.alterConsumerGroupOffsets(
        groupId,
        offsets.entrySet().stream()
            .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))))
        .all());
  }

  /**
   * Lists offsets for the topic's partitions and the given OffsetSpec.
   *
   * @param failOnUnknownLeader true - throw an exception if there are partitions with no leader,
   *                            false - skip partitions with no leader
   */
  public Mono<Map<TopicPartition, Long>> listTopicOffsets(String topic,
                                                          OffsetSpec offsetSpec,
                                                          boolean failOnUnknownLeader) {
    return describeTopic(topic)
        .map(td -> filterPartitionsWithLeaderCheck(List.of(td), p -> true, failOnUnknownLeader))
        .flatMap(partitions -> listOffsetsUnsafe(partitions, offsetSpec));
  }

  /**
   * Lists offsets for the specified partitions and OffsetSpec.
   *
   * @param failOnUnknownLeader true - throw an exception if there are partitions with no leader,
   *                            false - skip partitions with no leader
   */
  public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicPartition> partitions,
                                                     OffsetSpec offsetSpec,
                                                     boolean failOnUnknownLeader) {
    return filterPartitionsWithLeaderCheck(partitions, failOnUnknownLeader)
        .flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
  }

  private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
                                                                           boolean failOnUnknownLeader) {
    var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
    return describeTopicsImpl(targetTopics)
        .map(descriptions ->
            filterPartitionsWithLeaderCheck(
                descriptions.values(), partitions::contains, failOnUnknownLeader));
  }

  private Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
                                                              Predicate<TopicPartition> partitionPredicate,
                                                              boolean failOnUnknownLeader) {
    var goodPartitions = new HashSet<TopicPartition>();
    for (TopicDescription description : topicDescriptions) {
      for (TopicPartitionInfo partitionInfo : description.partitions()) {
        TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
        if (!partitionPredicate.test(topicPartition)) {
          continue;
        }
        if (partitionInfo.leader() != null) {
          goodPartitions.add(topicPartition);
        } else if (failOnUnknownLeader) {
          throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
        }
      }
    }
    return goodPartitions;
  }

  // 1. NOTE(!): should only be called for partitions with an existing leader, otherwise
  //    AdminClient will try to fetch topic metadata, fail, and retry infinitely (until timeout)
  // 2. NOTE(!): skips partitions that are not yet initialized
  //    (UnknownTopicOrPartitionException thrown, e.g. right after topic creation)
  // 3. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retries instead
  @KafkaClientInternalsDependant
  public Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions,
                                                           OffsetSpec offsetSpec) {
    Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
        parts -> {
          ListOffsetsResult r = client.listOffsets(parts.stream().collect(toMap(tp -> tp, tp -> offsetSpec)));
          Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> perPartitionResults = new HashMap<>();
          parts.forEach(p -> perPartitionResults.put(p, r.partitionResult(p)));

          return toMonoWithExceptionFilter(perPartitionResults, UnknownTopicOrPartitionException.class)
              .map(offsets -> offsets.entrySet().stream()
                  // filtering out partitions for which offsets were not found
                  .filter(e -> e.getValue().offset() >= 0)
                  .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
        };

    return partitionCalls(
        partitions,
        200,
        call,
        mapMerger()
    );
  }
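
  // Usage sketch (illustrative): prefer the leader-checked wrappers over calling
  // listOffsetsUnsafe directly, e.g. fetching end offsets for a hypothetical topic:
  //
  //   Mono<Map<TopicPartition, Long>> endOffsets =
  //       reactiveAdminClient.listTopicOffsets("orders", OffsetSpec.latest(), false);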

  public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
    AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
    return toMono(client.incrementalAlterConfigs(Map.of(cr, List.of(op))).all());
  }

  public Mono<Void> deleteRecords(Map<TopicPartition, Long> offsets) {
    var records = offsets.entrySet().stream()
        .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
        .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
    return toMono(client.deleteRecords(records).all());
  }

  public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
    return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
  }

  private Mono<Void> incrementalAlterConfig(String topicName,
                                            List<ConfigEntry> currentConfigs,
                                            Map<String, String> newConfigs) {
    var configsToDelete = currentConfigs.stream()
        .filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) // manually set configs only
        .filter(e -> !newConfigs.containsKey(e.name()))
        .map(e -> new AlterConfigOp(e, AlterConfigOp.OpType.DELETE));

    var configsToSet = newConfigs.entrySet().stream()
        .map(e -> new AlterConfigOp(new ConfigEntry(e.getKey(), e.getValue()), AlterConfigOp.OpType.SET));

    return toMono(client.incrementalAlterConfigs(
        Map.of(
            new ConfigResource(ConfigResource.Type.TOPIC, topicName),
            Stream.concat(configsToDelete, configsToSet).toList()
        )).all());
  }

  @SuppressWarnings("deprecation")
  private Mono<Void> alterConfig(String topicName, Map<String, String> configs) {
    List<ConfigEntry> configEntries = configs.entrySet().stream()
        .map(cfg -> new ConfigEntry(cfg.getKey(), cfg.getValue()))
        .collect(toList());
    Config config = new Config(configEntries);
    var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    return toMono(client.alterConfigs(Map.of(topicResource, config)).all());
  }

  /**
   * Splits the input collection into batches, converts each batch into a Mono, sequentially
   * subscribes to them, and merges the output Monos into one Mono.
   */
  private static <R, I> Mono<R> partitionCalls(Collection<I> items,
                                               int partitionSize,
                                               Function<Collection<I>, Mono<R>> call,
                                               BiFunction<R, R, R> merger) {
    if (items.isEmpty()) {
      return call.apply(items);
    }
    Iterable<List<I>> parts = Iterables.partition(items, partitionSize);
    return Flux.fromIterable(parts)
        .concatMap(call)
        .reduce(merger);
  }

  /**
   * Splits the input collection into batches, converts each batch into a Mono, subscribes to
   * them concurrently (with the specified concurrency level), and merges the output Monos into
   * one Mono.
   */
  private static <R, I> Mono<R> partitionCalls(Collection<I> items,
                                               int partitionSize,
                                               int concurrency,
                                               Function<Collection<I>, Mono<R>> call,
                                               BiFunction<R, R, R> merger) {
    if (items.isEmpty()) {
      return call.apply(items);
    }
    Iterable<List<I>> parts = Iterables.partition(items, partitionSize);
    return Flux.fromIterable(parts)
        .flatMap(call, concurrency)
        .reduce(merger);
  }
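
  // Worked example: partitionCalls(topics, 200, this::describeTopicsImpl, mapMerger()) with
  // 450 topic names issues 3 sequential describe calls (batches of 200, 200 and 50) and merges
  // the three resulting maps into one via mapMerger().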

  private static <K, V> BiFunction<Map<K, V>, Map<K, V>, Map<K, V>> mapMerger() {
    return (m1, m2) -> {
      var merged = new HashMap<K, V>();
      merged.putAll(m1);
      merged.putAll(m2);
      return merged;
    };
  }

  @Override
  public void close() {
    client.close();
  }
}