ReactiveAdminClient.java

package com.provectus.kafka.ui.service;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Table;
import com.provectus.kafka.ui.exception.IllegalEntityStateException;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.util.KafkaVersion;
import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

@Slf4j
@AllArgsConstructor
public class ReactiveAdminClient implements Closeable {

  public enum SupportedFeature {
    INCREMENTAL_ALTER_CONFIGS(2.3f),
    CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
    DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f),
    AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled);

    private final BiFunction<AdminClient, Float, Mono<Boolean>> predicate;

    SupportedFeature(BiFunction<AdminClient, Float, Mono<Boolean>> predicate) {
      this.predicate = predicate;
    }

    SupportedFeature(float fromVersion) {
      this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion);
    }

    static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, String kafkaVersionStr) {
      @Nullable Float kafkaVersion = KafkaVersion.parse(kafkaVersionStr).orElse(null);
      return Flux.fromArray(SupportedFeature.values())
          .flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled)))
          .filter(Tuple2::getT2)
          .map(Tuple2::getT1)
          .collect(Collectors.toSet());
    }
  }

  @Value
  public static class ClusterDescription {
    @Nullable
    Node controller;
    String clusterId;
    Collection<Node> nodes;
    @Nullable // null, if ACL is disabled
    Set<AclOperation> authorizedOperations;
  }

  @Builder
  private record ConfigRelatedInfo(String version,
                                   Set<SupportedFeature> features,
                                   boolean topicDeletionIsAllowed) {

    private static Mono<ConfigRelatedInfo> extract(AdminClient ac, int controllerId) {
      return loadBrokersConfig(ac, List.of(controllerId))
          .map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(controllerId))
          .flatMap(configs -> {
            String version = "1.0-UNKNOWN";
            boolean topicDeletionEnabled = true;
            for (ConfigEntry entry : configs) {
              if (entry.name().contains("inter.broker.protocol.version")) {
                version = entry.value();
              }
              if (entry.name().equals("delete.topic.enable")) {
                topicDeletionEnabled = Boolean.parseBoolean(entry.value());
              }
            }
            var builder = ConfigRelatedInfo.builder()
                .version(version)
                .topicDeletionIsAllowed(topicDeletionEnabled);
            return SupportedFeature.forVersion(ac, version)
                .map(features -> builder.features(features).build());
          });
    }
  }

  public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
    return describeClusterImpl(adminClient, Set.of())
        // choosing the node from which we will get configs (starting with the controller)
        .flatMap(descr -> descr.controller != null
            ? Mono.just(descr.controller)
            : Mono.justOrEmpty(descr.nodes.stream().findFirst())
        )
        .flatMap(node -> ConfigRelatedInfo.extract(adminClient, node.id()))
        .map(info -> new ReactiveAdminClient(adminClient, info));
  }
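
  // Illustrative usage sketch for create(..); the bootstrap address and variable names are
  // assumptions for the example, not part of this class:
  //
  //   Properties props = new Properties();
  //   props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  //   Mono<ReactiveAdminClient> rac = ReactiveAdminClient.create(AdminClient.create(props));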

  private static Mono<Boolean> isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) {
    return toMono(ac.describeAcls(AclBindingFilter.ANY).values())
        .thenReturn(true)
        .doOnError(th -> !(th instanceof SecurityDisabledException)
                && !(th instanceof InvalidRequestException)
                && !(th instanceof UnsupportedVersionException),
            th -> log.warn("Error checking if security enabled", th))
        .onErrorReturn(false);
  }

  // NOTE: if the KafkaFuture returns null, the resulting Mono will be empty(!), since Reactor
  // does not support nullable results (see MonoSink.success(..) javadoc for details)
  public static <T> Mono<T> toMono(KafkaFuture<T> future) {
    return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
      if (ex != null) {
        // KafkaFuture doc is unclear about which exception wrapper will be used
        // (from the docs it should be ExecutionException, but we actually see CompletionException, so checking both)
        if (ex instanceof CompletionException || ex instanceof ExecutionException) {
          sink.error(ex.getCause()); // unwrapping exception
        } else {
          sink.error(ex);
        }
      } else {
        sink.success(res);
      }
    })).doOnCancel(() -> future.cancel(true))
        // AdminClient uses a single thread for kafka communication, and by default all downstream
        // operations (like map(..)) on the created Mono will be executed on this thread.
        // If some downstream operation is blocking (by mistake), this can leave other AdminClient
        // requests stuck, which can cause timeout exceptions.
        // So we explicitly set a Scheduler for downstream processing.
        .publishOn(Schedulers.parallel());
  }
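
  // A minimal usage sketch for toMono(..), assuming "client" is any configured AdminClient:
  //
  //   Mono<Set<String>> names = toMono(client.listTopics().names());
  //   names.map(Set::size).subscribe(count -> log.info("{} topics visible", count));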

  //---------------------------------------------------------------------------------

  @Getter(AccessLevel.PACKAGE) // visible for testing
  private final AdminClient client;
  private volatile ConfigRelatedInfo configRelatedInfo;

  public Set<SupportedFeature> getClusterFeatures() {
    return configRelatedInfo.features();
  }

  public Mono<Set<String>> listTopics(boolean listInternal) {
    return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
  }

  public Mono<Void> deleteTopic(String topicName) {
    return toMono(client.deleteTopics(List.of(topicName)).all());
  }

  public String getVersion() {
    return configRelatedInfo.version();
  }

  public boolean isTopicDeletionEnabled() {
    return configRelatedInfo.topicDeletionIsAllowed();
  }

  public Mono<Void> updateInternalStats(@Nullable Node controller) {
    if (controller == null) {
      return Mono.empty();
    }
    return ConfigRelatedInfo.extract(client, controller.id())
        .doOnNext(info -> this.configRelatedInfo = info)
        .then();
  }

  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
    return listTopics(true).flatMap(topics -> getTopicsConfig(topics, false));
  }

  // NOTE: skips not-found topics (for which UnknownTopicOrPartitionException was thrown by AdminClient)
  // and topics for which the DESCRIBE_CONFIGS permission is not set (TopicAuthorizationException was thrown)
  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
    var includeDocFixed = includeDoc && getClusterFeatures().contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL);
    // we need to partition the calls, because a single call with a large topic count can lead to AdminClient timeouts
    return partitionCalls(
        topicNames,
        200,
        part -> getTopicsConfigImpl(part, includeDocFixed),
        mapMerger()
    );
  }

  private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<String> topicNames, boolean includeDoc) {
    List<ConfigResource> resources = topicNames.stream()
        .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
        .collect(toList());

    return toMonoWithExceptionFilter(
        client.describeConfigs(
            resources,
            new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(),
        UnknownTopicOrPartitionException.class,
        TopicAuthorizationException.class
    ).map(config -> config.entrySet().stream()
        .collect(toMap(
            c -> c.getKey().name(),
            c -> List.copyOf(c.getValue().entries()))));
  }

  private static Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(AdminClient client, List<Integer> brokerIds) {
    List<ConfigResource> resources = brokerIds.stream()
        .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
        .collect(toList());
    return toMono(client.describeConfigs(resources).all())
        // some kafka backends don't support broker config retrieval
        // and throw various exceptions on the describeConfigs() call
        .onErrorResume(th -> th instanceof InvalidRequestException // MSK Serverless
                || th instanceof UnknownTopicOrPartitionException, // Azure Event Hubs
            th -> {
              log.trace("Error while getting configs for brokers {}", brokerIds, th);
              return Mono.just(Map.of());
            })
        // there are situations when the kafka-ui user has no DESCRIBE_CONFIGS permission on the cluster
        .onErrorResume(ClusterAuthorizationException.class, th -> {
          log.trace("AuthorizationException while getting configs for brokers {}", brokerIds, th);
          return Mono.just(Map.of());
        })
        // catching all remaining exceptions, but logging them on WARN level
        .onErrorResume(th -> true, th -> {
          log.warn("Unexpected error while getting configs for brokers {}", brokerIds, th);
          return Mono.just(Map.of());
        })
        .map(config -> config.entrySet().stream()
            .collect(toMap(
                c -> Integer.valueOf(c.getKey().name()),
                c -> new ArrayList<>(c.getValue().entries()))));
  }

  /**
   * Returns per-broker configs, or an empty map if broker config retrieval is not supported.
   */
  public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> brokerIds) {
    return loadBrokersConfig(client, brokerIds);
  }

  public Mono<Map<String, TopicDescription>> describeTopics() {
    return listTopics(true).flatMap(this::describeTopics);
  }

  public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> topics) {
    // we need to partition the calls, because a single call with a large topic count can lead to AdminClient timeouts
    return partitionCalls(
        topics,
        200,
        this::describeTopicsImpl,
        mapMerger()
    );
  }

  private Mono<Map<String, TopicDescription>> describeTopicsImpl(Collection<String> topics) {
    return toMonoWithExceptionFilter(
        client.describeTopics(topics).topicNameValues(),
        UnknownTopicOrPartitionException.class,
        // we only describe topics that we see from the listTopics() API, so we should have permission to do it,
        // but we also add this exception here for the rare case when access was restricted after listTopics() was called
        TopicAuthorizationException.class
    );
  }

  /**
   * Returns a TopicDescription mono, or an empty Mono if the topic is not visible.
   */
  public Mono<TopicDescription> describeTopic(String topic) {
    return describeTopics(List.of(topic)).flatMap(m -> Mono.justOrEmpty(m.get(topic)));
  }

  /**
   * The Kafka API often returns Map responses with KafkaFuture values. If we apply allOf()
   * logic, the resulting Mono will fail if any of the Futures finished with an error.
   * In some situations this is not what we want: for example, when we call the describeTopics(List names)
   * method, we get an UnknownTopicOrPartitionException for unknown topics and want to simply leave
   * such topics out of the resulting map.
   * <p/>
   * This method converts the input map into a Mono[Map], ignoring keys for which the KafkaFutures
   * finished with one of the <code>classes</code> exceptions or completed empty.
   */
  @SafeVarargs
  static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values,
                                                          Class<? extends KafkaException>... classes) {
    if (values.isEmpty()) {
      return Mono.just(Map.of());
    }

    List<Mono<Tuple2<K, Optional<V>>>> monos = values.entrySet().stream()
        .map(e ->
            toMono(e.getValue())
                .map(r -> Tuples.of(e.getKey(), Optional.of(r)))
                .defaultIfEmpty(Tuples.of(e.getKey(), Optional.empty())) // tracking empty Monos
                .onErrorResume(
                    // tracking Monos with suppressible errors
                    th -> Stream.of(classes).anyMatch(clazz -> clazz.isInstance(th)),
                    th -> Mono.just(Tuples.of(e.getKey(), Optional.empty()))))
        .toList();

    return Mono.zip(
        monos,
        resultsArr -> Stream.of(resultsArr)
            .map(obj -> (Tuple2<K, Optional<V>>) obj)
            .filter(t -> t.getT2().isPresent()) // skipping empty & suppressed-error entries
            .collect(Collectors.toMap(Tuple2::getT1, t -> t.getT2().get()))
    );
  }
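
  // Sketch of the suppression semantics, mirroring describeTopicsImpl(..) above ("names" is an
  // assumed variable): topics whose futures fail with one of the listed exception classes are
  // simply absent from the resulting map.
  //
  //   Map<String, KafkaFuture<TopicDescription>> futures = client.describeTopics(names).topicNameValues();
  //   Mono<Map<String, TopicDescription>> visibleOnly =
  //       toMonoWithExceptionFilter(futures, UnknownTopicOrPartitionException.class);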

  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
    return describeCluster()
        .map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
        .flatMap(this::describeLogDirs);
  }

  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs(
      Collection<Integer> brokerIds) {
    return toMono(client.describeLogDirs(brokerIds).all())
        .onErrorResume(UnsupportedVersionException.class, th -> Mono.just(Map.of()))
        .onErrorResume(ClusterAuthorizationException.class, th -> Mono.just(Map.of()))
        .onErrorResume(th -> true, th -> {
          log.warn("Error while calling describeLogDirs", th);
          return Mono.just(Map.of());
        });
  }

  public Mono<ClusterDescription> describeCluster() {
    return describeClusterImpl(client, getClusterFeatures());
  }

  private static Mono<ClusterDescription> describeClusterImpl(AdminClient client, Set<SupportedFeature> features) {
    boolean includeAuthorizedOperations =
        features.contains(SupportedFeature.DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS);
    DescribeClusterResult result = client.describeCluster(
        new DescribeClusterOptions().includeAuthorizedOperations(includeAuthorizedOperations));
    var allOfFuture = KafkaFuture.allOf(
        result.controller(), result.clusterId(), result.nodes(), result.authorizedOperations());
    return toMono(allOfFuture).then(
        Mono.fromCallable(() ->
            new ClusterDescription(
                result.controller().get(),
                result.clusterId().get(),
                result.nodes().get(),
                result.authorizedOperations().get()
            )
        )
    );
  }

  public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
    return toMono(client.deleteConsumerGroups(groupIds).all())
        .onErrorResume(GroupIdNotFoundException.class,
            th -> Mono.error(new NotFoundException("The group id does not exist")))
        .onErrorResume(GroupNotEmptyException.class,
            th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
  }

  public Mono<Void> createTopic(String name,
                                int numPartitions,
                                @Nullable Integer replicationFactor,
                                Map<String, String> configs) {
    var newTopic = new NewTopic(
        name,
        Optional.of(numPartitions),
        Optional.ofNullable(replicationFactor).map(Integer::shortValue)
    ).configs(configs);
    return toMono(client.createTopics(List.of(newTopic)).all());
  }

  public Mono<Void> alterPartitionReassignments(
      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
    return toMono(client.alterPartitionReassignments(reassignments).all());
  }

  public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap) {
    return toMono(client.createPartitions(newPartitionsMap).all());
  }

  // NOTE: replaces the whole current topic config with the new one. Entries that were present in the
  // old config but are missing in the new one will be reset to their defaults.
  public Mono<Void> updateTopicConfig(String topicName, Map<String, String> configs) {
    if (getClusterFeatures().contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
      return getTopicsConfigImpl(List.of(topicName), false)
          .map(conf -> conf.getOrDefault(topicName, List.of()))
          .flatMap(currentConfigs -> incrementalAlterConfig(topicName, currentConfigs, configs));
    } else {
      return alterConfig(topicName, configs);
    }
  }
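
  // Example of the replace semantics described above (topic name and values are illustrative):
  //
  //   // current manually-set config: retention.ms=100, cleanup.policy=compact
  //   updateTopicConfig("orders", Map.of("cleanup.policy", "delete")).subscribe();
  //   // after completion: cleanup.policy=delete, while retention.ms is reset to the broker
  //   // default, because it is absent from the new config map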

  public Mono<List<String>> listConsumerGroupNames() {
    return listConsumerGroups().map(lst -> lst.stream().map(ConsumerGroupListing::groupId).toList());
  }

  public Mono<Collection<ConsumerGroupListing>> listConsumerGroups() {
    return toMono(client.listConsumerGroups().all());
  }

  public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
    return partitionCalls(
        groupIds,
        25,
        4,
        ids -> toMono(client.describeConsumerGroups(ids).all()),
        mapMerger()
    );
  }

  // group -> partition -> offset
  // NOTE: partitions with no committed offsets will be skipped
  public Mono<Table<String, TopicPartition, Long>> listConsumerGroupOffsets(List<String> consumerGroups,
                                                                            // all partitions if null is passed
                                                                            @Nullable List<TopicPartition> partitions) {
    Function<Collection<String>, Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>>> call =
        groups -> toMono(
            client.listConsumerGroupOffsets(
                groups.stream()
                    .collect(Collectors.toMap(
                        g -> g,
                        g -> new ListConsumerGroupOffsetsSpec().topicPartitions(partitions)
                    ))).all()
        );
    Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>> merged = partitionCalls(
        consumerGroups,
        25,
        4,
        call,
        mapMerger()
    );
    return merged.map(map -> {
      var table = ImmutableTable.<String, TopicPartition, Long>builder();
      map.forEach((g, tpOffsets) -> tpOffsets.forEach((tp, offset) -> {
        if (offset != null) {
          // offset will be null for partitions that have no committed offset for this group
          table.put(g, tp, offset.offset());
        }
      }));
      return table.build();
    });
  }
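
  // Illustrative usage sketch (group names are assumptions):
  //
  //   listConsumerGroupOffsets(List.of("group-a", "group-b"), null) // null = all partitions
  //       .subscribe(table -> table.cellSet().forEach(cell ->
  //           log.info("group={} partition={} offset={}",
  //               cell.getRowKey(), cell.getColumnKey(), cell.getValue())));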

  public Mono<Void> alterConsumerGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
    return toMono(client.alterConsumerGroupOffsets(
        groupId,
        offsets.entrySet().stream()
            .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))))
        .all());
  }

  /**
   * Lists offsets for the topic's partitions and the given OffsetSpec.
   *
   * @param failOnUnknownLeader true - throw an exception in case of no-leader partitions,
   *                            false - skip partitions with no leader
   */
  public Mono<Map<TopicPartition, Long>> listTopicOffsets(String topic,
                                                          OffsetSpec offsetSpec,
                                                          boolean failOnUnknownLeader) {
    return describeTopic(topic)
        .map(td -> filterPartitionsWithLeaderCheck(List.of(td), p -> true, failOnUnknownLeader))
        .flatMap(partitions -> listOffsetsUnsafe(partitions, offsetSpec));
  }

  /**
   * Lists offsets for the specified partitions and the given OffsetSpec.
   *
   * @param failOnUnknownLeader true - throw an exception in case of no-leader partitions,
   *                            false - skip partitions with no leader
   */
  public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicPartition> partitions,
                                                     OffsetSpec offsetSpec,
                                                     boolean failOnUnknownLeader) {
    return filterPartitionsWithLeaderCheck(partitions, failOnUnknownLeader)
        .flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
  }

  /**
   * Lists offsets for the specified topics, skipping no-leader partitions.
   */
  public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicDescription> topicDescriptions,
                                                     OffsetSpec offsetSpec) {
    return listOffsetsUnsafe(filterPartitionsWithLeaderCheck(topicDescriptions, p -> true, false), offsetSpec);
  }

  private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
                                                                           boolean failOnUnknownLeader) {
    var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
    return describeTopicsImpl(targetTopics)
        .map(descriptions ->
            filterPartitionsWithLeaderCheck(
                descriptions.values(), partitions::contains, failOnUnknownLeader));
  }

  @VisibleForTesting
  static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
                                                             Predicate<TopicPartition> partitionPredicate,
                                                             boolean failOnUnknownLeader) {
    var goodPartitions = new HashSet<TopicPartition>();
    for (TopicDescription description : topicDescriptions) {
      var goodTopicPartitions = new ArrayList<TopicPartition>();
      for (TopicPartitionInfo partitionInfo : description.partitions()) {
        TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
        if (partitionInfo.leader() == null) {
          if (failOnUnknownLeader) {
            throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
          } else {
            // if ANY of the topic's partitions has no leader, we have to skip all of the topic's partitions
            goodTopicPartitions.clear();
            break;
          }
        }
        if (partitionPredicate.test(topicPartition)) {
          goodTopicPartitions.add(topicPartition);
        }
      }
      goodPartitions.addAll(goodTopicPartitions);
    }
    return goodPartitions;
  }

  // 1. NOTE(!): should only be applied to partitions from topics where all partitions have leaders,
  //    otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
  // 2. NOTE(!): skips partitions that were not initialized yet
  //    (UnknownTopicOrPartitionException is thrown, ex. right after topic creation)
  // 3. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retries instead
  @KafkaClientInternalsDependant
  @VisibleForTesting
  Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) {
    if (partitions.isEmpty()) {
      return Mono.just(Map.of());
    }

    Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
        parts -> {
          ListOffsetsResult r = client.listOffsets(parts.stream().collect(toMap(tp -> tp, tp -> offsetSpec)));
          Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> perPartitionResults = new HashMap<>();
          parts.forEach(p -> perPartitionResults.put(p, r.partitionResult(p)));

          return toMonoWithExceptionFilter(perPartitionResults, UnknownTopicOrPartitionException.class)
              .map(offsets -> offsets.entrySet().stream()
                  // filtering out partitions for which offsets were not found
                  .filter(e -> e.getValue().offset() >= 0)
                  .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
        };

    return partitionCalls(
        partitions,
        200,
        call,
        mapMerger()
    );
  }
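
  // Illustrative sketch: prefer the public listOffsets(..) wrappers above, which run the leader
  // check before delegating here (the topic/partition is an assumption for the example):
  //
  //   listOffsets(Set.of(new TopicPartition("orders", 0)), OffsetSpec.latest(), false)
  //       .subscribe(offsets -> log.info("end offsets: {}", offsets));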

  public Mono<Collection<AclBinding>> listAcls(ResourcePatternFilter filter) {
    Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
    return toMono(client.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values());
  }

  public Mono<Void> createAcls(Collection<AclBinding> aclBindings) {
    Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
    return toMono(client.createAcls(aclBindings).all());
  }

  public Mono<Void> deleteAcls(Collection<AclBinding> aclBindings) {
    Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
    var filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet());
    return toMono(client.deleteAcls(filters).all()).then();
  }

  public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
    AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
    return toMono(client.incrementalAlterConfigs(Map.of(cr, List.of(op))).all());
  }

  public Mono<Void> deleteRecords(Map<TopicPartition, Long> offsets) {
    var records = offsets.entrySet().stream()
        .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
        .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
    return toMono(client.deleteRecords(records).all());
  }

  public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
    return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
  }

  private Mono<Void> incrementalAlterConfig(String topicName,
                                            List<ConfigEntry> currentConfigs,
                                            Map<String, String> newConfigs) {
    var configsToDelete = currentConfigs.stream()
        .filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) // manually set configs only
        .filter(e -> !newConfigs.containsKey(e.name()))
        .map(e -> new AlterConfigOp(e, AlterConfigOp.OpType.DELETE));

    var configsToSet = newConfigs.entrySet().stream()
        .map(e -> new AlterConfigOp(new ConfigEntry(e.getKey(), e.getValue()), AlterConfigOp.OpType.SET));

    return toMono(client.incrementalAlterConfigs(
        Map.of(
            new ConfigResource(ConfigResource.Type.TOPIC, topicName),
            Stream.concat(configsToDelete, configsToSet).toList()
        )).all());
  }

  @SuppressWarnings("deprecation")
  private Mono<Void> alterConfig(String topicName, Map<String, String> configs) {
    List<ConfigEntry> configEntries = configs.entrySet().stream()
        .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue())))
        .collect(toList());
    Config config = new Config(configEntries);
    var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    return toMono(client.alterConfigs(Map.of(topicResource, config)).all());
  }

  /**
   * Splits the input collection into batches, converts each batch into a Mono, sequentially
   * subscribes to them and merges the output Monos into one Mono.
   */
  private static <R, I> Mono<R> partitionCalls(Collection<I> items,
                                               int partitionSize,
                                               Function<Collection<I>, Mono<R>> call,
                                               BiFunction<R, R, R> merger) {
    if (items.isEmpty()) {
      return call.apply(items);
    }
    Iterable<List<I>> parts = Iterables.partition(items, partitionSize);
    return Flux.fromIterable(parts)
        .concatMap(call)
        .reduce(merger);
  }

  /**
   * Splits the input collection into batches, converts each batch into a Mono, subscribes to them
   * (concurrently, with the specified concurrency level) and merges the output Monos into one Mono.
   */
  private static <R, I> Mono<R> partitionCalls(Collection<I> items,
                                               int partitionSize,
                                               int concurrency,
                                               Function<Collection<I>, Mono<R>> call,
                                               BiFunction<R, R, R> merger) {
    if (items.isEmpty()) {
      return call.apply(items);
    }
    Iterable<List<I>> parts = Iterables.partition(items, partitionSize);
    return Flux.fromIterable(parts)
        .flatMap(call, concurrency)
        .reduce(merger);
  }
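
  // Sketch of how the two variants are used in this class (batch sizes mirror the call sites above):
  //
  //   // sequential: topics described in batches of <=200, one request at a time
  //   partitionCalls(topicNames, 200, this::describeTopicsImpl, mapMerger());
  //   // concurrent: groups described in batches of <=25, up to 4 requests in flight
  //   partitionCalls(groupIds, 25, 4, ids -> toMono(client.describeConsumerGroups(ids).all()), mapMerger());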

  private static <K, V> BiFunction<Map<K, V>, Map<K, V>, Map<K, V>> mapMerger() {
    return (m1, m2) -> {
      var merged = new HashMap<K, V>();
      merged.putAll(m1);
      merged.putAll(m2);
      return merged;
    };
  }

  @Override
  public void close() {
    client.close();
  }
}