ReactiveAdminClient.java

package com.provectus.kafka.ui.service;

import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.provectus.kafka.ui.exception.IllegalEntityStateException;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.util.MapUtil;
import com.provectus.kafka.ui.util.NumberUtil;
import com.provectus.kafka.ui.util.annotations.KafkaClientInternalsDependant;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

@Slf4j
@RequiredArgsConstructor
public class ReactiveAdminClient implements Closeable {

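  /**
   * Broker features that depend on the cluster's version. Each constant holds the
   * minimal Kafka version (as detected via inter.broker.protocol.version) since
   * which the feature is available.
   */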
  private enum SupportedFeature {
    INCREMENTAL_ALTER_CONFIGS(2.3f),
    CONFIG_DOCUMENTATION_RETRIEVAL(2.6f);

    private final float sinceVersion;

    SupportedFeature(float sinceVersion) {
      this.sinceVersion = sinceVersion;
    }

    static Set<SupportedFeature> forVersion(float kafkaVersion) {
      return Arrays.stream(SupportedFeature.values())
          .filter(f -> kafkaVersion >= f.sinceVersion)
          .collect(Collectors.toSet());
    }

    static Set<SupportedFeature> defaultFeatures() {
      return Set.of();
    }
  }

  @Value
  public static class ClusterDescription {
    @Nullable
    Node controller;
    String clusterId;
    Collection<Node> nodes;
    Set<AclOperation> authorizedOperations;
  }

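  /**
   * Creates a ReactiveAdminClient wrapper, detecting the cluster version (and the
   * features it supports) up front.
   *
   * <p>Illustrative usage (a sketch; assumes {@code props} holds valid client config):
   * <pre>{@code
   * ReactiveAdminClient.create(AdminClient.create(props))
   *     .flatMap(rac -> rac.listTopics(false))
   *     .subscribe(names -> System.out.println(names));
   * }</pre>
   */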
  public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
    return getClusterVersion(adminClient)
        .map(ver ->
            new ReactiveAdminClient(
                adminClient,
                ver,
                getSupportedUpdateFeaturesForVersion(ver)));
  }

  private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String versionStr) {
    try {
      float version = NumberUtil.parserClusterVersion(versionStr);
      return SupportedFeature.forVersion(version);
    } catch (NumberFormatException e) {
      return SupportedFeature.defaultFeatures();
    }
  }

  //TODO: discuss - maybe we should map kafka-library's exceptions to our exceptions here
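  /**
   * Adapts a KafkaFuture to a Mono, unwrapping the exception wrapper so that
   * subscribers see the original Kafka error. Illustrative usage (a sketch):
   * {@code toMono(client.describeCluster().clusterId())}.
   */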
  private static <T> Mono<T> toMono(KafkaFuture<T> future) {
    return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
      if (ex != null) {
        // KafkaFuture's doc is unclear about which exception wrapper will be used
        // (per the docs it should be ExecutionException, but we actually see CompletionException), so checking both
        if (ex instanceof CompletionException || ex instanceof ExecutionException) {
          sink.error(ex.getCause()); //unwrapping exception
        } else {
          sink.error(ex);
        }
      } else {
        sink.success(res);
      }
    })).doOnCancel(() -> future.cancel(true))
        // AdminClient uses a single thread for kafka communication,
        // and by default all downstream operations (like map(..)) on the created Mono will be executed on it.
        // If some downstream operation is blocking (by mistake), this can leave
        // other AdminClient requests stuck, which can cause timeout exceptions.
        // So we explicitly set a Scheduler for downstream processing.
        .publishOn(Schedulers.parallel());
  }

  //---------------------------------------------------------------------------------

  @Getter(AccessLevel.PACKAGE) // visible for testing
  private final AdminClient client;
  private final String version;
  private final Set<SupportedFeature> features;

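  /**
   * Lists topic names, optionally including internal topics (like __consumer_offsets).
   * Illustrative: {@code listTopics(false)} emits the set of non-internal topic names.
   */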
  public Mono<Set<String>> listTopics(boolean listInternal) {
    return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
  }

  public Mono<Void> deleteTopic(String topicName) {
    return toMono(client.deleteTopics(List.of(topicName)).all());
  }

  public String getVersion() {
    return version;
  }

  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
    return listTopics(true).flatMap(topics -> getTopicsConfig(topics, false));
  }

  //NOTE: skips not-found topics (for which UnknownTopicOrPartitionException was thrown by AdminClient)
  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
    var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc;
    // we need to partition calls, because describing a large number of topics at once can lead to AdminClient timeouts
    return partitionCalls(
        topicNames,
        200,
        part -> getTopicsConfigImpl(part, includeDocFixed),
        (m1, m2) -> ImmutableMap.<String, List<ConfigEntry>>builder().putAll(m1).putAll(m2).build()
    );
  }

  private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<String> topicNames, boolean includeDoc) {
    List<ConfigResource> resources = topicNames.stream()
        .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
        .collect(toList());

    return toMonoWithExceptionFilter(
        client.describeConfigs(
            resources,
            new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(),
        UnknownTopicOrPartitionException.class
    ).map(config -> config.entrySet().stream()
        .collect(toMap(
            c -> c.getKey().name(),
            c -> List.copyOf(c.getValue().entries()))));
  }

  private static Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(AdminClient client, List<Integer> brokerIds) {
    List<ConfigResource> resources = brokerIds.stream()
        .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
        .collect(toList());
    return toMono(client.describeConfigs(resources).all())
        .doOnError(InvalidRequestException.class,
            th -> log.trace("Error while getting broker {} configs", brokerIds, th))
        // some kafka backends (like MSK Serverless) do not support broker config retrieval,
        // in which case an InvalidRequestException is thrown
        .onErrorResume(InvalidRequestException.class, th -> Mono.just(Map.of()))
        .map(config -> config.entrySet().stream()
            .collect(toMap(
                c -> Integer.valueOf(c.getKey().name()),
                c -> new ArrayList<>(c.getValue().entries()))));
  }

  /**
   * Returns per-broker configs, or an empty map if broker config retrieval is not supported.
   */
  public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> brokerIds) {
    return loadBrokersConfig(client, brokerIds);
  }

  public Mono<Map<String, TopicDescription>> describeTopics() {
    return listTopics(true).flatMap(this::describeTopics);
  }

  public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> topics) {
    // we need to partition calls, because describing a large number of topics at once can lead to AdminClient timeouts
    return partitionCalls(
        topics,
        200,
        this::describeTopicsImpl,
        (m1, m2) -> ImmutableMap.<String, TopicDescription>builder().putAll(m1).putAll(m2).build()
    );
  }

  private Mono<Map<String, TopicDescription>> describeTopicsImpl(Collection<String> topics) {
    return toMonoWithExceptionFilter(
        client.describeTopics(topics).values(),
        UnknownTopicOrPartitionException.class
    );
  }

  /**
   * Returns a TopicDescription Mono, or an empty Mono if the topic is not found.
   */
  public Mono<TopicDescription> describeTopic(String topic) {
    return describeTopics(List.of(topic)).flatMap(m -> Mono.justOrEmpty(m.get(topic)));
  }

  /**
   * Kafka API often returns Map responses with KafkaFuture values. If we apply allOf()
   * logic, the resulting Mono will fail if any of the Futures finished with an error.
   * In some situations this is not what we want: ex. when we call describeTopics(List names)
   * and get UnknownTopicOrPartitionException for unknown topics, we want to simply omit
   * such topics from the resulting map.
   * <p/>
   * This method converts the input map into a Mono[Map], ignoring keys for which KafkaFutures
   * finished with a <code>clazz</code> exception.
   */
  static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values,
                                                          Class<? extends KafkaException> clazz) {
    if (values.isEmpty()) {
      return Mono.just(Map.of());
    }

    List<Mono<Tuple2<K, V>>> monos = values.entrySet().stream()
        .map(e -> toMono(e.getValue()).map(r -> Tuples.of(e.getKey(), r)))
        .collect(toList());

    return Mono.create(sink -> {
      var finishedCnt = new AtomicInteger();
      var results = new ConcurrentHashMap<K, V>();
      monos.forEach(mono -> mono.subscribe(
          r -> {
            results.put(r.getT1(), r.getT2());
            if (finishedCnt.incrementAndGet() == monos.size()) {
              sink.success(results);
            }
          },
          th -> {
            // skip exceptions of (or derived from) the filtered class, fail the sink otherwise
            if (!clazz.isInstance(th)) {
              sink.error(th);
            } else if (finishedCnt.incrementAndGet() == monos.size()) {
              sink.success(results);
            }
          }
      ));
    });
  }

  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
    return describeCluster()
        .map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
        .flatMap(this::describeLogDirs);
  }

  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs(
      Collection<Integer> brokerIds) {
    return toMono(client.describeLogDirs(brokerIds).all());
  }

  public Mono<ClusterDescription> describeCluster() {
    var r = client.describeCluster();
    var all = KafkaFuture.allOf(r.nodes(), r.clusterId(), r.controller(), r.authorizedOperations());
    return Mono.create(sink -> all.whenComplete((res, ex) -> {
      if (ex != null) {
        sink.error(ex);
      } else {
        try {
          sink.success(
              new ClusterDescription(
                  getUninterruptibly(r.controller()),
                  getUninterruptibly(r.clusterId()),
                  getUninterruptibly(r.nodes()),
                  getUninterruptibly(r.authorizedOperations())
              )
          );
        } catch (ExecutionException e) {
          // should be unreachable, since all futures have already completed successfully,
          // but propagate defensively instead of leaving the sink hanging
          sink.error(e.getCause());
        }
      }
    }));
  }

  private static Mono<String> getClusterVersion(AdminClient client) {
    return toMono(client.describeCluster().controller())
        .flatMap(controller -> loadBrokersConfig(client, List.of(controller.id())))
        .map(configs -> configs.values().stream()
            .flatMap(Collection::stream)
            .filter(entry -> entry.name().contains("inter.broker.protocol.version"))
            .findFirst()
            .map(ConfigEntry::value)
            .orElse("1.0-UNKNOWN")
        );
  }

  public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
    return toMono(client.deleteConsumerGroups(groupIds).all())
        .onErrorResume(GroupIdNotFoundException.class,
            th -> Mono.error(new NotFoundException("The group id does not exist")))
        .onErrorResume(GroupNotEmptyException.class,
            th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
  }

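  /**
   * Creates a topic; if replicationFactor is null, the broker default is used.
   * Illustrative (a sketch):
   * {@code createTopic("orders", 6, 3, Map.of("cleanup.policy", "compact"))}.
   */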
  public Mono<Void> createTopic(String name,
                                int numPartitions,
                                @Nullable Integer replicationFactor,
                                Map<String, String> configs) {
    var newTopic = new NewTopic(
        name,
        Optional.of(numPartitions),
        Optional.ofNullable(replicationFactor).map(Integer::shortValue)
    ).configs(configs);
    return toMono(client.createTopics(List.of(newTopic)).all());
  }

  public Mono<Void> alterPartitionReassignments(
      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
    return toMono(client.alterPartitionReassignments(reassignments).all());
  }

  public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap) {
    return toMono(client.createPartitions(newPartitionsMap).all());
  }

  // NOTE: replaces the topic's whole current config with the new one. Entries that were present
  // in the old config but are missing in the new one will be reset to their defaults
  public Mono<Void> updateTopicConfig(String topicName, Map<String, String> configs) {
    if (features.contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
      return getTopicsConfigImpl(List.of(topicName), false)
          .map(conf -> conf.getOrDefault(topicName, List.of()))
          .flatMap(currentConfigs -> incrementalAlterConfig(topicName, currentConfigs, configs));
    } else {
      return alterConfig(topicName, configs);
    }
  }

  public Mono<List<String>> listConsumerGroups() {
    return toMono(client.listConsumerGroups().all())
        .map(lst -> lst.stream().map(ConsumerGroupListing::groupId).collect(toList()));
  }

  public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
    return toMono(client.describeConsumerGroups(groupIds).all());
  }

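  /**
   * Returns the group's committed offsets keyed by partition; partitions without a
   * committed offset are filtered out. Illustrative: {@code listConsumerGroupOffsets("my-group")}.
   */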
  public Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(String groupId) {
    return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
  }

  public Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(
      String groupId, List<TopicPartition> partitions) {
    return listConsumerGroupOffsets(groupId,
        new ListConsumerGroupOffsetsOptions().topicPartitions(partitions));
  }

  private Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(
      String groupId, ListConsumerGroupOffsetsOptions options) {
    return toMono(client.listConsumerGroupOffsets(groupId, options).partitionsToOffsetAndMetadata())
        .map(MapUtil::removeNullValues)
        .map(m -> m.entrySet().stream()
            .map(e -> Tuples.of(e.getKey(), e.getValue().offset()))
            .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)));
  }

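  /**
   * Sets the group's committed offsets to the given values; note that brokers typically
   * reject this for an actively consuming group. Illustrative (a sketch):
   * {@code alterConsumerGroupOffsets("my-group", Map.of(new TopicPartition("orders", 0), 42L))}.
   */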
  public Mono<Void> alterConsumerGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
    return toMono(client.alterConsumerGroupOffsets(
        groupId,
        offsets.entrySet().stream()
            .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))))
        .all());
  }

  /**
   * Lists offsets for the topic's partitions for the given OffsetSpec.
   * @param failOnUnknownLeader true - throw an exception if there are no-leader partitions,
   *                            false - skip partitions with no leader
   */
  public Mono<Map<TopicPartition, Long>> listTopicOffsets(String topic,
                                                          OffsetSpec offsetSpec,
                                                          boolean failOnUnknownLeader) {
    return describeTopic(topic)
        .map(td -> filterPartitionsWithLeaderCheck(List.of(td), p -> true, failOnUnknownLeader))
        .flatMap(partitions -> listOffsetsUnsafe(partitions, offsetSpec));
  }

  /**
   * Lists offsets for the specified partitions for the given OffsetSpec.
   * @param failOnUnknownLeader true - throw an exception if there are no-leader partitions,
   *                            false - skip partitions with no leader
   */
  public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicPartition> partitions,
                                                     OffsetSpec offsetSpec,
                                                     boolean failOnUnknownLeader) {
    return filterPartitionsWithLeaderCheck(partitions, failOnUnknownLeader)
        .flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
  }

  private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
                                                                           boolean failOnUnknownLeader) {
    var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
    return describeTopicsImpl(targetTopics)
        .map(descriptions ->
            filterPartitionsWithLeaderCheck(
                descriptions.values(), partitions::contains, failOnUnknownLeader));
  }

  private Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
                                                              Predicate<TopicPartition> partitionPredicate,
                                                              boolean failOnUnknownLeader) {
    var goodPartitions = new HashSet<TopicPartition>();
    for (TopicDescription description : topicDescriptions) {
      for (TopicPartitionInfo partitionInfo : description.partitions()) {
        TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
        if (!partitionPredicate.test(topicPartition)) {
          continue;
        }
        if (partitionInfo.leader() != null) {
          goodPartitions.add(topicPartition);
        } else if (failOnUnknownLeader) {
          throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
        }
      }
    }
    return goodPartitions;
  }

  // 1. NOTE(!): should only be applied to partitions with an existing leader,
  //    otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
  // 2. NOTE(!): skips partitions that were not initialized yet
  //    (UnknownTopicOrPartitionException thrown, ex. right after topic creation)
  // 3. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retries instead
  @KafkaClientInternalsDependant
  public Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions,
                                                           OffsetSpec offsetSpec) {

    Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
        parts -> {
          ListOffsetsResult r = client.listOffsets(parts.stream().collect(toMap(tp -> tp, tp -> offsetSpec)));
          Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> perPartitionResults = new HashMap<>();
          parts.forEach(p -> perPartitionResults.put(p, r.partitionResult(p)));

          return toMonoWithExceptionFilter(perPartitionResults, UnknownTopicOrPartitionException.class)
              .map(offsets -> offsets.entrySet().stream()
                  // filtering partitions for which offsets were not found
                  .filter(e -> e.getValue().offset() >= 0)
                  .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
        };

    return partitionCalls(
        partitions,
        200,
        call,
        (m1, m2) -> ImmutableMap.<TopicPartition, Long>builder().putAll(m1).putAll(m2).build()
    );
  }

  public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
    AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
    return toMono(client.incrementalAlterConfigs(Map.of(cr, List.of(op))).all());
  }

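  /**
   * Deletes records up to (not including) the given offsets.
   * Illustrative (a sketch): {@code deleteRecords(Map.of(new TopicPartition("orders", 0), 100L))}
   * removes all records of partition orders-0 with offsets below 100.
   */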
  public Mono<Void> deleteRecords(Map<TopicPartition, Long> offsets) {
    var records = offsets.entrySet().stream()
        .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
        .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
    return toMono(client.deleteRecords(records).all());
  }

  public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
    return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
  }

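  /**
   * Computes a diff against the current config: DELETE ops for dynamically set entries
   * that are absent from newConfigs, SET ops for every entry of newConfigs.
   */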
  private Mono<Void> incrementalAlterConfig(String topicName,
                                            List<ConfigEntry> currentConfigs,
                                            Map<String, String> newConfigs) {
    var configsToDelete = currentConfigs.stream()
        .filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) //manually set configs only
        .filter(e -> !newConfigs.containsKey(e.name()))
        .map(e -> new AlterConfigOp(e, AlterConfigOp.OpType.DELETE));

    var configsToSet = newConfigs.entrySet().stream()
        .map(e -> new AlterConfigOp(new ConfigEntry(e.getKey(), e.getValue()), AlterConfigOp.OpType.SET));

    return toMono(client.incrementalAlterConfigs(
        Map.of(
            new ConfigResource(ConfigResource.Type.TOPIC, topicName),
            Stream.concat(configsToDelete, configsToSet).toList()
        )).all());
  }

  @SuppressWarnings("deprecation")
  private Mono<Void> alterConfig(String topicName, Map<String, String> configs) {
    List<ConfigEntry> configEntries = configs.entrySet().stream()
        .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue())))
        .collect(toList());
    Config config = new Config(configEntries);
    var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    return toMono(client.alterConfigs(Map.of(topicResource, config)).all());
  }

  /**
   * Splits the input collection into batches, applies the function to each batch sequentially
   * and merges the resulting Monos into a single Mono.
   */
  private static <R, I> Mono<R> partitionCalls(Collection<I> items,
                                               int partitionSize,
                                               Function<Collection<I>, Mono<R>> call,
                                               BiFunction<R, R, R> merger) {
    if (items.isEmpty()) {
      return call.apply(items);
    }
    Iterator<List<I>> parts = Iterators.partition(items.iterator(), partitionSize);
    Mono<R> mono = call.apply(parts.next());
    while (parts.hasNext()) {
      var nextPart = parts.next();
      // calls will be executed sequentially
      mono = mono.flatMap(res1 -> call.apply(nextPart).map(res2 -> merger.apply(res1, res2)));
    }
    return mono;
  }

  @Override
  public void close() {
    client.close();
  }
}