KafkaService.java

package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.exception.IllegalEntityStateException;
import com.provectus.kafka.ui.exception.InvalidRequestApiException;
import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.TopicMetadataException;
import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.BrokerLogdirUpdate;
import com.provectus.kafka.ui.model.CleanupPolicy;
import com.provectus.kafka.ui.model.CreateTopicMessage;
import com.provectus.kafka.ui.model.ExtendedAdminClient;
import com.provectus.kafka.ui.model.InternalBrokerConfig;
import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
import com.provectus.kafka.ui.model.InternalBrokerMetrics;
import com.provectus.kafka.ui.model.InternalClusterMetrics;
import com.provectus.kafka.ui.model.InternalConsumerGroup;
import com.provectus.kafka.ui.model.InternalPartition;
import com.provectus.kafka.ui.model.InternalReplica;
import com.provectus.kafka.ui.model.InternalSegmentSizeDto;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.InternalTopicConfig;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.Metric;
import com.provectus.kafka.ui.model.PartitionsIncrease;
import com.provectus.kafka.ui.model.ReplicationFactorChange;
import com.provectus.kafka.ui.model.ServerStatus;
import com.provectus.kafka.ui.model.TopicCreation;
import com.provectus.kafka.ui.model.TopicUpdate;
import com.provectus.kafka.ui.serde.DeserializationService;
import com.provectus.kafka.ui.serde.RecordSerDe;
import com.provectus.kafka.ui.util.ClusterUtil;
import com.provectus.kafka.ui.util.JmxClusterUtil;
import com.provectus.kafka.ui.util.JmxMetricsName;
import com.provectus.kafka.ui.util.JmxMetricsValueName;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
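
/**
 * Reactive facade over the Kafka {@link AdminClient} (plus short-lived consumers
 * and producers) used to inspect and mutate a cluster: topics and their configs,
 * consumer groups, partitions, replication factor, log dirs, and JMX-derived
 * metrics. Admin clients are created lazily and cached per cluster name.
 */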
@Service
@RequiredArgsConstructor
@Log4j2
public class KafkaService {

  private static final ListTopicsOptions LIST_TOPICS_OPTIONS =
      new ListTopicsOptions().listInternal(true);

  private final ZookeeperService zookeeperService;
  private final Map<String, ExtendedAdminClient> adminClientCache = new ConcurrentHashMap<>();
  private final JmxClusterUtil jmxClusterUtil;
  private final ClustersStorage clustersStorage;
  private final DeserializationService deserializationService;

  @Setter // used in tests
  @Value("${kafka.admin-client-timeout}")
  private int clientTimeout;
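
  // The two overloads below return a copy of the immutable KafkaCluster with its
  // topic map patched (topic added/replaced, or removed); the original cluster
  // object is left untouched.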
  public KafkaCluster getUpdatedCluster(KafkaCluster cluster, InternalTopic updatedTopic) {
    final Map<String, InternalTopic> topics = new HashMap<>(cluster.getTopics());
    topics.put(updatedTopic.getName(), updatedTopic);
    return cluster.toBuilder().topics(topics).build();
  }

  public KafkaCluster getUpdatedCluster(KafkaCluster cluster, String topicToDelete) {
    final Map<String, InternalTopic> topics = new HashMap<>(cluster.getTopics());
    topics.remove(topicToDelete);
    return cluster.toBuilder().topics(topics).build();
  }
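
  /**
   * Rebuilds the full cluster snapshot: resolves the broker version, collects
   * cluster-level metrics, enriches them with per-broker JMX metrics, loads all
   * topics, and merges in segment sizes read from the brokers' log dirs. Any
   * failure along the pipeline marks the cluster OFFLINE instead of propagating.
   */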
  @SneakyThrows
  public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
    return getOrCreateAdminClient(cluster)
        .flatMap(
            ac -> ClusterUtil.getClusterVersion(ac.getAdminClient()).flatMap(
                version ->
                    getClusterMetrics(ac.getAdminClient())
                        .flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient()))
                        .flatMap(clusterMetrics ->
                            getTopicsData(ac.getAdminClient()).flatMap(it ->
                                updateSegmentMetrics(ac.getAdminClient(), clusterMetrics, it)
                            ).map(segmentSizeDto -> buildFromData(cluster, version, segmentSizeDto))
                        )
            )
        ).onErrorResume(
            e -> Mono.just(cluster.toBuilder()
                .status(ServerStatus.OFFLINE)
                .lastKafkaException(e)
                .build())
        );
  }

  private KafkaCluster buildFromData(KafkaCluster currentCluster,
                                     String version,
                                     InternalSegmentSizeDto segmentSizeDto) {
    var topics = segmentSizeDto.getInternalTopicWithSegmentSize();
    var brokersMetrics = segmentSizeDto.getClusterMetricsWithSegmentSize();
    var brokersIds = new ArrayList<>(brokersMetrics.getInternalBrokerMetrics().keySet());

    InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder =
        brokersMetrics.toBuilder();

    InternalClusterMetrics topicsMetrics = collectTopicsMetrics(topics);

    ServerStatus zookeeperStatus = ServerStatus.OFFLINE;
    Throwable zookeeperException = null;
    try {
      zookeeperStatus = zookeeperService.isZookeeperOnline(currentCluster)
          ? ServerStatus.ONLINE
          : ServerStatus.OFFLINE;
    } catch (Throwable e) {
      zookeeperException = e;
    }

    InternalClusterMetrics clusterMetrics = metricsBuilder
        .activeControllers(brokersMetrics.getActiveControllers())
        .topicCount(topicsMetrics.getTopicCount())
        .brokerCount(brokersMetrics.getBrokerCount())
        .underReplicatedPartitionCount(topicsMetrics.getUnderReplicatedPartitionCount())
        .inSyncReplicasCount(topicsMetrics.getInSyncReplicasCount())
        .outOfSyncReplicasCount(topicsMetrics.getOutOfSyncReplicasCount())
        .onlinePartitionCount(topicsMetrics.getOnlinePartitionCount())
        .offlinePartitionCount(topicsMetrics.getOfflinePartitionCount())
        .zooKeeperStatus(ClusterUtil.convertToIntServerStatus(zookeeperStatus))
        .version(version)
        .build();

    return currentCluster.toBuilder()
        .version(version)
        .status(ServerStatus.ONLINE)
        .zookeeperStatus(zookeeperStatus)
        .lastZookeeperException(zookeeperException)
        .lastKafkaException(null)
        .metrics(clusterMetrics)
        .topics(topics)
        .brokers(brokersIds)
        .build();
  }

  private InternalClusterMetrics collectTopicsMetrics(Map<String, InternalTopic> topics) {
    int underReplicatedPartitions = 0;
    int inSyncReplicasCount = 0;
    int outOfSyncReplicasCount = 0;
    int onlinePartitionCount = 0;
    int offlinePartitionCount = 0;

    for (InternalTopic topic : topics.values()) {
      underReplicatedPartitions += topic.getUnderReplicatedPartitions();
      inSyncReplicasCount += topic.getInSyncReplicas();
      outOfSyncReplicasCount += (topic.getReplicas() - topic.getInSyncReplicas());
      onlinePartitionCount +=
          topic.getPartitions().values().stream().mapToInt(s -> s.getLeader() == null ? 0 : 1)
              .sum();
      offlinePartitionCount +=
          topic.getPartitions().values().stream().mapToInt(s -> s.getLeader() != null ? 0 : 1)
              .sum();
    }

    return InternalClusterMetrics.builder()
        .underReplicatedPartitionCount(underReplicatedPartitions)
        .inSyncReplicasCount(inSyncReplicasCount)
        .outOfSyncReplicasCount(outOfSyncReplicasCount)
        .onlinePartitionCount(onlinePartitionCount)
        .offlinePartitionCount(offlinePartitionCount)
        .topicCount(topics.size())
        .build();
  }

  private Map<String, InternalTopic> mergeWithConfigs(
      List<InternalTopic> topics, Map<String, List<InternalTopicConfig>> configs) {
    return topics.stream()
        .map(t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build())
        .map(t -> t.toBuilder().cleanUpPolicy(
            CleanupPolicy.fromString(t.getTopicConfigs().stream()
                .filter(config -> config.getName().equals("cleanup.policy"))
                .findFirst()
                .orElseGet(() -> InternalTopicConfig.builder().value("unknown").build())
                .getValue())).build())
        .collect(Collectors.toMap(
            InternalTopic::getName,
            e -> e
        ));
  }

  @SneakyThrows
  private Mono<List<InternalTopic>> getTopicsData(AdminClient adminClient) {
    return ClusterUtil.toMono(adminClient.listTopics(LIST_TOPICS_OPTIONS).names())
        .flatMap(topics -> getTopicsData(adminClient, topics).collectList());
  }

  private Flux<InternalTopic> getTopicsData(AdminClient adminClient, Collection<String> topics) {
    final Mono<Map<String, List<InternalTopicConfig>>> configsMono =
        loadTopicsConfig(adminClient, topics);

    return ClusterUtil.toMono(adminClient.describeTopics(topics).all())
        .map(m -> m.values().stream()
            .map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()))
        .flatMap(internalTopics -> configsMono
            .map(configs -> mergeWithConfigs(internalTopics, configs).values()))
        .flatMapMany(Flux::fromIterable);
  }

  private Mono<InternalClusterMetrics> getClusterMetrics(AdminClient client) {
    return ClusterUtil.toMono(client.describeCluster().nodes())
        .flatMap(brokers ->
            ClusterUtil.toMono(client.describeCluster().controller()).map(
                c -> {
                  InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder =
                      InternalClusterMetrics.builder();
                  metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
                  return metricsBuilder.build();
                }
            )
        );
  }

  @SneakyThrows
  private Mono<String> createTopic(AdminClient adminClient, NewTopic newTopic) {
    return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic)).all(),
        newTopic.name());
  }

  @SneakyThrows
  public Mono<InternalTopic> createTopic(AdminClient adminClient,
                                         Mono<TopicCreation> topicCreation) {
    return topicCreation.flatMap(
        topicData -> {
          NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(),
              topicData.getReplicationFactor().shortValue());
          newTopic.configs(topicData.getConfigs());
          return createTopic(adminClient, newTopic).map(v -> topicData);
        })
        .onErrorResume(t -> Mono.error(new TopicMetadataException(t.getMessage())))
        .flatMap(
            topicData ->
                getTopicsData(adminClient, Collections.singleton(topicData.getName()))
                    .next()
        ).switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")))
        .flatMap(t ->
            loadTopicsConfig(adminClient, Collections.singletonList(t.getName()))
                .map(c -> mergeWithConfigs(Collections.singletonList(t), c))
                .map(m -> m.values().iterator().next())
        );
  }

  public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicCreation> topicCreation) {
    return getOrCreateAdminClient(cluster)
        .flatMap(ac -> createTopic(ac.getAdminClient(), topicCreation));
  }
  public Mono<Void> deleteTopic(KafkaCluster cluster, String topicName) {
    return getOrCreateAdminClient(cluster)
        .map(ExtendedAdminClient::getAdminClient)
        // Await the broker's response rather than firing the request and completing
        // immediately; otherwise deletion failures are silently swallowed.
        .flatMap(adminClient ->
            ClusterUtil.toMono(adminClient.deleteTopics(List.of(topicName)).all()));
  }
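
  /**
   * Returns the cached {@link ExtendedAdminClient} for the cluster, creating and
   * caching one on first use. Under concurrent first calls more than one client may
   * be built; computeIfAbsent guarantees callers converge on a single cached
   * instance (the losing client is simply never cached). A typical call site looks
   * like this sketch:
   *
   * <pre>{@code
   * getOrCreateAdminClient(cluster)
   *     .map(ExtendedAdminClient::getAdminClient)
   *     .flatMap(ac -> ClusterUtil.toMono(ac.listTopics().names()));
   * }</pre>
   */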
  @SneakyThrows
  public Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
    return Mono.justOrEmpty(adminClientCache.get(cluster.getName()))
        .switchIfEmpty(createAdminClient(cluster))
        .map(e -> adminClientCache.computeIfAbsent(cluster.getName(), key -> e));
  }

  public Mono<ExtendedAdminClient> createAdminClient(KafkaCluster kafkaCluster) {
    return Mono.fromSupplier(() -> {
      Properties properties = new Properties();
      properties.putAll(kafkaCluster.getProperties());
      properties
          .put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
      properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
      return AdminClient.create(properties);
    }).flatMap(ExtendedAdminClient::extendedAdminClient);
  }
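
  // The config loaders below issue a single batched describeConfigs call for all
  // requested resources and regroup the reply into resource-name -> config-list
  // maps for the internal model.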
  @SneakyThrows
  private Mono<Map<String, List<InternalTopicConfig>>> loadTopicsConfig(
      AdminClient adminClient, Collection<String> topicNames) {
    List<ConfigResource> resources = topicNames.stream()
        .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
        .collect(Collectors.toList());

    return ClusterUtil.toMono(adminClient.describeConfigs(resources).all())
        .map(configs ->
            configs.entrySet().stream().collect(Collectors.toMap(
                c -> c.getKey().name(),
                c -> c.getValue().entries().stream()
                    .map(ClusterUtil::mapToInternalTopicConfig)
                    .collect(Collectors.toList()))));
  }

  private Mono<Map<String, List<InternalBrokerConfig>>> loadBrokersConfig(
      AdminClient adminClient, List<Integer> brokersIds) {
    List<ConfigResource> resources = brokersIds.stream()
        .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
        .collect(Collectors.toList());

    return ClusterUtil.toMono(adminClient.describeConfigs(resources,
        new DescribeConfigsOptions().includeSynonyms(true)).all())
        .map(configs ->
            configs.entrySet().stream().collect(Collectors.toMap(
                c -> c.getKey().name(),
                c -> c.getValue().entries().stream()
                    .map(ClusterUtil::mapToInternalBrokerConfig)
                    .collect(Collectors.toList()))));
  }

  private Mono<List<InternalBrokerConfig>> loadBrokersConfig(
      AdminClient adminClient, Integer brokerId) {
    return loadBrokersConfig(adminClient, Collections.singletonList(brokerId))
        .map(map -> map.values().stream()
            .findFirst()
            .orElseThrow(() -> new IllegalEntityStateException(
                String.format("Config for broker %s not found", brokerId))));
  }

  public Mono<List<InternalBrokerConfig>> getBrokerConfigs(KafkaCluster cluster, Integer brokerId) {
    return getOrCreateAdminClient(cluster)
        .flatMap(adminClient -> {
          if (!cluster.getBrokers().contains(brokerId)) {
            return Mono.error(
                new NotFoundException(String.format("Broker with id %s not found", brokerId)));
          }
          return loadBrokersConfig(adminClient.getAdminClient(), brokerId);
        });
  }

  public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(
      KafkaCluster cluster) {
    return getOrCreateAdminClient(cluster).flatMap(ac ->
        ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all())
            .flatMap(s ->
                getConsumerGroupsInternal(
                    cluster,
                    s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList()))
            )
    );
  }

  public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(
      KafkaCluster cluster, List<String> groupIds) {
    return getOrCreateAdminClient(cluster).flatMap(ac ->
        ClusterUtil.toMono(
            ac.getAdminClient().describeConsumerGroups(groupIds).all()
        ).map(Map::values)
    ).flatMap(descriptions ->
        Flux.fromIterable(descriptions)
            .parallel()
            .flatMap(d ->
                groupMetadata(cluster, d.groupId())
                    .map(offsets -> ClusterUtil.convertToInternalConsumerGroup(d, offsets))
            )
            .sequential()
            .collectList()
    );
  }

  public Mono<List<InternalConsumerGroup>> getConsumerGroups(
      KafkaCluster cluster, Optional<String> topic, List<String> groupIds) {
    final Mono<List<InternalConsumerGroup>> consumerGroups;

    if (groupIds.isEmpty()) {
      consumerGroups = getConsumerGroupsInternal(cluster);
    } else {
      consumerGroups = getConsumerGroupsInternal(cluster, groupIds);
    }

    return consumerGroups.map(c ->
        c.stream()
            .map(d -> ClusterUtil.filterConsumerGroupTopic(d, topic))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .map(g ->
                g.toBuilder().endOffsets(
                    topicPartitionsEndOffsets(cluster, g.getOffsets().keySet())
                ).build()
            )
            .collect(Collectors.toList())
    );
  }

  public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster cluster,
                                                                    String consumerGroupId) {
    return getOrCreateAdminClient(cluster).map(ac ->
        ac.getAdminClient()
            .listConsumerGroupOffsets(consumerGroupId)
            .partitionsToOffsetAndMetadata()
    ).flatMap(ClusterUtil::toMono);
  }

  public Map<TopicPartition, Long> topicPartitionsEndOffsets(
      KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
    try (KafkaConsumer<Bytes, Bytes> consumer = createConsumer(cluster)) {
      return consumer.endOffsets(topicPartitions);
    }
  }
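
  /**
   * Builds a short-lived consumer that reads raw {@link Bytes} and sets no group.id
   * of its own, so by default it never joins a consumer group; it is used here for
   * offset lookups and ad-hoc reads. The random "kafka-ui-" client.id keeps parallel
   * consumers apart in broker-side metrics, and caller-supplied properties override
   * the defaults.
   */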
  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
    return createConsumer(cluster, Map.of());
  }

  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster,
                                                    Map<String, Object> properties) {
    Properties props = new Properties();
    props.putAll(cluster.getProperties());
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID());
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.putAll(properties);

    return new KafkaConsumer<>(props);
  }
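
  /**
   * Applies a topic config update, preferring incrementalAlterConfigs when the
   * broker advertises support for it (feature-detected on the cached client) and
   * falling back to the deprecated alterConfigs API otherwise; either path re-reads
   * the topic afterwards so the caller gets the updated state.
   */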
  @SneakyThrows
  public Mono<InternalTopic> updateTopic(KafkaCluster cluster, String topicName,
                                         TopicUpdate topicUpdate) {
    ConfigResource topicCr = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    return getOrCreateAdminClient(cluster)
        .flatMap(ac -> {
          if (ac.getSupportedFeatures()
              .contains(ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
            return incrementalAlterConfig(topicUpdate, topicCr, ac)
                .flatMap(c -> getUpdatedTopic(ac, topicName));
          } else {
            return alterConfig(topicUpdate, topicCr, ac)
                .flatMap(c -> getUpdatedTopic(ac, topicName));
          }
        });
  }

  private Mono<InternalTopic> getUpdatedTopic(ExtendedAdminClient ac, String topicName) {
    return getTopicsData(ac.getAdminClient())
        .map(s -> s.stream()
            .filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow());
  }

  private Mono<String> incrementalAlterConfig(TopicUpdate topicUpdate, ConfigResource topicCr,
                                              ExtendedAdminClient ac) {
    List<AlterConfigOp> listOp = topicUpdate.getConfigs().entrySet().stream()
        .flatMap(cfg -> Stream.of(new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()),
            AlterConfigOp.OpType.SET))).collect(Collectors.toList());
    return ClusterUtil.toMono(
        ac.getAdminClient().incrementalAlterConfigs(Collections.singletonMap(topicCr, listOp))
            .all(), topicCr.name());
  }

  @SuppressWarnings("deprecation")
  private Mono<String> alterConfig(TopicUpdate topicUpdate, ConfigResource topicCr,
                                   ExtendedAdminClient ac) {
    List<ConfigEntry> configEntries = topicUpdate.getConfigs().entrySet().stream()
        .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue())))
        .collect(Collectors.toList());
    Config config = new Config(configEntries);
    Map<ConfigResource, Config> map = Collections.singletonMap(topicCr, config);
    return ClusterUtil.toMono(ac.getAdminClient().alterConfigs(map).all(), topicCr.name());
  }

  private InternalTopic mergeWithStats(InternalTopic topic,
                                       Map<String, LongSummaryStatistics> topics,
                                       Map<TopicPartition, LongSummaryStatistics> partitions) {
    final LongSummaryStatistics stats = topics.get(topic.getName());

    return topic.toBuilder()
        .segmentSize(stats.getSum())
        .segmentCount(stats.getCount())
        .partitions(
            topic.getPartitions().entrySet().stream().map(e ->
                Tuples.of(e.getKey(), mergeWithStats(topic.getName(), e.getValue(), partitions))
            ).collect(Collectors.toMap(
                Tuple2::getT1,
                Tuple2::getT2
            ))
        ).build();
  }

  private InternalPartition mergeWithStats(String topic, InternalPartition partition,
                                           Map<TopicPartition, LongSummaryStatistics> partitions) {
    final LongSummaryStatistics stats =
        partitions.get(new TopicPartition(topic, partition.getPartition()));

    return partition.toBuilder()
        .segmentSize(stats.getSum())
        .segmentCount(stats.getCount())
        .build();
  }
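
  /**
   * Merges per-replica segment sizes from describeLogDirs into the topic and
   * cluster models: replica infos are flattened into (broker, partition, size)
   * tuples and then summarized per partition, per topic, and per broker, plus a
   * cluster-wide total.
   */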
  private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac,
                                                            InternalClusterMetrics clusterMetrics,
                                                            List<InternalTopic> internalTopics) {
    List<String> names =
        internalTopics.stream().map(InternalTopic::getName).collect(Collectors.toList());
    return ClusterUtil.toMono(ac.describeTopics(names).all()).flatMap(topic ->
        ClusterUtil.toMono(ac.describeCluster().nodes()).flatMap(nodes ->
            ClusterUtil.toMono(
                ac.describeLogDirs(nodes.stream().map(Node::id).collect(Collectors.toList())).all())
                .map(log -> {
                  final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
                      log.entrySet().stream().flatMap(b ->
                          b.getValue().entrySet().stream().flatMap(topicMap ->
                              topicMap.getValue().replicaInfos.entrySet().stream()
                                  .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
                          )
                      ).collect(Collectors.toList());

                  final Map<TopicPartition, LongSummaryStatistics> partitionStats =
                      topicPartitions.stream().collect(
                          Collectors.groupingBy(
                              Tuple2::getT2,
                              Collectors.summarizingLong(Tuple3::getT3)
                          )
                      );

                  final Map<String, LongSummaryStatistics> topicStats =
                      topicPartitions.stream().collect(
                          Collectors.groupingBy(
                              t -> t.getT2().topic(),
                              Collectors.summarizingLong(Tuple3::getT3)
                          )
                      );

                  final Map<Integer, LongSummaryStatistics> brokerStats =
                      topicPartitions.stream().collect(
                          Collectors.groupingBy(
                              Tuple2::getT1,
                              Collectors.summarizingLong(Tuple3::getT3)
                          )
                      );

                  final LongSummaryStatistics summary =
                      topicPartitions.stream().collect(Collectors.summarizingLong(Tuple3::getT3));

                  final Map<String, InternalTopic> resultTopics = internalTopics.stream().map(e ->
                      Tuples.of(e.getName(), mergeWithStats(e, topicStats, partitionStats))
                  ).collect(Collectors.toMap(
                      Tuple2::getT1,
                      Tuple2::getT2
                  ));

                  final Map<Integer, InternalBrokerDiskUsage> resultBrokers =
                      brokerStats.entrySet().stream().map(e ->
                          Tuples.of(e.getKey(), InternalBrokerDiskUsage.builder()
                              .segmentSize(e.getValue().getSum())
                              .segmentCount(e.getValue().getCount())
                              .build()
                          )
                      ).collect(Collectors.toMap(
                          Tuple2::getT1,
                          Tuple2::getT2
                      ));

                  return InternalSegmentSizeDto.builder()
                      .clusterMetricsWithSegmentSize(
                          clusterMetrics.toBuilder()
                              .segmentSize(summary.getSum())
                              .segmentCount(summary.getCount())
                              .internalBrokerDiskUsage(resultBrokers)
                              .build()
                      )
                      .internalTopicWithSegmentSize(resultTopics).build();
                })
        )
    );
  }

  public List<Metric> getJmxMetric(String clusterName, Node node) {
    return clustersStorage.getClusterByName(clusterName)
        .filter(c -> c.getJmxPort() != null)
        .filter(c -> c.getJmxPort() > 0)
        .map(c -> jmxClusterUtil.getJmxMetrics(c.getJmxPort(), node.host()))
        .orElse(Collections.emptyList());
  }

  private Mono<InternalClusterMetrics> fillJmxMetrics(InternalClusterMetrics internalClusterMetrics,
                                                      String clusterName, AdminClient ac) {
    return fillBrokerMetrics(internalClusterMetrics, clusterName, ac)
        .map(this::calculateClusterMetrics);
  }

  private Mono<InternalClusterMetrics> fillBrokerMetrics(
      InternalClusterMetrics internalClusterMetrics, String clusterName, AdminClient ac) {
    return ClusterUtil.toMono(ac.describeCluster().nodes())
        .flatMapIterable(nodes -> nodes)
        .map(broker ->
            Map.of(broker.id(), InternalBrokerMetrics.builder()
                .metrics(getJmxMetric(clusterName, broker)).build())
        )
        .collectList()
        .map(s -> internalClusterMetrics.toBuilder()
            .internalBrokerMetrics(ClusterUtil.toSingleMap(s.stream())).build());
  }

  private InternalClusterMetrics calculateClusterMetrics(
      InternalClusterMetrics internalClusterMetrics) {
    final List<Metric> metrics = internalClusterMetrics.getInternalBrokerMetrics().values().stream()
        .flatMap(b -> b.getMetrics().stream())
        .collect(
            Collectors.groupingBy(
                Metric::getCanonicalName,
                Collectors.reducing(jmxClusterUtil::reduceJmxMetrics)
            )
        ).values().stream()
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toList());

    final InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder =
        internalClusterMetrics.toBuilder().metrics(metrics);

    metricsBuilder.bytesInPerSec(findTopicMetrics(
        metrics, JmxMetricsName.BytesInPerSec, JmxMetricsValueName.FiveMinuteRate
    ));
    metricsBuilder.bytesOutPerSec(findTopicMetrics(
        metrics, JmxMetricsName.BytesOutPerSec, JmxMetricsValueName.FiveMinuteRate
    ));

    return metricsBuilder.build();
  }

  private Map<String, BigDecimal> findTopicMetrics(List<Metric> metrics, JmxMetricsName metricsName,
                                                   JmxMetricsValueName valueName) {
    return metrics.stream().filter(m -> metricsName.name().equals(m.getName()))
        .filter(m -> m.getParams().containsKey("topic"))
        .filter(m -> m.getValue().containsKey(valueName.name()))
        .map(m -> Tuples.of(
            m.getParams().get("topic"),
            m.getValue().get(valueName.name())
        )).collect(Collectors.groupingBy(
            Tuple2::getT1,
            Collectors.reducing(BigDecimal.ZERO, Tuple2::getT2, BigDecimal::add)
        ));
  }
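
  // Resolves min/max offsets per partition with a throwaway consumer; on any
  // failure an empty map is returned so one unreachable cluster does not break
  // the topic view.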
  public Map<Integer, InternalPartition> getTopicPartitions(KafkaCluster c, InternalTopic topic) {
    var tps = topic.getPartitions().values().stream()
        .map(t -> new TopicPartition(topic.getName(), t.getPartition()))
        .collect(Collectors.toList());
    Map<Integer, InternalPartition> partitions =
        topic.getPartitions().values().stream().collect(Collectors.toMap(
            InternalPartition::getPartition,
            tp -> tp
        ));

    try (var consumer = createConsumer(c)) {
      final Map<TopicPartition, Long> earliest = consumer.beginningOffsets(tps);
      final Map<TopicPartition, Long> latest = consumer.endOffsets(tps);

      return tps.stream()
          .map(tp -> partitions.get(tp.partition()).toBuilder()
              .offsetMin(Optional.ofNullable(earliest.get(tp)).orElse(0L))
              .offsetMax(Optional.ofNullable(latest.get(tp)).orElse(0L))
              .build()
          ).collect(Collectors.toMap(
              InternalPartition::getPartition,
              tp -> tp
          ));
    } catch (Exception e) {
      return Collections.emptyMap();
    }
  }
  public Mono<Void> deleteTopicMessages(KafkaCluster cluster, Map<TopicPartition, Long> offsets) {
    var records = offsets.entrySet().stream()
        .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    return getOrCreateAdminClient(cluster).map(ExtendedAdminClient::getAdminClient)
        // Await the result so a failed deleteRecords call surfaces as an error
        // instead of completing the Mono immediately.
        .flatMap(ac -> ClusterUtil.toMono(ac.deleteRecords(records).all()));
  }
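
  /**
   * Serializes the message with the cluster's configured {@link RecordSerDe} and
   * produces it with a one-shot producer, bridging the send callback into a
   * {@link CompletableFuture} that backs the returned {@link Mono}.
   */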
  public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic,
                                          CreateTopicMessage msg) {
    RecordSerDe serde =
        deserializationService.getRecordDeserializerForCluster(cluster);

    Properties properties = new Properties();
    properties.putAll(cluster.getProperties());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
      final ProducerRecord<byte[], byte[]> producerRecord = serde.serialize(
          topic,
          msg.getKey(),
          msg.getContent(),
          msg.getPartition()
      );

      CompletableFuture<RecordMetadata> cf = new CompletableFuture<>();
      producer.send(producerRecord, (metadata, exception) -> {
        if (exception != null) {
          cf.completeExceptionally(exception);
        } else {
          cf.complete(metadata);
        }
      });
      return Mono.fromFuture(cf);
    }
  }

  private Mono<InternalTopic> increaseTopicPartitions(AdminClient adminClient,
                                                      String topicName,
                                                      Map<String, NewPartitions> newPartitionsMap
  ) {
    return ClusterUtil.toMono(adminClient.createPartitions(newPartitionsMap).all(), topicName)
        .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next());
  }

  public Mono<InternalTopic> increaseTopicPartitions(
      KafkaCluster cluster,
      String topicName,
      PartitionsIncrease partitionsIncrease) {
    return getOrCreateAdminClient(cluster)
        .flatMap(ac -> {
          Integer actualCount = cluster.getTopics().get(topicName).getPartitionCount();
          Integer requestedCount = partitionsIncrease.getTotalPartitionsCount();

          if (requestedCount < actualCount) {
            return Mono.error(
                new ValidationException(String.format(
                    "Topic currently has %s partitions, which is higher than the requested %s.",
                    actualCount, requestedCount)));
          }
          if (requestedCount.equals(actualCount)) {
            return Mono.error(
                new ValidationException(
                    String.format("Topic already has %s partitions.", actualCount)));
          }

          Map<String, NewPartitions> newPartitionsMap = Collections.singletonMap(
              topicName,
              NewPartitions.increaseTo(partitionsIncrease.getTotalPartitionsCount())
          );
          return increaseTopicPartitions(ac.getAdminClient(), topicName, newPartitionsMap);
        });
  }
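
  // Replication factor changes are expressed as partition reassignments: new
  // replica lists are computed locally (see getPartitionsReassignments below) and
  // submitted via alterPartitionReassignments.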
  private Mono<InternalTopic> changeReplicationFactor(
      AdminClient adminClient,
      String topicName,
      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments
  ) {
    return ClusterUtil.toMono(adminClient
        .alterPartitionReassignments(reassignments).all(), topicName)
        .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next());
  }

  /**
   * Changes the topic replication factor. Works on broker versions 5.4.x and higher.
   */
  public Mono<InternalTopic> changeReplicationFactor(
      KafkaCluster cluster,
      String topicName,
      ReplicationFactorChange replicationFactorChange) {
    return getOrCreateAdminClient(cluster)
        .flatMap(ac -> {
          Integer actual = cluster.getTopics().get(topicName).getReplicationFactor();
          Integer requested = replicationFactorChange.getTotalReplicationFactor();
          Integer brokersCount = cluster.getMetrics().getBrokerCount();

          if (requested.equals(actual)) {
            return Mono.error(
                new ValidationException(
                    String.format("Topic already has replicationFactor %s.", actual)));
          }
          if (requested > brokersCount) {
            return Mono.error(
                new ValidationException(
                    String.format("Requested replication factor %s is greater than the broker count %s.",
                        requested, brokersCount)));
          }

          return changeReplicationFactor(ac.getAdminClient(), topicName,
              getPartitionsReassignments(cluster, topicName,
                  replicationFactorChange));
        });
  }
  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> getClusterLogDirs(
      KafkaCluster cluster, List<Integer> reqBrokers) {
    return getOrCreateAdminClient(cluster)
        .map(admin -> {
          List<Integer> brokers = new ArrayList<>(cluster.getBrokers());
          if (reqBrokers != null && !reqBrokers.isEmpty()) {
            brokers.retainAll(reqBrokers);
          }
          return admin.getAdminClient().describeLogDirs(brokers);
        })
        .flatMap(result -> ClusterUtil.toMono(result.all()))
        .onErrorResume(TimeoutException.class, (TimeoutException e) -> {
          log.error("Error during fetching log dirs", e);
          return Mono.just(new HashMap<>());
        });
  }
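
  /**
   * Computes new replica assignments for every partition of the topic. To grow the
   * replication factor, the least-used brokers (by current replica count) are
   * appended to each partition's replica list; to shrink it, the most-used
   * non-leader replicas are removed. As a hypothetical example: with brokers
   * {1, 2, 3}, partition 0 assigned to [1, 2], and a requested factor of 3, broker 3
   * is the least used, so the new assignment becomes [1, 2, 3].
   */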
  private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsReassignments(
      KafkaCluster cluster,
      String topicName,
      ReplicationFactorChange replicationFactorChange) {
    // Current assignment map (partition number -> list of brokers)
    Map<Integer, List<Integer>> currentAssignment = getCurrentAssignment(cluster, topicName);
    // Brokers map (broker id -> replica count)
    Map<Integer, Integer> brokersUsage = getBrokersMap(cluster, currentAssignment);
    int currentReplicationFactor = cluster.getTopics().get(topicName).getReplicationFactor();

    // If the replication factor needs to be increased
    if (replicationFactorChange.getTotalReplicationFactor() > currentReplicationFactor) {
      // For each partition
      for (var assignmentList : currentAssignment.values()) {
        // Get the broker list sorted by usage (least used first)
        var brokers = brokersUsage.entrySet().stream()
            .sorted(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());

        // Iterate brokers and try to add them to the assignment
        // while (partition replicas count != requested replication factor)
        for (Integer broker : brokers) {
          if (!assignmentList.contains(broker)) {
            assignmentList.add(broker);
            brokersUsage.merge(broker, 1, Integer::sum);
          }
          if (assignmentList.size() == replicationFactorChange.getTotalReplicationFactor()) {
            break;
          }
        }
        if (assignmentList.size() != replicationFactorChange.getTotalReplicationFactor()) {
          throw new ValidationException("Something went wrong during adding replicas");
        }
      }

    // If the replication factor needs to be decreased
    } else if (replicationFactorChange.getTotalReplicationFactor() < currentReplicationFactor) {
      for (Map.Entry<Integer, List<Integer>> assignmentEntry : currentAssignment.entrySet()) {
        var partition = assignmentEntry.getKey();
        var brokers = assignmentEntry.getValue();

        // Get the broker list sorted by usage in reverse order (most used first)
        var brokersUsageList = brokersUsage.entrySet().stream()
            .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());

        // Iterate brokers and try to remove them from the assignment
        // while (partition replicas count != requested replication factor)
        for (Integer broker : brokersUsageList) {
          // Never remove the broker that is the leader of the partition
          if (!cluster.getTopics().get(topicName).getPartitions().get(partition).getLeader()
              .equals(broker)) {
            brokers.remove(broker);
            brokersUsage.merge(broker, -1, Integer::sum);
          }
          if (brokers.size() == replicationFactorChange.getTotalReplicationFactor()) {
            break;
          }
        }
        if (brokers.size() != replicationFactorChange.getTotalReplicationFactor()) {
          throw new ValidationException("Something went wrong during removing replicas");
        }
      }
    } else {
      throw new ValidationException("Replication factor already equals the requested value");
    }

    // Return the result map
    return currentAssignment.entrySet().stream().collect(Collectors.toMap(
        e -> new TopicPartition(topicName, e.getKey()),
        e -> Optional.of(new NewPartitionReassignment(e.getValue()))
    ));
  }
  private Map<Integer, List<Integer>> getCurrentAssignment(KafkaCluster cluster, String topicName) {
    return cluster.getTopics().get(topicName).getPartitions().values().stream()
        .collect(Collectors.toMap(
            InternalPartition::getPartition,
            p -> p.getReplicas().stream()
                .map(InternalReplica::getBroker)
                .collect(Collectors.toList())
        ));
  }

  private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
                                              Map<Integer, List<Integer>> currentAssignment) {
    Map<Integer, Integer> result = cluster.getBrokers().stream()
        .collect(Collectors.toMap(
            c -> c,
            c -> 0
        ));
    currentAssignment.values().forEach(brokers -> brokers
        .forEach(broker -> result.put(broker, result.get(broker) + 1)));

    return result;
  }
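
  // Moves a single replica to another log dir via alterReplicaLogDirs, translating
  // Kafka's UnknownTopicOrPartitionException and LogDirNotFoundException into the
  // API's exception types.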
  public Mono<Void> updateBrokerLogDir(KafkaCluster cluster, Integer broker,
                                       BrokerLogdirUpdate brokerLogDir) {
    return getOrCreateAdminClient(cluster)
        .flatMap(ac -> updateBrokerLogDir(ac, brokerLogDir, broker));
  }

  private Mono<Void> updateBrokerLogDir(ExtendedAdminClient admin,
                                        BrokerLogdirUpdate b,
                                        Integer broker) {
    Map<TopicPartitionReplica, String> req = Map.of(
        new TopicPartitionReplica(b.getTopic(), b.getPartition(), broker),
        b.getLogDir());
    return Mono.just(admin)
        .map(a -> a.getAdminClient().alterReplicaLogDirs(req))
        .flatMap(result -> ClusterUtil.toMono(result.all()))
        .onErrorResume(UnknownTopicOrPartitionException.class,
            e -> Mono.error(new TopicOrPartitionNotFoundException()))
        .onErrorResume(LogDirNotFoundException.class,
            e -> Mono.error(new LogDirNotFoundApiException()))
        .doOnError(log::error);
  }
  public Mono<Void> updateBrokerConfigByName(KafkaCluster cluster,
                                             Integer broker,
                                             String name,
                                             String value) {
    return getOrCreateAdminClient(cluster)
        .flatMap(ac -> updateBrokerConfigByName(ac, broker, name, value));
  }

  private Mono<Void> updateBrokerConfigByName(ExtendedAdminClient admin,
                                              Integer broker,
                                              String name,
                                              String value) {
    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(broker));
    AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
    return Mono.just(admin)
        .map(a -> a.getAdminClient().incrementalAlterConfigs(Map.of(cr, List.of(op))))
        .flatMap(result -> ClusterUtil.toMono(result.all()))
        .onErrorResume(InvalidRequestException.class,
            e -> Mono.error(new InvalidRequestApiException(e.getMessage())))
        .doOnError(log::error);
  }
}