ReactiveAdminClient.java

package com.provectus.kafka.ui.service;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Table;
import com.provectus.kafka.ui.exception.IllegalEntityStateException;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.util.KafkaVersion;
import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

@Slf4j
@RequiredArgsConstructor
public class ReactiveAdminClient implements Closeable {

  private enum SupportedFeature {
    INCREMENTAL_ALTER_CONFIGS(2.3f),
    CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
    DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f);

    private final float sinceVersion;

    SupportedFeature(float sinceVersion) {
      this.sinceVersion = sinceVersion;
    }

    static Set<SupportedFeature> forVersion(float kafkaVersion) {
      return Arrays.stream(SupportedFeature.values())
          .filter(f -> kafkaVersion >= f.sinceVersion)
          .collect(Collectors.toSet());
    }

    static Set<SupportedFeature> defaultFeatures() {
      return Set.of();
    }
  }

  @Value
  public static class ClusterDescription {
    @Nullable
    Node controller;
    String clusterId;
    Collection<Node> nodes;
    Set<AclOperation> authorizedOperations;
  }

  public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
    return getClusterVersion(adminClient)
        .map(ver ->
            new ReactiveAdminClient(
                adminClient,
                ver,
                getSupportedUpdateFeaturesForVersion(ver)));
  }

  private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String versionStr) {
    try {
      float version = KafkaVersion.parse(versionStr);
      return SupportedFeature.forVersion(version);
    } catch (NumberFormatException e) {
      return SupportedFeature.defaultFeatures();
    }
  }

  // NOTE: if the KafkaFuture completes with null, the resulting Mono will be empty(!),
  // since Reactor does not support nullable results (see MonoSink.success(..) javadoc for details)
  public static <T> Mono<T> toMono(KafkaFuture<T> future) {
    return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
      if (ex != null) {
        // KafkaFuture docs are unclear about which exception wrapper will be used:
        // the docs say ExecutionException, but we actually see CompletionException, so we check both
        if (ex instanceof CompletionException || ex instanceof ExecutionException) {
          sink.error(ex.getCause()); // unwrapping exception
        } else {
          sink.error(ex);
        }
      } else {
        sink.success(res);
      }
    })).doOnCancel(() -> future.cancel(true))
        // AdminClient uses a single thread for Kafka communication, and by default all
        // downstream operations (like map(..)) on the created Mono will be executed on that thread.
        // If some downstream operation is blocking (by mistake), this can leave other
        // AdminClient requests stuck, which can cause timeout exceptions.
        // So we explicitly set a Scheduler for downstream processing.
        .publishOn(Schedulers.parallel());
  }

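  // Illustrative usage sketch (not part of the original file): wrapping an AdminClient
  // future into a Mono. Assumes an existing AdminClient instance `adminClient`:
  //
  //   Mono<Set<String>> names =
  //       ReactiveAdminClient.toMono(adminClient.listTopics().names());
  //   names.subscribe(topics -> log.info("visible topics: {}", topics));
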
  //---------------------------------------------------------------------------------

  @Getter(AccessLevel.PACKAGE) // visible for testing
  private final AdminClient client;
  private final String version;
  private final Set<SupportedFeature> features;

  public Mono<Set<String>> listTopics(boolean listInternal) {
    return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
  }

  public Mono<Void> deleteTopic(String topicName) {
    return toMono(client.deleteTopics(List.of(topicName)).all());
  }

  public String getVersion() {
    return version;
  }

  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
    return listTopics(true).flatMap(topics -> getTopicsConfig(topics, false));
  }

  // NOTE: skips not-found topics (for which UnknownTopicOrPartitionException was thrown by AdminClient)
  // and topics for which the DESCRIBE_CONFIGS permission is not granted (TopicAuthorizationException was thrown)
  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
    var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc;
    // we need to partition calls, because a large topic count can lead to AdminClient timeouts
    return partitionCalls(
        topicNames,
        200,
        part -> getTopicsConfigImpl(part, includeDocFixed),
        mapMerger()
    );
  }

  private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<String> topicNames, boolean includeDoc) {
    List<ConfigResource> resources = topicNames.stream()
        .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
        .collect(toList());

    return toMonoWithExceptionFilter(
        client.describeConfigs(
            resources,
            new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(),
        UnknownTopicOrPartitionException.class,
        TopicAuthorizationException.class
    ).map(config -> config.entrySet().stream()
        .collect(toMap(
            c -> c.getKey().name(),
            c -> List.copyOf(c.getValue().entries()))));
  }

  private static Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(AdminClient client, List<Integer> brokerIds) {
    List<ConfigResource> resources = brokerIds.stream()
        .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
        .collect(toList());
    return toMono(client.describeConfigs(resources).all())
        // some kafka backends don't support broker config retrieval
        // and throw various exceptions on the describeConfigs() call
        .onErrorResume(th -> th instanceof InvalidRequestException // MSK Serverless
                || th instanceof UnknownTopicOrPartitionException, // Azure Event Hubs
            th -> {
              log.trace("Error while getting configs for brokers {}", brokerIds, th);
              return Mono.just(Map.of());
            })
        // there are situations when the kafka-ui user has no DESCRIBE_CONFIGS permission on the cluster
        .onErrorResume(ClusterAuthorizationException.class, th -> {
          log.trace("AuthorizationException while getting configs for brokers {}", brokerIds, th);
          return Mono.just(Map.of());
        })
        // catching all remaining exceptions, but logging on WARN level
        .onErrorResume(th -> true, th -> {
          log.warn("Unexpected error while getting configs for brokers {}", brokerIds, th);
          return Mono.just(Map.of());
        })
        .map(config -> config.entrySet().stream()
            .collect(toMap(
                c -> Integer.valueOf(c.getKey().name()),
                c -> new ArrayList<>(c.getValue().entries()))));
  }

  /**
   * Returns per-broker configs, or an empty map if broker config retrieval is not supported.
   */
  public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> brokerIds) {
    return loadBrokersConfig(client, brokerIds);
  }

  public Mono<Map<String, TopicDescription>> describeTopics() {
    return listTopics(true).flatMap(this::describeTopics);
  }

  public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> topics) {
    // we need to partition calls, because a large topic count can lead to AdminClient timeouts
    return partitionCalls(
        topics,
        200,
        this::describeTopicsImpl,
        mapMerger()
    );
  }

  private Mono<Map<String, TopicDescription>> describeTopicsImpl(Collection<String> topics) {
    return toMonoWithExceptionFilter(
        client.describeTopics(topics).topicNameValues(),
        UnknownTopicOrPartitionException.class,
        // we only describe topics that we see from the listTopics() API, so we should have
        // permission to do it, but we also add this exception for the rare case when access
        // is restricted after we called listTopics()
        TopicAuthorizationException.class
    );
  }

  /**
   * Returns a TopicDescription Mono, or an empty Mono if the topic is not visible.
   */
  public Mono<TopicDescription> describeTopic(String topic) {
    return describeTopics(List.of(topic)).flatMap(m -> Mono.justOrEmpty(m.get(topic)));
  }

  /**
   * The Kafka API often returns Map responses with KafkaFuture values. If we apply allOf()
   * logic, the resulting Mono fails if any of the futures finished with an error.
   * In some situations that is not what we want: e.g. when we call the describeTopics(List names)
   * method and get an UnknownTopicOrPartitionException for unknown topics, we want to just
   * omit such topics from the resulting map.
   * <p/>
   * This method converts the input map into a Mono[Map], ignoring keys whose KafkaFutures
   * finished with one of the <code>classes</code> exceptions, as well as empty Monos.
   */
  @SafeVarargs
  static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values,
                                                          Class<? extends KafkaException>... classes) {
    if (values.isEmpty()) {
      return Mono.just(Map.of());
    }

    List<Mono<Tuple2<K, Optional<V>>>> monos = values.entrySet().stream()
        .map(e ->
            toMono(e.getValue())
                .map(r -> Tuples.of(e.getKey(), Optional.of(r)))
                .defaultIfEmpty(Tuples.of(e.getKey(), Optional.empty())) // tracking empty Monos
                .onErrorResume(
                    // tracking Monos with suppressible errors; using isInstance so that
                    // subclasses of the passed exception classes are also suppressed
                    th -> Stream.of(classes).anyMatch(clazz -> clazz.isInstance(th)),
                    th -> Mono.just(Tuples.of(e.getKey(), Optional.empty()))))
        .toList();

    return Mono.zip(
        monos,
        resultsArr -> Stream.of(resultsArr)
            .map(obj -> (Tuple2<K, Optional<V>>) obj)
            .filter(t -> t.getT2().isPresent()) // skipping empty & suppressed-error entries
            .collect(Collectors.toMap(Tuple2::getT1, t -> t.getT2().get()))
    );
  }

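  // Illustrative usage sketch (not part of the original file): suppress per-topic
  // "not found" errors while describing a fixed set of topics:
  //
  //   Mono<Map<String, TopicDescription>> descriptions =
  //       toMonoWithExceptionFilter(
  //           client.describeTopics(List.of("a", "b")).topicNameValues(),
  //           UnknownTopicOrPartitionException.class);
  //
  // If topic "b" does not exist, the resulting map simply contains only "a".
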
  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
    return describeCluster()
        .map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
        .flatMap(this::describeLogDirs);
  }

  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs(
      Collection<Integer> brokerIds) {
    return toMono(client.describeLogDirs(brokerIds).all())
        .onErrorResume(UnsupportedVersionException.class, th -> Mono.just(Map.of()))
        .onErrorResume(ClusterAuthorizationException.class, th -> Mono.just(Map.of()))
        .onErrorResume(th -> true, th -> {
          log.warn("Error while calling describeLogDirs", th);
          return Mono.just(Map.of());
        });
  }

  public Mono<ClusterDescription> describeCluster() {
    return describeClusterImpl(client, features);
  }

  private static Mono<ClusterDescription> describeClusterImpl(AdminClient client, Set<SupportedFeature> features) {
    boolean includeAuthorizedOperations =
        features.contains(SupportedFeature.DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS);
    DescribeClusterResult result = client.describeCluster(
        new DescribeClusterOptions().includeAuthorizedOperations(includeAuthorizedOperations));
    var allOfFuture = KafkaFuture.allOf(
        result.controller(), result.clusterId(), result.nodes(), result.authorizedOperations());
    return toMono(allOfFuture).then(
        // all futures have completed by this point, so the blocking get() calls below are safe
        Mono.fromCallable(() ->
            new ClusterDescription(
                result.controller().get(),
                result.clusterId().get(),
                result.nodes().get(),
                result.authorizedOperations().get()
            )
        )
    );
  }

  private static Mono<String> getClusterVersion(AdminClient client) {
    return describeClusterImpl(client, Set.of())
        // choosing a node from which we will get configs (starting with the controller)
        .flatMap(descr -> descr.controller != null
            ? Mono.just(descr.controller)
            : Mono.justOrEmpty(descr.nodes.stream().findFirst())
        )
        .flatMap(node -> loadBrokersConfig(client, List.of(node.id())))
        .flatMap(configs -> configs.values().stream()
            .flatMap(Collection::stream)
            .filter(entry -> entry.name().contains("inter.broker.protocol.version"))
            .findFirst()
            .map(configEntry -> Mono.just(configEntry.value()))
            .orElse(Mono.empty()))
        .switchIfEmpty(Mono.just("1.0-UNKNOWN"));
  }

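  // For illustration (not part of the original file): a broker typically reports a value
  // like inter.broker.protocol.version=3.5-IV2, which KafkaVersion.parse(..) is then
  // expected to reduce to the float 3.5 consumed by SupportedFeature.forVersion(..).
  // If no such config is found, the "1.0-UNKNOWN" fallback disables all version-gated features.
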
  public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
    return toMono(client.deleteConsumerGroups(groupIds).all())
        .onErrorResume(GroupIdNotFoundException.class,
            th -> Mono.error(new NotFoundException("The group id does not exist")))
        .onErrorResume(GroupNotEmptyException.class,
            th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
  }

  public Mono<Void> createTopic(String name,
                                int numPartitions,
                                @Nullable Integer replicationFactor,
                                Map<String, String> configs) {
    var newTopic = new NewTopic(
        name,
        Optional.of(numPartitions),
        Optional.ofNullable(replicationFactor).map(Integer::shortValue)
    ).configs(configs);
    return toMono(client.createTopics(List.of(newTopic)).all());
  }

  public Mono<Void> alterPartitionReassignments(
      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
    return toMono(client.alterPartitionReassignments(reassignments).all());
  }

  public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap) {
    return toMono(client.createPartitions(newPartitionsMap).all());
  }

  // NOTE: replaces the whole current topic config with the new one. Entries that were present
  // in the old config but are missing in the new one will be reset to their defaults,
  // as shown in the worked example below.
  public Mono<Void> updateTopicConfig(String topicName, Map<String, String> configs) {
    if (features.contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
      return getTopicsConfigImpl(List.of(topicName), false)
          .map(conf -> conf.getOrDefault(topicName, List.of()))
          .flatMap(currentConfigs -> incrementalAlterConfig(topicName, currentConfigs, configs));
    } else {
      return alterConfig(topicName, configs);
    }
  }

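  // Worked example (not part of the original file): if the current dynamic topic config is
  // {retention.ms=100, cleanup.policy=compact} and updateTopicConfig is called with
  // {retention.ms=200}, then retention.ms is SET to 200 and cleanup.policy gets a DELETE op,
  // i.e. it is reset to the broker default (see incrementalAlterConfig below).
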
  public Mono<List<String>> listConsumerGroupNames() {
    return listConsumerGroups().map(lst -> lst.stream().map(ConsumerGroupListing::groupId).toList());
  }

  public Mono<Collection<ConsumerGroupListing>> listConsumerGroups() {
    return toMono(client.listConsumerGroups().all());
  }

  public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
    return partitionCalls(
        groupIds,
        25,
        4,
        ids -> toMono(client.describeConsumerGroups(ids).all()),
        mapMerger()
    );
  }

  // group -> partition -> offset
  // NOTE: partitions with no committed offsets will be skipped
  public Mono<Table<String, TopicPartition, Long>> listConsumerGroupOffsets(List<String> consumerGroups,
                                                                            // all partitions if null is passed
                                                                            @Nullable List<TopicPartition> partitions) {
    Function<Collection<String>, Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>>> call =
        groups -> toMono(
            client.listConsumerGroupOffsets(
                groups.stream()
                    .collect(Collectors.toMap(
                        g -> g,
                        g -> new ListConsumerGroupOffsetsSpec().topicPartitions(partitions)
                    ))).all()
        );

    Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>> merged = partitionCalls(
        consumerGroups,
        25,
        4,
        call,
        mapMerger()
    );

    return merged.map(map -> {
      var table = ImmutableTable.<String, TopicPartition, Long>builder();
      map.forEach((g, tpOffsets) -> tpOffsets.forEach((tp, offset) -> {
        if (offset != null) {
          // offset will be null for partitions that have no committed offset for this group
          table.put(g, tp, offset.offset());
        }
      }));
      return table.build();
    });
  }

  public Mono<Void> alterConsumerGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
    return toMono(client.alterConsumerGroupOffsets(
            groupId,
            offsets.entrySet().stream()
                .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))))
        .all());
  }

  /**
   * Lists offsets for the topic's partitions and the given OffsetSpec.
   *
   * @param failOnUnknownLeader true - throw an exception in case of no-leader partitions,
   *                            false - skip partitions with no leader
   */
  public Mono<Map<TopicPartition, Long>> listTopicOffsets(String topic,
                                                          OffsetSpec offsetSpec,
                                                          boolean failOnUnknownLeader) {
    return describeTopic(topic)
        .map(td -> filterPartitionsWithLeaderCheck(List.of(td), p -> true, failOnUnknownLeader))
        .flatMap(partitions -> listOffsetsUnsafe(partitions, offsetSpec));
  }

  /**
   * Lists offsets for the specified partitions and the given OffsetSpec.
   *
   * @param failOnUnknownLeader true - throw an exception in case of no-leader partitions,
   *                            false - skip partitions with no leader
   */
  public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicPartition> partitions,
                                                     OffsetSpec offsetSpec,
                                                     boolean failOnUnknownLeader) {
    return filterPartitionsWithLeaderCheck(partitions, failOnUnknownLeader)
        .flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
  }

  /**
   * Lists offsets for the specified topics, skipping no-leader partitions.
   */
  public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicDescription> topicDescriptions,
                                                     OffsetSpec offsetSpec) {
    return listOffsetsUnsafe(filterPartitionsWithLeaderCheck(topicDescriptions, p -> true, false), offsetSpec);
  }

  private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
                                                                           boolean failOnUnknownLeader) {
    var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
    return describeTopicsImpl(targetTopics)
        .map(descriptions ->
            filterPartitionsWithLeaderCheck(
                descriptions.values(), partitions::contains, failOnUnknownLeader));
  }

  @VisibleForTesting
  static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
                                                             Predicate<TopicPartition> partitionPredicate,
                                                             boolean failOnUnknownLeader) {
    var goodPartitions = new HashSet<TopicPartition>();
    for (TopicDescription description : topicDescriptions) {
      var goodTopicPartitions = new ArrayList<TopicPartition>();
      for (TopicPartitionInfo partitionInfo : description.partitions()) {
        TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
        if (partitionInfo.leader() == null) {
          if (failOnUnknownLeader) {
            throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
          } else {
            // if ANY of the topic's partitions has no leader, we have to skip all of the topic's partitions
            goodTopicPartitions.clear();
            break;
          }
        }
        if (partitionPredicate.test(topicPartition)) {
          goodTopicPartitions.add(topicPartition);
        }
      }
      goodPartitions.addAll(goodTopicPartitions);
    }
    return goodPartitions;
  }

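  // Worked example (not part of the original file): for a topic "t" with partitions
  // t-0 (leader=1) and t-1 (leader=null), and failOnUnknownLeader=false, the whole topic
  // is skipped (an empty set is returned), because listing offsets even for t-0 alone can
  // make the AdminClient fetch metadata for the leaderless t-1 and retry until timeout
  // (see the listOffsetsUnsafe notes below).
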
  // 1. NOTE(!): should only be applied to partitions from topics where all partitions have leaders,
  //    otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
  // 2. NOTE(!): skips partitions that were not initialized yet
  //    (UnknownTopicOrPartitionException is thrown, e.g. right after topic creation)
  // 3. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retries instead
  @KafkaClientInternalsDependant
  @VisibleForTesting
  Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) {
    if (partitions.isEmpty()) {
      return Mono.just(Map.of());
    }

    Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
        parts -> {
          ListOffsetsResult r = client.listOffsets(parts.stream().collect(toMap(tp -> tp, tp -> offsetSpec)));
          Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> perPartitionResults = new HashMap<>();
          parts.forEach(p -> perPartitionResults.put(p, r.partitionResult(p)));

          return toMonoWithExceptionFilter(perPartitionResults, UnknownTopicOrPartitionException.class)
              .map(offsets -> offsets.entrySet().stream()
                  // filtering out partitions for which offsets were not found
                  .filter(e -> e.getValue().offset() >= 0)
                  .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
        };

    return partitionCalls(
        partitions,
        200,
        call,
        mapMerger()
    );
  }

  public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
    AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
    return toMono(client.incrementalAlterConfigs(Map.of(cr, List.of(op))).all());
  }

  public Mono<Void> deleteRecords(Map<TopicPartition, Long> offsets) {
    var records = offsets.entrySet().stream()
        .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
        .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
    return toMono(client.deleteRecords(records).all());
  }

  public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
    return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
  }

  private Mono<Void> incrementalAlterConfig(String topicName,
                                            List<ConfigEntry> currentConfigs,
                                            Map<String, String> newConfigs) {
    var configsToDelete = currentConfigs.stream()
        .filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) // manually set configs only
        .filter(e -> !newConfigs.containsKey(e.name()))
        .map(e -> new AlterConfigOp(e, AlterConfigOp.OpType.DELETE));

    var configsToSet = newConfigs.entrySet().stream()
        .map(e -> new AlterConfigOp(new ConfigEntry(e.getKey(), e.getValue()), AlterConfigOp.OpType.SET));

    return toMono(client.incrementalAlterConfigs(
        Map.of(
            new ConfigResource(ConfigResource.Type.TOPIC, topicName),
            Stream.concat(configsToDelete, configsToSet).toList()
        )).all());
  }

  @SuppressWarnings("deprecation")
  private Mono<Void> alterConfig(String topicName, Map<String, String> configs) {
    List<ConfigEntry> configEntries = configs.entrySet().stream()
        .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue())))
        .collect(toList());
    Config config = new Config(configEntries);
    var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    return toMono(client.alterConfigs(Map.of(topicResource, config)).all());
  }

  /**
   * Splits the input collection into batches, converts each batch into a Mono, subscribes to them
   * sequentially and merges the resulting Monos into one Mono.
   */
  private static <R, I> Mono<R> partitionCalls(Collection<I> items,
                                               int partitionSize,
                                               Function<Collection<I>, Mono<R>> call,
                                               BiFunction<R, R, R> merger) {
    if (items.isEmpty()) {
      return call.apply(items);
    }
    Iterable<List<I>> parts = Iterables.partition(items, partitionSize);
    return Flux.fromIterable(parts)
        .concatMap(call)
        .reduce(merger);
  }

  /**
   * Splits the input collection into batches, converts each batch into a Mono, subscribes to them
   * concurrently (with the specified concurrency level) and merges the resulting Monos into one Mono.
   */
  private static <R, I> Mono<R> partitionCalls(Collection<I> items,
                                               int partitionSize,
                                               int concurrency,
                                               Function<Collection<I>, Mono<R>> call,
                                               BiFunction<R, R, R> merger) {
    if (items.isEmpty()) {
      return call.apply(items);
    }
    Iterable<List<I>> parts = Iterables.partition(items, partitionSize);
    return Flux.fromIterable(parts)
        .flatMap(call, concurrency)
        .reduce(merger);
  }

  private static <K, V> BiFunction<Map<K, V>, Map<K, V>, Map<K, V>> mapMerger() {
    return (m1, m2) -> {
      var merged = new HashMap<K, V>();
      merged.putAll(m1);
      merged.putAll(m2);
      return merged;
    };
  }

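  // Illustrative usage sketch (not part of the original file): describe 1000 topics in
  // batches of 200, merging the five per-batch maps into one result:
  //
  //   Mono<Map<String, TopicDescription>> all =
  //       partitionCalls(thousandTopicNames, 200, this::describeTopicsImpl, mapMerger());
  //
  // Note that mapMerger() keeps m2's value on a key collision (putAll overwrites), which is
  // fine here because the batches produced by Iterables.partition are disjoint.
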
  @Override
  public void close() {
    client.close();
  }
}