ReactiveAdminClient.java

package com.provectus.kafka.ui.service;

import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.provectus.kafka.ui.exception.IllegalEntityStateException;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.util.MapUtil;
import com.provectus.kafka.ui.util.NumberUtil;
import com.provectus.kafka.ui.util.annotations.KafkaClientInternalsDependant;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

@Slf4j
@RequiredArgsConstructor
public class ReactiveAdminClient implements Closeable {
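  // Some AdminClient operations are only supported starting from a certain broker version.
  // The cluster version is detected once, in create(..), and the resulting feature set is
  // consulted before choosing an API call (see updateTopicConfig and getTopicsConfig).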
  private enum SupportedFeature {
    INCREMENTAL_ALTER_CONFIGS(2.3f),
    CONFIG_DOCUMENTATION_RETRIEVAL(2.6f);

    private final float sinceVersion;

    SupportedFeature(float sinceVersion) {
      this.sinceVersion = sinceVersion;
    }

    static Set<SupportedFeature> forVersion(float kafkaVersion) {
      return Arrays.stream(SupportedFeature.values())
          .filter(f -> kafkaVersion >= f.sinceVersion)
          .collect(Collectors.toSet());
    }

    static Set<SupportedFeature> defaultFeatures() {
      return Set.of();
    }
  }

  @Value
  public static class ClusterDescription {
    @Nullable
    Node controller;
    String clusterId;
    Collection<Node> nodes;
    Set<AclOperation> authorizedOperations;
  }

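  // Factory method: the cluster version is resolved first, so that the supported
  // feature set can be derived from it before the client is handed out.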
  public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
    return getClusterVersion(adminClient)
        .map(ver ->
            new ReactiveAdminClient(
                adminClient,
                ver,
                getSupportedUpdateFeaturesForVersion(ver)));
  }

  private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String versionStr) {
    try {
      float version = NumberUtil.parserClusterVersion(versionStr);
      return SupportedFeature.forVersion(version);
    } catch (NumberFormatException e) {
      return SupportedFeature.defaultFeatures();
    }
  }

  //TODO: discuss - maybe we should map kafka-library's exceptions to our exceptions here
  private static <T> Mono<T> toMono(KafkaFuture<T> future) {
    return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
      if (ex != null) {
        // KafkaFuture's docs are unclear about which exception wrapper will be used
        // (per the docs it should be ExecutionException, but we actually see CompletionException),
        // so we check for both
        if (ex instanceof CompletionException || ex instanceof ExecutionException) {
          sink.error(ex.getCause()); // unwrapping exception
        } else {
          sink.error(ex);
        }
      } else {
        sink.success(res);
      }
    })).doOnCancel(() -> future.cancel(true))
        // AdminClient uses a single thread for Kafka communication, and by default all
        // downstream operations (like map(..)) on the created Mono will be executed on that thread.
        // If some downstream operation is blocking (by mistake), other AdminClient requests
        // can get stuck, which can cause timeout exceptions.
        // So we explicitly set a Scheduler for downstream processing.
        .publishOn(Schedulers.parallel());
  }

  //---------------------------------------------------------------------------------

  private final AdminClient client;
  private final String version;
  private final Set<SupportedFeature> features;

  public Mono<Set<String>> listTopics(boolean listInternal) {
    return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
  }

  public Mono<Void> deleteTopic(String topicName) {
    return toMono(client.deleteTopics(List.of(topicName)).all());
  }

  public String getVersion() {
    return version;
  }

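  // Loads configs for all topics (internal ones included), without documentation strings.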
  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
    return listTopics(true).flatMap(topics -> getTopicsConfig(topics, false));
  }

  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
    var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc;
    // we need to partition calls, because a single call with a large topic count can lead to AdminClient timeouts
    return partitionCalls(
        topicNames,
        200,
        part -> getTopicsConfigImpl(part, includeDocFixed),
        (m1, m2) -> ImmutableMap.<String, List<ConfigEntry>>builder().putAll(m1).putAll(m2).build()
    );
  }

  private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<String> topicNames, boolean includeDoc) {
    List<ConfigResource> resources = topicNames.stream()
        .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
        .collect(toList());

    return toMonoWithExceptionFilter(
        client.describeConfigs(
            resources,
            new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(),
        UnknownTopicOrPartitionException.class
    ).map(config -> config.entrySet().stream()
        .collect(toMap(
            c -> c.getKey().name(),
            c -> List.copyOf(c.getValue().entries()))));
  }

  private static Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(AdminClient client, List<Integer> brokerIds) {
    List<ConfigResource> resources = brokerIds.stream()
        .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
        .collect(toList());
    return toMono(client.describeConfigs(resources).all())
        .doOnError(InvalidRequestException.class,
            th -> log.trace("Error while getting broker {} configs", brokerIds, th))
        // some kafka backends (like MSK Serverless) do not support broker config retrieval;
        // in that case an InvalidRequestException is thrown
        .onErrorResume(InvalidRequestException.class, th -> Mono.just(Map.of()))
        .map(config -> config.entrySet().stream()
            .collect(toMap(
                c -> Integer.valueOf(c.getKey().name()),
                c -> new ArrayList<>(c.getValue().entries()))));
  }

  /**
   * Returns per-broker configs, or an empty map if broker config retrieval is not supported.
   */
  public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> brokerIds) {
    return loadBrokersConfig(client, brokerIds);
  }

  public Mono<Map<String, TopicDescription>> describeTopics() {
    return listTopics(true).flatMap(this::describeTopics);
  }

  public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> topics) {
    // we need to partition calls, because a single call with a large topic count can lead to AdminClient timeouts
    return partitionCalls(
        topics,
        200,
        this::describeTopicsImpl,
        (m1, m2) -> ImmutableMap.<String, TopicDescription>builder().putAll(m1).putAll(m2).build()
    );
  }

  private Mono<Map<String, TopicDescription>> describeTopicsImpl(Collection<String> topics) {
    return toMonoWithExceptionFilter(
        client.describeTopics(topics).values(),
        UnknownTopicOrPartitionException.class
    );
  }

  /**
   * Returns a TopicDescription Mono, or an empty Mono if the topic is not found.
   */
  public Mono<TopicDescription> describeTopic(String topic) {
    return describeTopics(List.of(topic)).flatMap(m -> Mono.justOrEmpty(m.get(topic)));
  }

  /**
   * The Kafka API often returns Map responses with KafkaFuture values. If we apply allOf()
   * logic, the resulting Mono will fail if any of the futures finished with an error.
   * In some situations that is not what we want: for example, when we call describeTopics(List names)
   * we get an UnknownTopicOrPartitionException for unknown topics and want to simply omit
   * such topics from the resulting map.
   * <p/>
   * This method converts the input map into a Mono[Map], ignoring keys whose KafkaFutures
   * finished with a <code>clazz</code> exception.
   */
  private <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values,
                                                           Class<? extends KafkaException> clazz) {
    if (values.isEmpty()) {
      return Mono.just(Map.of());
    }

    List<Mono<Tuple2<K, V>>> monos = values.entrySet().stream()
        .map(e -> toMono(e.getValue()).map(r -> Tuples.of(e.getKey(), r)))
        .collect(toList());

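    // Each future's Mono is subscribed to individually and completions are counted:
    // the sink succeeds once every future has either produced a value or failed
    // with the ignorable exception type.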
    return Mono.create(sink -> {
      var finishedCnt = new AtomicInteger();
      var results = new ConcurrentHashMap<K, V>();
      monos.forEach(mono -> mono.subscribe(
          r -> {
            results.put(r.getT1(), r.getT2());
            if (finishedCnt.incrementAndGet() == monos.size()) {
              sink.success(results);
            }
          },
          th -> {
            if (!clazz.isInstance(th)) {
              sink.error(th);
            } else if (finishedCnt.incrementAndGet() == monos.size()) {
              sink.success(results);
            }
          }
      ));
    });
  }

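  // Per-broker log directory info; the no-arg overload describes every broker in the cluster.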
  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
    return describeCluster()
        .map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
        .flatMap(this::describeLogDirs);
  }

  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs(
      Collection<Integer> brokerIds) {
    return toMono(client.describeLogDirs(brokerIds).all());
  }

  public Mono<ClusterDescription> describeCluster() {
    var r = client.describeCluster();
    var all = KafkaFuture.allOf(r.nodes(), r.clusterId(), r.controller(), r.authorizedOperations());
    return Mono.create(sink -> all.whenComplete((res, ex) -> {
      if (ex != null) {
        sink.error(ex);
      } else {
        try {
          sink.success(
              new ClusterDescription(
                  getUninterruptibly(r.controller()),
                  getUninterruptibly(r.clusterId()),
                  getUninterruptibly(r.nodes()),
                  getUninterruptibly(r.authorizedOperations())
              )
          );
        } catch (ExecutionException e) {
          // can't happen here, because all futures have already completed successfully
        }
      }
    }));
  }

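  // The cluster version is read from the controller's "inter.broker.protocol.version"
  // config entry, falling back to "1.0-UNKNOWN" if it cannot be determined.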
  private static Mono<String> getClusterVersion(AdminClient client) {
    return toMono(client.describeCluster().controller())
        .flatMap(controller -> loadBrokersConfig(client, List.of(controller.id())))
        .map(configs -> configs.values().stream()
            .flatMap(Collection::stream)
            .filter(entry -> entry.name().contains("inter.broker.protocol.version"))
            .findFirst()
            .map(ConfigEntry::value)
            .orElse("1.0-UNKNOWN")
        );
  }

  public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
    return toMono(client.deleteConsumerGroups(groupIds).all())
        .onErrorResume(GroupIdNotFoundException.class,
            th -> Mono.error(new NotFoundException("The group id does not exist")))
        .onErrorResume(GroupNotEmptyException.class,
            th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
  }

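  // A null replicationFactor is passed to the broker as Optional.empty(),
  // which makes the broker apply its default replication factor setting.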
  public Mono<Void> createTopic(String name,
                                int numPartitions,
                                @Nullable Integer replicationFactor,
                                Map<String, String> configs) {
    var newTopic = new NewTopic(
        name,
        Optional.of(numPartitions),
        Optional.ofNullable(replicationFactor).map(Integer::shortValue)
    ).configs(configs);
    return toMono(client.createTopics(List.of(newTopic)).all());
  }

  public Mono<Void> alterPartitionReassignments(
      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
    return toMono(client.alterPartitionReassignments(reassignments).all());
  }

  public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap) {
    return toMono(client.createPartitions(newPartitionsMap).all());
  }

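  // incrementalAlterConfigs is used where the broker supports it (2.3+); otherwise we fall
  // back to the deprecated alterConfigs call (see alterConfig below).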
  public Mono<Void> updateTopicConfig(String topicName, Map<String, String> configs) {
    if (features.contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
      return incrementalAlterConfig(topicName, configs);
    } else {
      return alterConfig(topicName, configs);
    }
  }

  public Mono<List<String>> listConsumerGroups() {
    return toMono(client.listConsumerGroups().all())
        .map(lst -> lst.stream().map(ConsumerGroupListing::groupId).collect(toList()));
  }

  public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
    return toMono(client.describeConsumerGroups(groupIds).all());
  }

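  // Committed offsets for a consumer group; partitions without a committed offset
  // are filtered out via MapUtil.removeNullValues.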
  public Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(String groupId) {
    return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
  }

  public Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(
      String groupId, List<TopicPartition> partitions) {
    return listConsumerGroupOffsets(groupId,
        new ListConsumerGroupOffsetsOptions().topicPartitions(partitions));
  }

  private Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(
      String groupId, ListConsumerGroupOffsetsOptions options) {
    return toMono(client.listConsumerGroupOffsets(groupId, options).partitionsToOffsetAndMetadata())
        .map(MapUtil::removeNullValues)
        .map(m -> m.entrySet().stream()
            .map(e -> Tuples.of(e.getKey(), e.getValue().offset()))
            .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)));
  }

  public Mono<Void> alterConsumerGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
    return toMono(client.alterConsumerGroupOffsets(
        groupId,
        offsets.entrySet().stream()
            .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))))
        .all());
  }

  /**
   * Lists offsets for the topic's partitions and the given OffsetSpec.
   * @param failOnUnknownLeader true - throw an exception in case of no-leader partitions,
   *                            false - skip partitions with no leader
   */
  public Mono<Map<TopicPartition, Long>> listTopicOffsets(String topic,
                                                          OffsetSpec offsetSpec,
                                                          boolean failOnUnknownLeader) {
    return describeTopic(topic)
        .map(td -> filterPartitionsWithLeaderCheck(List.of(td), p -> true, failOnUnknownLeader))
        .flatMap(partitions -> listOffsetsUnsafe(partitions, offsetSpec));
  }

  /**
   * Lists offsets for the specified partitions and the given OffsetSpec.
   * @param failOnUnknownLeader true - throw an exception in case of no-leader partitions,
   *                            false - skip partitions with no leader
   */
  public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicPartition> partitions,
                                                     OffsetSpec offsetSpec,
                                                     boolean failOnUnknownLeader) {
    return filterPartitionsWithLeaderCheck(partitions, failOnUnknownLeader)
        .flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
  }

  private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
                                                                           boolean failOnUnknownLeader) {
    var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
    return describeTopicsImpl(targetTopics)
        .map(descriptions ->
            filterPartitionsWithLeaderCheck(
                descriptions.values(), partitions::contains, failOnUnknownLeader));
  }

  private Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
                                                              Predicate<TopicPartition> partitionPredicate,
                                                              boolean failOnUnknownLeader) {
    var goodPartitions = new HashSet<TopicPartition>();
    for (TopicDescription description : topicDescriptions) {
      for (TopicPartitionInfo partitionInfo : description.partitions()) {
        TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
        if (!partitionPredicate.test(topicPartition)) {
          continue;
        }
        if (partitionInfo.leader() != null) {
          goodPartitions.add(topicPartition);
        } else if (failOnUnknownLeader) {
          throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
        }
      }
    }
    return goodPartitions;
  }

  // 1. NOTE(!): should only be applied to partitions with an existing leader,
  //    otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
  // 2. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retries instead
  @KafkaClientInternalsDependant
  public Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions,
                                                           OffsetSpec offsetSpec) {
    Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
        parts -> toMono(
            client.listOffsets(parts.stream().collect(toMap(tp -> tp, tp -> offsetSpec))).all())
            .map(offsets -> offsets.entrySet().stream()
                // filtering out partitions for which offsets were not found
                .filter(e -> e.getValue().offset() >= 0)
                .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));

    return partitionCalls(
        partitions,
        200,
        call,
        (m1, m2) -> ImmutableMap.<TopicPartition, Long>builder().putAll(m1).putAll(m2).build()
    );
  }

  public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
    AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
    return toMono(client.incrementalAlterConfigs(Map.of(cr, List.of(op))).all());
  }

  public Mono<Void> deleteRecords(Map<TopicPartition, Long> offsets) {
    var records = offsets.entrySet().stream()
        .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
        .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
    return toMono(client.deleteRecords(records).all());
  }

  public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
    return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
  }

  private Mono<Void> incrementalAlterConfig(String topicName, Map<String, String> configs) {
    var config = configs.entrySet().stream()
        .flatMap(cfg -> Stream.of(
            new AlterConfigOp(
                new ConfigEntry(
                    cfg.getKey(),
                    cfg.getValue()),
                AlterConfigOp.OpType.SET)))
        .collect(toList());
    var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    return toMono(client.incrementalAlterConfigs(Map.of(topicResource, config)).all());
  }

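  // Legacy path for brokers that don't support incrementalAlterConfigs: the deprecated
  // alterConfigs call replaces the topic's entire config, so all desired entries must be sent at once.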
  @SuppressWarnings("deprecation")
  private Mono<Void> alterConfig(String topicName, Map<String, String> configs) {
    List<ConfigEntry> configEntries = configs.entrySet().stream()
        .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue())))
        .collect(toList());
    Config config = new Config(configEntries);
    var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    return toMono(client.alterConfigs(Map.of(topicResource, config)).all());
  }

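  // Used by describeTopics(..), getTopicsConfig(..) and listOffsetsUnsafe(..) above to keep
  // individual AdminClient requests at a maximum of 200 topics/partitions per call.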
  /**
   * Splits the input collection into batches, applies the call function to each batch sequentially,
   * and merges the resulting Monos into a single Mono.
   */
  private static <R, I> Mono<R> partitionCalls(Collection<I> items,
                                               int partitionSize,
                                               Function<Collection<I>, Mono<R>> call,
                                               BiFunction<R, R, R> merger) {
    if (items.isEmpty()) {
      return call.apply(items);
    }
    Iterator<List<I>> parts = Iterators.partition(items.iterator(), partitionSize);
    Mono<R> mono = call.apply(parts.next());
    while (parts.hasNext()) {
      var nextPart = parts.next();
      // calls will be executed sequentially
      mono = mono.flatMap(res1 -> call.apply(nextPart).map(res2 -> merger.apply(res1, res2)));
    }
    return mono;
  }

  @Override
  public void close() {
    client.close();
  }
}