// ReactiveAdminClient.java
  1. package com.provectus.kafka.ui.service;
  2. import static java.util.stream.Collectors.toList;
  3. import static java.util.stream.Collectors.toMap;
  4. import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
  5. import com.google.common.annotations.VisibleForTesting;
  6. import com.google.common.base.Preconditions;
  7. import com.google.common.collect.ImmutableTable;
  8. import com.google.common.collect.Iterables;
  9. import com.google.common.collect.Table;
  10. import com.provectus.kafka.ui.exception.IllegalEntityStateException;
  11. import com.provectus.kafka.ui.exception.NotFoundException;
  12. import com.provectus.kafka.ui.exception.ValidationException;
  13. import com.provectus.kafka.ui.util.KafkaVersion;
  14. import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
  15. import java.io.Closeable;
  16. import java.time.Duration;
  17. import java.time.temporal.ChronoUnit;
  18. import java.util.ArrayList;
  19. import java.util.Collection;
  20. import java.util.HashMap;
  21. import java.util.HashSet;
  22. import java.util.List;
  23. import java.util.Map;
  24. import java.util.Optional;
  25. import java.util.Set;
  26. import java.util.concurrent.CompletionException;
  27. import java.util.concurrent.ExecutionException;
  28. import java.util.function.BiFunction;
  29. import java.util.function.Function;
  30. import java.util.function.Predicate;
  31. import java.util.stream.Collectors;
  32. import java.util.stream.IntStream;
  33. import java.util.stream.Stream;
  34. import javax.annotation.Nullable;
  35. import lombok.AccessLevel;
  36. import lombok.AllArgsConstructor;
  37. import lombok.Builder;
  38. import lombok.Getter;
  39. import lombok.Value;
  40. import lombok.extern.slf4j.Slf4j;
  41. import org.apache.kafka.clients.admin.AdminClient;
  42. import org.apache.kafka.clients.admin.AlterConfigOp;
  43. import org.apache.kafka.clients.admin.Config;
  44. import org.apache.kafka.clients.admin.ConfigEntry;
  45. import org.apache.kafka.clients.admin.ConsumerGroupDescription;
  46. import org.apache.kafka.clients.admin.ConsumerGroupListing;
  47. import org.apache.kafka.clients.admin.DescribeClusterOptions;
  48. import org.apache.kafka.clients.admin.DescribeClusterResult;
  49. import org.apache.kafka.clients.admin.DescribeConfigsOptions;
  50. import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
  51. import org.apache.kafka.clients.admin.ListOffsetsResult;
  52. import org.apache.kafka.clients.admin.ListTopicsOptions;
  53. import org.apache.kafka.clients.admin.NewPartitionReassignment;
  54. import org.apache.kafka.clients.admin.NewPartitions;
  55. import org.apache.kafka.clients.admin.NewTopic;
  56. import org.apache.kafka.clients.admin.OffsetSpec;
  57. import org.apache.kafka.clients.admin.ProducerState;
  58. import org.apache.kafka.clients.admin.RecordsToDelete;
  59. import org.apache.kafka.clients.admin.TopicDescription;
  60. import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  61. import org.apache.kafka.common.KafkaException;
  62. import org.apache.kafka.common.KafkaFuture;
  63. import org.apache.kafka.common.Node;
  64. import org.apache.kafka.common.TopicPartition;
  65. import org.apache.kafka.common.TopicPartitionInfo;
  66. import org.apache.kafka.common.TopicPartitionReplica;
  67. import org.apache.kafka.common.acl.AccessControlEntryFilter;
  68. import org.apache.kafka.common.acl.AclBinding;
  69. import org.apache.kafka.common.acl.AclBindingFilter;
  70. import org.apache.kafka.common.acl.AclOperation;
  71. import org.apache.kafka.common.config.ConfigResource;
  72. import org.apache.kafka.common.errors.ClusterAuthorizationException;
  73. import org.apache.kafka.common.errors.GroupIdNotFoundException;
  74. import org.apache.kafka.common.errors.GroupNotEmptyException;
  75. import org.apache.kafka.common.errors.InvalidRequestException;
  76. import org.apache.kafka.common.errors.SecurityDisabledException;
  77. import org.apache.kafka.common.errors.TopicAuthorizationException;
  78. import org.apache.kafka.common.errors.UnknownServerException;
  79. import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
  80. import org.apache.kafka.common.errors.UnsupportedVersionException;
  81. import org.apache.kafka.common.requests.DescribeLogDirsResponse;
  82. import org.apache.kafka.common.resource.ResourcePatternFilter;
  83. import reactor.core.publisher.Flux;
  84. import reactor.core.publisher.Mono;
  85. import reactor.core.scheduler.Schedulers;
  86. import reactor.util.function.Tuple2;
  87. import reactor.util.function.Tuples;
  88. @Slf4j
  89. @AllArgsConstructor
  90. public class ReactiveAdminClient implements Closeable {
  /**
   * Cluster capabilities that depend on broker version and/or cluster configuration.
   * Each constant carries a predicate deciding whether the feature is available
   * for a given AdminClient and (nullable) parsed kafka version.
   */
  public enum SupportedFeature {
    INCREMENTAL_ALTER_CONFIGS(2.3f),
    CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
    DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f),
    AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled);

    // (adminClient, parsedKafkaVersion-or-null) -> is this feature enabled
    private final BiFunction<AdminClient, Float, Mono<Boolean>> predicate;

    SupportedFeature(BiFunction<AdminClient, Float, Mono<Boolean>> predicate) {
      this.predicate = predicate;
    }

    // version-gated feature: enabled when the parsed version is known and >= fromVersion
    SupportedFeature(float fromVersion) {
      this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion);
    }

    /**
     * Evaluates all feature predicates and returns the set of enabled features.
     * An unparsable version string results in null being passed to the predicates.
     */
    static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, String kafkaVersionStr) {
      @Nullable Float kafkaVersion = KafkaVersion.parse(kafkaVersionStr).orElse(null);
      return Flux.fromArray(SupportedFeature.values())
          .flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled)))
          .filter(Tuple2::getT2)
          .map(Tuple2::getT1)
          .collect(Collectors.toSet());
    }
  }
  /** Immutable snapshot of a describeCluster() call. */
  @Value
  public static class ClusterDescription {
    @Nullable
    Node controller;
    String clusterId;
    Collection<Node> nodes;
    @Nullable // null, if ACL is disabled
    Set<AclOperation> authorizedOperations;
  }
  /**
   * Cluster-level info derived from broker configs: broker version,
   * supported features and whether topic deletion is enabled.
   */
  @Builder
  private record ConfigRelatedInfo(String version,
                                   Set<SupportedFeature> features,
                                   boolean topicDeletionIsAllowed) {

    // how long an extracted snapshot Mono caches its value before re-querying the cluster
    static final Duration UPDATE_DURATION = Duration.of(1, ChronoUnit.HOURS);

    /**
     * Loads configs from a single broker (controller preferred, otherwise first known node)
     * and derives version, feature set and the delete.topic.enable flag.
     * The returned Mono caches its result for {@code UPDATE_DURATION}.
     */
    private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
      return ReactiveAdminClient.describeClusterImpl(ac, Set.of())
          .flatMap(desc -> {
            // choosing node from which we will get configs (starting with controller)
            var targetNodeId = Optional.ofNullable(desc.controller)
                .map(Node::id)
                .orElse(desc.getNodes().iterator().next().id());
            return loadBrokersConfig(ac, List.of(targetNodeId))
                .map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(targetNodeId))
                .flatMap(configs -> {
                  // defaults used when configs could not be retrieved
                  String version = "1.0-UNKNOWN";
                  boolean topicDeletionEnabled = true;
                  for (ConfigEntry entry : configs) {
                    if (entry.name().contains("inter.broker.protocol.version")) {
                      version = entry.value();
                    }
                    if (entry.name().equals("delete.topic.enable")) {
                      topicDeletionEnabled = Boolean.parseBoolean(entry.value());
                    }
                  }
                  final String finalVersion = version;
                  final boolean finalTopicDeletionEnabled = topicDeletionEnabled;
                  return SupportedFeature.forVersion(ac, version)
                      .map(features -> new ConfigRelatedInfo(finalVersion, features, finalTopicDeletionEnabled));
                });
          })
          .cache(UPDATE_DURATION);
    }
  }
  /**
   * Creates a ReactiveAdminClient, eagerly loading the initial config-related snapshot.
   * The same (cached) Mono is kept for later refreshes via updateInternalStats(..).
   */
  public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
    Mono<ConfigRelatedInfo> configRelatedInfoMono = ConfigRelatedInfo.extract(adminClient);
    return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info));
  }
  /**
   * Probes describeAcls() to check whether ACL APIs are enabled and authorized.
   * Any error maps to false; only unexpected error types are logged (on DEBUG).
   */
  private static Mono<Boolean> isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) {
    return toMono(ac.describeAcls(AclBindingFilter.ANY).values())
        .thenReturn(true)
        .doOnError(th -> !(th instanceof SecurityDisabledException)
                && !(th instanceof InvalidRequestException)
                && !(th instanceof UnsupportedVersionException),
            th -> log.debug("Error checking if security enabled", th))
        .onErrorReturn(false);
  }
  // NOTE: if KafkaFuture returns null, that Mono will be empty(!), since Reactor does not support nullable results
  // (see MonoSink.success(..) javadoc for details)
  public static <T> Mono<T> toMono(KafkaFuture<T> future) {
    return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
      if (ex != null) {
        // KafkaFuture doc is unclear about what exception wrapper will be used
        // (from docs it should be ExecutionException, but we actually see CompletionException, so checking both)
        if (ex instanceof CompletionException || ex instanceof ExecutionException) {
          sink.error(ex.getCause()); //unwrapping exception
        } else {
          sink.error(ex);
        }
      } else {
        sink.success(res);
      }
    })).doOnCancel(() -> future.cancel(true))
        // AdminClient is using single thread for kafka communication
        // and by default all downstream operations (like map(..)) on created Mono will be executed on this thread.
        // If some of downstream operation are blocking (by mistake) this can lead to
        // other AdminClient's requests stucking, which can cause timeout exceptions.
        // So, we explicitly setting Scheduler for downstream processing.
        .publishOn(Schedulers.parallel());
  }
  191. //---------------------------------------------------------------------------------
  @Getter(AccessLevel.PACKAGE) // visible for testing
  private final AdminClient client;
  // cached source of config-related info (see ConfigRelatedInfo.UPDATE_DURATION)
  private final Mono<ConfigRelatedInfo> configRelatedInfoMono;
  // last loaded snapshot; refreshed by updateInternalStats(..)
  private volatile ConfigRelatedInfo configRelatedInfo;

  /** Features supported by the connected cluster, from the last loaded snapshot. */
  public Set<SupportedFeature> getClusterFeatures() {
    return configRelatedInfo.features();
  }
  /** Lists topic names; internal topics are included when {@code listInternal} is true. */
  public Mono<Set<String>> listTopics(boolean listInternal) {
    return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
  }

  public Mono<Void> deleteTopic(String topicName) {
    return toMono(client.deleteTopics(List.of(topicName)).all());
  }

  /** Broker version from the last loaded snapshot (may be "1.0-UNKNOWN"). */
  public String getVersion() {
    return configRelatedInfo.version();
  }

  /** Value of broker's delete.topic.enable property (true if it could not be retrieved). */
  public boolean isTopicDeletionEnabled() {
    return configRelatedInfo.topicDeletionIsAllowed();
  }
  /**
   * Refreshes the cached config-related snapshot.
   * No-op when controller is null (refresh is skipped until a controller is known).
   */
  public Mono<Void> updateInternalStats(@Nullable Node controller) {
    if (controller == null) {
      return Mono.empty();
    }
    return configRelatedInfoMono
        .doOnNext(info -> this.configRelatedInfo = info)
        .then();
  }
  /** Configs for all topics (internal included), without documentation. */
  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
    return listTopics(true).flatMap(topics -> getTopicsConfig(topics, false));
  }

  //NOTE: skips not-found topics (for which UnknownTopicOrPartitionException was thrown by AdminClient)
  //and topics for which DESCRIBE_CONFIGS permission is not set (TopicAuthorizationException was thrown)
  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
    // config documentation is only requested when the cluster actually supports it
    var includeDocFixed = includeDoc && getClusterFeatures().contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL);
    // we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
    return partitionCalls(
        topicNames,
        200,
        part -> getTopicsConfigImpl(part, includeDocFixed),
        mapMerger()
    );
  }
  // topic name -> its config entries; suppresses per-topic errors (see exception classes below)
  private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<String> topicNames, boolean includeDoc) {
    List<ConfigResource> resources = topicNames.stream()
        .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
        .collect(toList());

    return toMonoWithExceptionFilter(
        client.describeConfigs(
            resources,
            new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(),
        UnknownTopicOrPartitionException.class,
        // Azure Event Hubs does not support describeConfigs API for topics, so we suppress the corresponding error.
        // See https://github.com/Azure/azure-event-hubs-for-kafka/issues/61 for details.
        UnknownServerException.class,
        TopicAuthorizationException.class
    ).map(config -> config.entrySet().stream()
        .collect(toMap(
            c -> c.getKey().name(),
            c -> List.copyOf(c.getValue().entries()))));
  }
  // broker id -> broker's config entries; empty map on any retrieval error (see inline notes)
  private static Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(AdminClient client, List<Integer> brokerIds) {
    List<ConfigResource> resources = brokerIds.stream()
        .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
        .collect(toList());
    return toMono(client.describeConfigs(resources).all())
        // some kafka backends don't support broker's configs retrieval,
        // and throw various exceptions on describeConfigs() call
        .onErrorResume(th -> th instanceof InvalidRequestException // MSK Serverless
                || th instanceof UnknownTopicOrPartitionException, // Azure event hub
            th -> {
              log.trace("Error while getting configs for brokers {}", brokerIds, th);
              return Mono.just(Map.of());
            })
        // there are situations when kafka-ui user has no DESCRIBE_CONFIGS permission on cluster
        .onErrorResume(ClusterAuthorizationException.class, th -> {
          log.trace("AuthorizationException while getting configs for brokers {}", brokerIds, th);
          return Mono.just(Map.of());
        })
        // catching all remaining exceptions, but logging on WARN level
        .onErrorResume(th -> true, th -> {
          log.warn("Unexpected error while getting configs for brokers {}", brokerIds, th);
          return Mono.just(Map.of());
        })
        // config resource name is the broker id
        .map(config -> config.entrySet().stream()
            .collect(toMap(
                c -> Integer.valueOf(c.getKey().name()),
                c -> new ArrayList<>(c.getValue().entries()))));
  }
  /**
   * Return per-broker configs or empty map if broker's configs retrieval not supported.
   */
  public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> brokerIds) {
    return loadBrokersConfig(client, brokerIds);
  }
  /** Descriptions for all topics (internal included). */
  public Mono<Map<String, TopicDescription>> describeTopics() {
    return listTopics(true).flatMap(this::describeTopics);
  }

  public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> topics) {
    // we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
    return partitionCalls(
        topics,
        200,
        this::describeTopicsImpl,
        mapMerger()
    );
  }

  // topic name -> description; not-found / unauthorized topics are skipped
  private Mono<Map<String, TopicDescription>> describeTopicsImpl(Collection<String> topics) {
    return toMonoWithExceptionFilter(
        client.describeTopics(topics).topicNameValues(),
        UnknownTopicOrPartitionException.class,
        // we only describe topics that we see from listTopics() API, so we should have permission to do it,
        // but also adding this exception here for rare case when access restricted after we called listTopics()
        TopicAuthorizationException.class
    );
  }
  /**
   * Returns TopicDescription mono, or Empty Mono if topic not visible.
   */
  public Mono<TopicDescription> describeTopic(String topic) {
    return describeTopics(List.of(topic)).flatMap(m -> Mono.justOrEmpty(m.get(topic)));
  }
  313. /**
  314. * Kafka API often returns Map responses with KafkaFuture values. If we do allOf()
  315. * logic resulting Mono will be failing if any of Futures finished with error.
  316. * In some situations it is not what we want, ex. we call describeTopics(List names) method and
  317. * we getting UnknownTopicOrPartitionException for unknown topics and we what to just not put
  318. * such topics in resulting map.
  319. * <p/>
  320. * This method converts input map into Mono[Map] ignoring keys for which KafkaFutures
  321. * finished with <code>classes</code> exceptions and empty Monos.
  322. */
  323. @SafeVarargs
  324. static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values,
  325. Class<? extends KafkaException>... classes) {
  326. if (values.isEmpty()) {
  327. return Mono.just(Map.of());
  328. }
  329. List<Mono<Tuple2<K, Optional<V>>>> monos = values.entrySet().stream()
  330. .map(e ->
  331. toMono(e.getValue())
  332. .map(r -> Tuples.of(e.getKey(), Optional.of(r)))
  333. .defaultIfEmpty(Tuples.of(e.getKey(), Optional.empty())) //tracking empty Monos
  334. .onErrorResume(
  335. // tracking Monos with suppressible error
  336. th -> Stream.of(classes).anyMatch(clazz -> th.getClass().isAssignableFrom(clazz)),
  337. th -> Mono.just(Tuples.of(e.getKey(), Optional.empty()))))
  338. .toList();
  339. return Mono.zip(
  340. monos,
  341. resultsArr -> Stream.of(resultsArr)
  342. .map(obj -> (Tuple2<K, Optional<V>>) obj)
  343. .filter(t -> t.getT2().isPresent()) //skipping empty & suppressible-errors
  344. .collect(Collectors.toMap(Tuple2::getT1, t -> t.getT2().get()))
  345. );
  346. }
  /** Log dir info for all known brokers. */
  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
    return describeCluster()
        .map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
        .flatMap(this::describeLogDirs);
  }

  /**
   * Log dir info for the given brokers. Returns empty map when the API is
   * unsupported or unauthorized; any other error is logged on WARN and also yields empty map.
   */
  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs(
      Collection<Integer> brokerIds) {
    return toMono(client.describeLogDirs(brokerIds).all())
        .onErrorResume(UnsupportedVersionException.class, th -> Mono.just(Map.of()))
        .onErrorResume(ClusterAuthorizationException.class, th -> Mono.just(Map.of()))
        .onErrorResume(th -> true, th -> {
          log.warn("Error while calling describeLogDirs", th);
          return Mono.just(Map.of());
        });
  }
  public Mono<ClusterDescription> describeCluster() {
    return describeClusterImpl(client, getClusterFeatures());
  }

  // authorized operations are only requested when the feature is supported by the cluster
  private static Mono<ClusterDescription> describeClusterImpl(AdminClient client, Set<SupportedFeature> features) {
    boolean includeAuthorizedOperations =
        features.contains(SupportedFeature.DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS);
    DescribeClusterResult result = client.describeCluster(
        new DescribeClusterOptions().includeAuthorizedOperations(includeAuthorizedOperations));
    var allOfFuture = KafkaFuture.allOf(
        result.controller(), result.clusterId(), result.nodes(), result.authorizedOperations());
    return toMono(allOfFuture).then(
        // all futures are completed at this point, so get() calls below won't block
        Mono.fromCallable(() ->
            new ClusterDescription(
                result.controller().get(),
                result.clusterId().get(),
                result.nodes().get(),
                result.authorizedOperations().get()
            )
        )
    );
  }
  383. public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
  384. return toMono(client.deleteConsumerGroups(groupIds).all())
  385. .onErrorResume(GroupIdNotFoundException.class,
  386. th -> Mono.error(new NotFoundException("The group id does not exist")))
  387. .onErrorResume(GroupNotEmptyException.class,
  388. th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
  389. }
  390. public Mono<Void> createTopic(String name,
  391. int numPartitions,
  392. @Nullable Integer replicationFactor,
  393. Map<String, String> configs) {
  394. var newTopic = new NewTopic(
  395. name,
  396. Optional.of(numPartitions),
  397. Optional.ofNullable(replicationFactor).map(Integer::shortValue)
  398. ).configs(configs);
  399. return toMono(client.createTopics(List.of(newTopic)).all());
  400. }
  /** Moves/cancels partition reassignments (empty Optional cancels an in-progress one). */
  public Mono<Void> alterPartitionReassignments(
      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
    return toMono(client.alterPartitionReassignments(reassignments).all());
  }

  public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap) {
    return toMono(client.createPartitions(newPartitionsMap).all());
  }
  // NOTE: replaces whole current topic config with new one. Entries that were present in old config,
  // but missed in new will be set to default
  public Mono<Void> updateTopicConfig(String topicName, Map<String, String> configs) {
    if (getClusterFeatures().contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
      // incremental path: current manually-set entries absent from new config get explicitly deleted
      return getTopicsConfigImpl(List.of(topicName), false)
          .map(conf -> conf.getOrDefault(topicName, List.of()))
          .flatMap(currentConfigs -> incrementalAlterConfig(topicName, currentConfigs, configs));
    } else {
      // legacy path for brokers without incrementalAlterConfigs support
      return alterConfig(topicName, configs);
    }
  }
  public Mono<List<String>> listConsumerGroupNames() {
    return listConsumerGroups().map(lst -> lst.stream().map(ConsumerGroupListing::groupId).toList());
  }

  public Mono<Collection<ConsumerGroupListing>> listConsumerGroups() {
    return toMono(client.listConsumerGroups().all());
  }
  public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
    // partitioned (25 per batch, 4 concurrent) to avoid AdminClient timeouts on large group counts
    return partitionCalls(
        groupIds,
        25,
        4,
        ids -> toMono(client.describeConsumerGroups(ids).all()),
        mapMerger()
    );
  }
  // group -> partition -> offset
  // NOTE: partitions with no committed offsets will be skipped
  public Mono<Table<String, TopicPartition, Long>> listConsumerGroupOffsets(List<String> consumerGroups,
                                                                            // all partitions if null passed
                                                                            @Nullable List<TopicPartition> partitions) {
    Function<Collection<String>, Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>>> call =
        groups -> toMono(
            client.listConsumerGroupOffsets(
                groups.stream()
                    .collect(Collectors.toMap(
                        g -> g,
                        g -> new ListConsumerGroupOffsetsSpec().topicPartitions(partitions)
                    ))).all()
        );
    // partitioned (25 per batch, 4 concurrent) to avoid AdminClient timeouts on large group counts
    Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>> merged = partitionCalls(
        consumerGroups,
        25,
        4,
        call,
        mapMerger()
    );
    return merged.map(map -> {
      var table = ImmutableTable.<String, TopicPartition, Long>builder();
      map.forEach((g, tpOffsets) -> tpOffsets.forEach((tp, offset) -> {
        if (offset != null) {
          // offset will be null for partitions that don't have committed offset for this group
          table.put(g, tp, offset.offset());
        }
      }));
      return table.build();
    });
  }
  /** Sets committed offsets for the given group (group should not be active). */
  public Mono<Void> alterConsumerGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
    return toMono(client.alterConsumerGroupOffsets(
            groupId,
            offsets.entrySet().stream()
                .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))))
        .all());
  }
  /**
   * List offset for the topic's partitions and OffsetSpec.
   *
   * @param failOnUnknownLeader true - throw exception in case of no-leader partitions,
   *                            false - skip partitions with no leader
   */
  public Mono<Map<TopicPartition, Long>> listTopicOffsets(String topic,
                                                          OffsetSpec offsetSpec,
                                                          boolean failOnUnknownLeader) {
    return describeTopic(topic)
        .map(td -> filterPartitionsWithLeaderCheck(List.of(td), p -> true, failOnUnknownLeader))
        .flatMap(partitions -> listOffsetsUnsafe(partitions, offsetSpec));
  }
  /**
   * List offset for the specified partitions and OffsetSpec.
   *
   * @param failOnUnknownLeader true - throw exception in case of no-leader partitions,
   *                            false - skip partitions with no leader
   */
  public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicPartition> partitions,
                                                     OffsetSpec offsetSpec,
                                                     boolean failOnUnknownLeader) {
    return filterPartitionsWithLeaderCheck(partitions, failOnUnknownLeader)
        .flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
  }
  /**
   * List offset for the specified topics, skipping no-leader partitions.
   */
  public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicDescription> topicDescriptions,
                                                     OffsetSpec offsetSpec) {
    return listOffsetsUnsafe(filterPartitionsWithLeaderCheck(topicDescriptions, p -> true, false), offsetSpec);
  }
  // describes the partitions' topics and delegates to the static leader-check filter
  private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
                                                                           boolean failOnUnknownLeader) {
    var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
    return describeTopicsImpl(targetTopics)
        .map(descriptions ->
            filterPartitionsWithLeaderCheck(
                descriptions.values(), partitions::contains, failOnUnknownLeader));
  }
  513. @VisibleForTesting
  514. static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
  515. Predicate<TopicPartition> partitionPredicate,
  516. boolean failOnUnknownLeader) {
  517. var goodPartitions = new HashSet<TopicPartition>();
  518. for (TopicDescription description : topicDescriptions) {
  519. var goodTopicPartitions = new ArrayList<TopicPartition>();
  520. for (TopicPartitionInfo partitionInfo : description.partitions()) {
  521. TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
  522. if (partitionInfo.leader() == null) {
  523. if (failOnUnknownLeader) {
  524. throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
  525. } else {
  526. // if ANY of topic partitions has no leader - we have to skip all topic partitions
  527. goodTopicPartitions.clear();
  528. break;
  529. }
  530. }
  531. if (partitionPredicate.test(topicPartition)) {
  532. goodTopicPartitions.add(topicPartition);
  533. }
  534. }
  535. goodPartitions.addAll(goodTopicPartitions);
  536. }
  537. return goodPartitions;
  538. }
  // 1. NOTE(!): should only apply for partitions from topics where all partitions have leaders,
  // otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
  // 2. NOTE(!): Skips partitions that were not initialized yet
  // (UnknownTopicOrPartitionException thrown, ex. after topic creation)
  // 3. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retrying instead
  @KafkaClientInternalsDependant
  @VisibleForTesting
  Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) {
    if (partitions.isEmpty()) {
      return Mono.just(Map.of());
    }

    Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
        parts -> {
          ListOffsetsResult r = client.listOffsets(parts.stream().collect(toMap(tp -> tp, tp -> offsetSpec)));
          Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> perPartitionResults = new HashMap<>();
          parts.forEach(p -> perPartitionResults.put(p, r.partitionResult(p)));

          return toMonoWithExceptionFilter(perPartitionResults, UnknownTopicOrPartitionException.class)
              .map(offsets -> offsets.entrySet().stream()
                  // filtering partitions for which offsets were not found
                  .filter(e -> e.getValue().offset() >= 0)
                  .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
        };

    // partitioned to avoid AdminClient timeouts in case of large partition count
    return partitionCalls(
        partitions,
        200,
        call,
        mapMerger()
    );
  }
  /** Lists ACL bindings matching the filter; requires AUTHORIZED_SECURITY_ENABLED feature. */
  public Mono<Collection<AclBinding>> listAcls(ResourcePatternFilter filter) {
    Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
    return toMono(client.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values());
  }

  /** Creates ACL bindings; requires AUTHORIZED_SECURITY_ENABLED feature. */
  public Mono<Void> createAcls(Collection<AclBinding> aclBindings) {
    Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
    return toMono(client.createAcls(aclBindings).all());
  }

  /** Deletes ACL bindings (by their exact filters); requires AUTHORIZED_SECURITY_ENABLED feature. */
  public Mono<Void> deleteAcls(Collection<AclBinding> aclBindings) {
    Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
    var filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet());
    return toMono(client.deleteAcls(filters).all()).then();
  }
  581. public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
  582. ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
  583. AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
  584. return toMono(client.incrementalAlterConfigs(Map.of(cr, List.of(op))).all());
  585. }
  586. public Mono<Void> deleteRecords(Map<TopicPartition, Long> offsets) {
  587. var records = offsets.entrySet().stream()
  588. .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
  589. .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
  590. return toMono(client.deleteRecords(records).all());
  591. }
  /** Moves replicas to the specified log directories. */
  public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
    return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
  }
  // returns tp -> list of active producer's states (if any)
  public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(String topic) {
    return describeTopic(topic)
        // querying producers for all partitions of the topic
        .map(td -> client.describeProducers(
                IntStream.range(0, td.partitions().size())
                    .mapToObj(i -> new TopicPartition(topic, i))
                    .toList()
            ).all()
        )
        .flatMap(ReactiveAdminClient::toMono)
        .map(map -> map.entrySet().stream()
            .filter(e -> !e.getValue().activeProducers().isEmpty()) // skipping partitions without producers
            .collect(toMap(Map.Entry::getKey, e -> e.getValue().activeProducers())));
  }
  /**
   * Applies new topic config via incrementalAlterConfigs: sets all new entries and
   * deletes manually-set entries that are absent from the new config.
   */
  private Mono<Void> incrementalAlterConfig(String topicName,
                                            List<ConfigEntry> currentConfigs,
                                            Map<String, String> newConfigs) {
    var configsToDelete = currentConfigs.stream()
        .filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) //manually set configs only
        .filter(e -> !newConfigs.containsKey(e.name()))
        .map(e -> new AlterConfigOp(e, AlterConfigOp.OpType.DELETE));
    var configsToSet = newConfigs.entrySet().stream()
        .map(e -> new AlterConfigOp(new ConfigEntry(e.getKey(), e.getValue()), AlterConfigOp.OpType.SET));
    return toMono(client.incrementalAlterConfigs(
        Map.of(
            new ConfigResource(ConfigResource.Type.TOPIC, topicName),
            Stream.concat(configsToDelete, configsToSet).toList()
        )).all());
  }
  624. @SuppressWarnings("deprecation")
  625. private Mono<Void> alterConfig(String topicName, Map<String, String> configs) {
  626. List<ConfigEntry> configEntries = configs.entrySet().stream()
  627. .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue())))
  628. .collect(toList());
  629. Config config = new Config(configEntries);
  630. var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
  631. return toMono(client.alterConfigs(Map.of(topicResource, config)).all());
  632. }
  /**
   * Splits input collection into batches, converts each batch into Mono, sequentially subscribes to them
   * and merges output Monos into one Mono.
   */
  private static <R, I> Mono<R> partitionCalls(Collection<I> items,
                                               int partitionSize,
                                               Function<Collection<I>, Mono<R>> call,
                                               BiFunction<R, R, R> merger) {
    if (items.isEmpty()) {
      // delegate to call so an "empty" result of the proper type is produced
      return call.apply(items);
    }
    Iterable<List<I>> parts = Iterables.partition(items, partitionSize);
    return Flux.fromIterable(parts)
        .concatMap(call)
        .reduce(merger);
  }
  /**
   * Splits input collection into batches, converts each batch into Mono, subscribes to them (concurrently,
   * with specified concurrency level) and merges output Monos into one Mono.
   */
  private static <R, I> Mono<R> partitionCalls(Collection<I> items,
                                               int partitionSize,
                                               int concurrency,
                                               Function<Collection<I>, Mono<R>> call,
                                               BiFunction<R, R, R> merger) {
    if (items.isEmpty()) {
      // delegate to call so an "empty" result of the proper type is produced
      return call.apply(items);
    }
    Iterable<List<I>> parts = Iterables.partition(items, partitionSize);
    return Flux.fromIterable(parts)
        .flatMap(call, concurrency)
        .reduce(merger);
  }
  666. private static <K, V> BiFunction<Map<K, V>, Map<K, V>, Map<K, V>> mapMerger() {
  667. return (m1, m2) -> {
  668. var merged = new HashMap<K, V>();
  669. merged.putAll(m1);
  670. merged.putAll(m2);
  671. return merged;
  672. };
  673. }
  /** Closes the underlying AdminClient. */
  @Override
  public void close() {
    client.close();
  }
  678. }