package com.provectus.kafka.ui.service;

import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.provectus.kafka.ui.exception.IllegalEntityStateException;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.util.MapUtil;
import com.provectus.kafka.ui.util.NumberUtil;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

@Slf4j
@RequiredArgsConstructor
public class ReactiveAdminClient implements Closeable {

  private enum SupportedFeature {
    INCREMENTAL_ALTER_CONFIGS(2.3f),
    CONFIG_DOCUMENTATION_RETRIEVAL(2.6f);

    private final float sinceVersion;

    SupportedFeature(float sinceVersion) {
      this.sinceVersion = sinceVersion;
    }

    static Set<SupportedFeature> forVersion(float kafkaVersion) {
      return Arrays.stream(SupportedFeature.values())
          .filter(f -> kafkaVersion >= f.sinceVersion)
          .collect(Collectors.toSet());
    }

    static Set<SupportedFeature> defaultFeatures() {
      return Set.of();
    }
  }

  @Value
  public static class ClusterDescription {
    @Nullable
    Node controller;
    String clusterId;
    Collection<Node> nodes;
    Set<AclOperation> authorizedOperations;
  }
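
  /**
   * Creates a ReactiveAdminClient wrapper: the cluster version is detected first, and the set of
   * features supported by that version is captured alongside the underlying AdminClient.
   */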
  public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
    return getClusterVersion(adminClient)
        .map(ver ->
            new ReactiveAdminClient(
                adminClient,
                ver,
                getSupportedUpdateFeaturesForVersion(ver)));
  }

  private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String versionStr) {
    try {
      float version = NumberUtil.parserClusterVersion(versionStr);
      return SupportedFeature.forVersion(version);
    } catch (NumberFormatException e) {
      return SupportedFeature.defaultFeatures();
    }
  }

  //TODO: discuss - maybe we should map kafka-library's exceptions to our exceptions here
  private static <T> Mono<T> toMono(KafkaFuture<T> future) {
    return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
      if (ex != null) {
        // KafkaFuture docs are unclear about which exception wrapper is used:
        // the docs say ExecutionException, but we actually see CompletionException, so we check both
        if (ex instanceof CompletionException || ex instanceof ExecutionException) {
          sink.error(ex.getCause()); //unwrapping exception
        } else {
          sink.error(ex);
        }
      } else {
        sink.success(res);
      }
    })).doOnCancel(() -> future.cancel(true))
        // AdminClient uses a single thread for kafka communication,
        // and by default all downstream operations (like map(..)) on the created Mono will be executed on that thread.
        // If some downstream operation is blocking (by mistake), this can leave
        // other AdminClient requests stuck, which can cause timeout exceptions.
        // So we explicitly set a Scheduler for downstream processing.
        .publishOn(Schedulers.parallel());
  }

  //---------------------------------------------------------------------------------

  private final AdminClient client;
  private final String version;
  private final Set<SupportedFeature> features;

  public Mono<Set<String>> listTopics(boolean listInternal) {
    return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
  }

  public Mono<Void> deleteTopic(String topicName) {
    return toMono(client.deleteTopics(List.of(topicName)).all());
  }

  public String getVersion() {
    return version;
  }

  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
    return listTopics(true).flatMap(topics -> getTopicsConfig(topics, false));
  }
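
  /**
   * Returns configs for the given topics, requested in batches of 200 topics per call.
   * Config documentation is included only if requested and supported by the target cluster version.
   */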
  public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
    var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc;
    // we need to partition calls, because passing a large number of topics in one call can lead to AdminClient timeouts
    return partitionCalls(
        topicNames,
        200,
        part -> getTopicsConfigImpl(part, includeDocFixed),
        (m1, m2) -> ImmutableMap.<String, List<ConfigEntry>>builder().putAll(m1).putAll(m2).build()
    );
  }

  private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<String> topicNames, boolean includeDoc) {
    List<ConfigResource> resources = topicNames.stream()
        .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
        .collect(toList());

    return toMonoWithExceptionFilter(
        client.describeConfigs(
            resources,
            new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(),
        UnknownTopicOrPartitionException.class
    ).map(config -> config.entrySet().stream()
        .collect(toMap(
            c -> c.getKey().name(),
            c -> List.copyOf(c.getValue().entries()))));
  }

  private static Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(AdminClient client, List<Integer> brokerIds) {
    List<ConfigResource> resources = brokerIds.stream()
        .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
        .collect(toList());
    return toMono(client.describeConfigs(resources).all())
        .doOnError(InvalidRequestException.class,
            th -> log.trace("Error while getting broker {} configs", brokerIds, th))
        // some kafka backends (like MSK serverless) do not support broker config retrieval;
        // in that case an InvalidRequestException is thrown
        .onErrorResume(InvalidRequestException.class, th -> Mono.just(Map.of()))
        .map(config -> config.entrySet().stream()
            .collect(toMap(
                c -> Integer.valueOf(c.getKey().name()),
                c -> new ArrayList<>(c.getValue().entries()))));
  }

  /**
   * Returns per-broker configs, or an empty map if broker config retrieval is not supported
   * by the target cluster.
   */
  public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> brokerIds) {
    return loadBrokersConfig(client, brokerIds);
  }

  public Mono<Map<String, TopicDescription>> describeTopics() {
    return listTopics(true).flatMap(this::describeTopics);
  }
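
  /**
   * Describes the given topics in batches of 200 per call; topics that do not exist are
   * silently omitted from the resulting map.
   */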
  public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> topics) {
    // we need to partition calls, because passing a large number of topics in one call can lead to AdminClient timeouts
    return partitionCalls(
        topics,
        200,
        this::describeTopicsImpl,
        (m1, m2) -> ImmutableMap.<String, TopicDescription>builder().putAll(m1).putAll(m2).build()
    );
  }

  private Mono<Map<String, TopicDescription>> describeTopicsImpl(Collection<String> topics) {
    return toMonoWithExceptionFilter(
        client.describeTopics(topics).values(),
        UnknownTopicOrPartitionException.class
    );
  }

  /**
   * Returns a TopicDescription Mono, or an empty Mono if the topic is not found.
   */
  public Mono<TopicDescription> describeTopic(String topic) {
    return describeTopics(List.of(topic)).flatMap(m -> Mono.justOrEmpty(m.get(topic)));
  }

  /**
   * The Kafka API often returns Map responses with KafkaFuture values. If we apply allOf()
   * logic, the resulting Mono fails if any of the futures finished with an error.
   * In some situations this is not what we want: for example, when we call describeTopics(List names)
   * we get an UnknownTopicOrPartitionException for unknown topics and we simply want to leave
   * such topics out of the resulting map.
   * <p/>
   * This method converts the input map into a Mono[Map], ignoring keys for which the KafkaFuture
   * finished with a <code>clazz</code> exception.
   */
  private <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> values,
                                                           Class<? extends KafkaException> clazz) {
    if (values.isEmpty()) {
      return Mono.just(Map.of());
    }

    List<Mono<Tuple2<K, V>>> monos = values.entrySet().stream()
        .map(e -> toMono(e.getValue()).map(r -> Tuples.of(e.getKey(), r)))
        .collect(toList());

    return Mono.create(sink -> {
      var finishedCnt = new AtomicInteger();
      var results = new ConcurrentHashMap<K, V>();
      monos.forEach(mono -> mono.subscribe(
          r -> {
            results.put(r.getT1(), r.getT2());
            if (finishedCnt.incrementAndGet() == monos.size()) {
              sink.success(results);
            }
          },
          th -> {
            // only exceptions of the filtered class are skipped; anything else fails the whole Mono
            if (!clazz.isInstance(th)) {
              sink.error(th);
            } else if (finishedCnt.incrementAndGet() == monos.size()) {
              sink.success(results);
            }
          }
      ));
    });
  }

  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
    return describeCluster()
        .map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
        .flatMap(this::describeLogDirs);
  }

  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs(
      Collection<Integer> brokerIds) {
    return toMono(client.describeLogDirs(brokerIds).all());
  }

  public Mono<ClusterDescription> describeCluster() {
    var r = client.describeCluster();
    var all = KafkaFuture.allOf(r.nodes(), r.clusterId(), r.controller(), r.authorizedOperations());
    return Mono.create(sink -> all.whenComplete((res, ex) -> {
      if (ex != null) {
        sink.error(ex);
      } else {
        try {
          sink.success(
              new ClusterDescription(
                  getUninterruptibly(r.controller()),
                  getUninterruptibly(r.clusterId()),
                  getUninterruptibly(r.nodes()),
                  getUninterruptibly(r.authorizedOperations())
              )
          );
        } catch (ExecutionException e) {
          // cannot happen here, because all futures have already completed successfully
        }
      }
    }));
  }
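
  /**
   * Detects the cluster version by reading the controller's "inter.broker.protocol.version"
   * config entry, falling back to "1.0-UNKNOWN" if it cannot be found.
   */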
  private static Mono<String> getClusterVersion(AdminClient client) {
    return toMono(client.describeCluster().controller())
        .flatMap(controller -> loadBrokersConfig(client, List.of(controller.id())))
        .map(configs -> configs.values().stream()
            .flatMap(Collection::stream)
            .filter(entry -> entry.name().contains("inter.broker.protocol.version"))
            .findFirst()
            .map(ConfigEntry::value)
            .orElse("1.0-UNKNOWN")
        );
  }

  public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
    return toMono(client.deleteConsumerGroups(groupIds).all())
        .onErrorResume(GroupIdNotFoundException.class,
            th -> Mono.error(new NotFoundException("The group id does not exist")))
        .onErrorResume(GroupNotEmptyException.class,
            th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
  }

  public Mono<Void> createTopic(String name,
                                int numPartitions,
                                @Nullable Integer replicationFactor,
                                Map<String, String> configs) {
    var newTopic = new NewTopic(
        name,
        Optional.of(numPartitions),
        Optional.ofNullable(replicationFactor).map(Integer::shortValue)
    ).configs(configs);
    return toMono(client.createTopics(List.of(newTopic)).all());
  }

  public Mono<Void> alterPartitionReassignments(
      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
    return toMono(client.alterPartitionReassignments(reassignments).all());
  }

  public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap) {
    return toMono(client.createPartitions(newPartitionsMap).all());
  }
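
  /**
   * Updates topic configs via incrementalAlterConfigs when the cluster supports it (Kafka 2.3+),
   * and falls back to the deprecated alterConfigs call otherwise.
   */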
  public Mono<Void> updateTopicConfig(String topicName, Map<String, String> configs) {
    if (features.contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
      return incrementalAlterConfig(topicName, configs);
    } else {
      return alterConfig(topicName, configs);
    }
  }

  public Mono<List<String>> listConsumerGroups() {
    return toMono(client.listConsumerGroups().all())
        .map(lst -> lst.stream().map(ConsumerGroupListing::groupId).collect(toList()));
  }

  public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
    return toMono(client.describeConsumerGroups(groupIds).all());
  }
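
  /**
   * Returns committed offsets for all partitions of the given consumer group;
   * partitions without a committed offset are filtered out.
   */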
  public Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(String groupId) {
    return listConsumerGroupOffsets(groupId, new ListConsumerGroupOffsetsOptions());
  }

  public Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(
      String groupId, List<TopicPartition> partitions) {
    return listConsumerGroupOffsets(groupId,
        new ListConsumerGroupOffsetsOptions().topicPartitions(partitions));
  }

  private Mono<Map<TopicPartition, Long>> listConsumerGroupOffsets(
      String groupId, ListConsumerGroupOffsetsOptions options) {
    return toMono(client.listConsumerGroupOffsets(groupId, options).partitionsToOffsetAndMetadata())
        .map(MapUtil::removeNullValues)
        .map(m -> m.entrySet().stream()
            .map(e -> Tuples.of(e.getKey(), e.getValue().offset()))
            .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)));
  }

  public Mono<Void> alterConsumerGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
    return toMono(client.alterConsumerGroupOffsets(
        groupId,
        offsets.entrySet().stream()
            .collect(toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))))
        .all());
  }
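
  /**
   * Lists offsets (earliest, latest or for-timestamp, depending on the OffsetSpec) for all
   * partitions of the given topic; partitions for which no offset was found are filtered out.
   */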
  public Mono<Map<TopicPartition, Long>> listOffsets(String topic,
                                                     OffsetSpec offsetSpec) {
    return topicPartitions(topic).flatMap(tps -> listOffsets(tps, offsetSpec));
  }

  public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicPartition> partitions,
                                                     OffsetSpec offsetSpec) {
    //TODO: need to split this into multiple calls if number of target partitions is big
    return toMono(
        client.listOffsets(partitions.stream().collect(toMap(tp -> tp, tp -> offsetSpec))).all())
        .map(offsets -> offsets.entrySet()
            .stream()
            // filtering partitions for which offsets were not found
            .filter(e -> e.getValue().offset() >= 0)
            .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
  }

  private Mono<Set<TopicPartition>> topicPartitions(String topic) {
    return toMono(client.describeTopics(List.of(topic)).all())
        .map(r -> r.values().stream()
            .findFirst()
            .stream()
            .flatMap(d -> d.partitions().stream())
            .map(p -> new TopicPartition(topic, p.partition()))
            .collect(Collectors.toSet())
        );
  }

  public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
    AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
    return toMono(client.incrementalAlterConfigs(Map.of(cr, List.of(op))).all());
  }
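
  /**
   * Deletes records in each given partition up to (but not including) the given offset.
   */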
  public Mono<Void> deleteRecords(Map<TopicPartition, Long> offsets) {
    var records = offsets.entrySet().stream()
        .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
        .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
    return toMono(client.deleteRecords(records).all());
  }

  public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
    return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
  }

  private Mono<Void> incrementalAlterConfig(String topicName, Map<String, String> configs) {
    var config = configs.entrySet().stream()
        .flatMap(cfg -> Stream.of(
            new AlterConfigOp(
                new ConfigEntry(
                    cfg.getKey(),
                    cfg.getValue()),
                AlterConfigOp.OpType.SET)))
        .collect(toList());
    var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    return toMono(client.incrementalAlterConfigs(Map.of(topicResource, config)).all());
  }

  @SuppressWarnings("deprecation")
  private Mono<Void> alterConfig(String topicName, Map<String, String> configs) {
    List<ConfigEntry> configEntries = configs.entrySet().stream()
        .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue())))
        .collect(toList());
    Config config = new Config(configEntries);
    var topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    return toMono(client.alterConfigs(Map.of(topicResource, config)).all());
  }

  /**
   * Splits the input collection into batches, applies <code>call</code> to each batch sequentially
   * and merges the resulting Monos into a single Mono.
   */
  private static <R, I> Mono<R> partitionCalls(Collection<I> items,
                                               int partitionSize,
                                               Function<Collection<I>, Mono<R>> call,
                                               BiFunction<R, R, R> merger) {
    if (items.isEmpty()) {
      return call.apply(items);
    }
    Iterator<List<I>> parts = Iterators.partition(items.iterator(), partitionSize);
    Mono<R> mono = call.apply(parts.next());
    while (parts.hasNext()) {
      var nextPart = parts.next();
      // calls will be executed sequentially
      mono = mono.flatMap(res1 -> call.apply(nextPart).map(res2 -> merger.apply(res1, res2)));
    }
    return mono;
  }

  @Override
  public void close() {
    client.close();
  }
}
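
// A minimal usage sketch (kept as a comment, not part of the class above). The bootstrap servers
// value and the topic name are placeholders, and error handling / closing is elided:
//
//   Properties props = new Properties();
//   props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//   AdminClient adminClient = AdminClient.create(props);
//   ReactiveAdminClient.create(adminClient)
//       .flatMap(rac -> rac.describeTopic("my-topic"))
//       .subscribe(description ->
//           System.out.println("partitions: " + description.partitions().size()));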