KafkaService.java

package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.exception.TopicMetadataException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.CleanupPolicy;
import com.provectus.kafka.ui.model.CreateTopicMessage;
import com.provectus.kafka.ui.model.ExtendedAdminClient;
import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
import com.provectus.kafka.ui.model.InternalBrokerMetrics;
import com.provectus.kafka.ui.model.InternalClusterMetrics;
import com.provectus.kafka.ui.model.InternalConsumerGroup;
import com.provectus.kafka.ui.model.InternalPartition;
import com.provectus.kafka.ui.model.InternalReplica;
import com.provectus.kafka.ui.model.InternalSegmentSizeDto;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.InternalTopicConfig;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.Metric;
import com.provectus.kafka.ui.model.PartitionsIncrease;
import com.provectus.kafka.ui.model.ReplicationFactorChange;
import com.provectus.kafka.ui.model.ServerStatus;
import com.provectus.kafka.ui.model.TopicCreation;
import com.provectus.kafka.ui.model.TopicUpdate;
import com.provectus.kafka.ui.serde.DeserializationService;
import com.provectus.kafka.ui.serde.RecordSerDe;
import com.provectus.kafka.ui.util.ClusterUtil;
import com.provectus.kafka.ui.util.JmxClusterUtil;
import com.provectus.kafka.ui.util.JmxMetricsName;
import com.provectus.kafka.ui.util.JmxMetricsValueName;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;

@Service
@RequiredArgsConstructor
@Log4j2
public class KafkaService {

  private static final ListTopicsOptions LIST_TOPICS_OPTIONS =
      new ListTopicsOptions().listInternal(true);

  private final ZookeeperService zookeeperService;
  private final Map<String, ExtendedAdminClient> adminClientCache = new ConcurrentHashMap<>();
  private final JmxClusterUtil jmxClusterUtil;
  private final ClustersStorage clustersStorage;
  private final DeserializationService deserializationService;

  @Setter // used in tests
  @Value("${kafka.admin-client-timeout}")
  private int clientTimeout;
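
  /**
   * Returns a copy of the cluster with {@code updatedTopic} added to (or replaced in)
   * its topic map; the original cluster instance is left untouched.
   */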
  public KafkaCluster getUpdatedCluster(KafkaCluster cluster, InternalTopic updatedTopic) {
    final Map<String, InternalTopic> topics = new HashMap<>(cluster.getTopics());
    topics.put(updatedTopic.getName(), updatedTopic);
    return cluster.toBuilder().topics(topics).build();
  }
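
  /**
   * Returns a copy of the cluster with {@code topicToDelete} removed from its topic map.
   */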
  public KafkaCluster getUpdatedCluster(KafkaCluster cluster, String topicToDelete) {
    final Map<String, InternalTopic> topics = new HashMap<>(cluster.getTopics());
    topics.remove(topicToDelete);
    return cluster.toBuilder().topics(topics).build();
  }
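
  /**
   * Refreshes the cluster snapshot: resolves the broker version, collects cluster and JMX
   * metrics, loads topic descriptions with their segment sizes, and rebuilds the
   * {@link KafkaCluster}. Any failure marks the cluster OFFLINE and records the exception.
   */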
  @SneakyThrows
  public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
    return getOrCreateAdminClient(cluster)
        .flatMap(
            ac -> ClusterUtil.getClusterVersion(ac.getAdminClient()).flatMap(
                version ->
                    getClusterMetrics(ac.getAdminClient())
                        .flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient()))
                        .flatMap(clusterMetrics ->
                            getTopicsData(ac.getAdminClient()).flatMap(it ->
                                updateSegmentMetrics(ac.getAdminClient(), clusterMetrics, it)
                            ).map(segmentSizeDto -> buildFromData(cluster, version, segmentSizeDto))
                        )
            )
        ).onErrorResume(
            e -> Mono.just(cluster.toBuilder()
                .status(ServerStatus.OFFLINE)
                .lastKafkaException(e)
                .build())
        );
  }
  private KafkaCluster buildFromData(KafkaCluster currentCluster,
                                     String version,
                                     InternalSegmentSizeDto segmentSizeDto) {
    var topics = segmentSizeDto.getInternalTopicWithSegmentSize();
    var brokersMetrics = segmentSizeDto.getClusterMetricsWithSegmentSize();
    var brokersIds = new ArrayList<>(brokersMetrics.getInternalBrokerMetrics().keySet());

    InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder =
        brokersMetrics.toBuilder();

    InternalClusterMetrics topicsMetrics = collectTopicsMetrics(topics);

    ServerStatus zookeeperStatus = ServerStatus.OFFLINE;
    Throwable zookeeperException = null;
    try {
      zookeeperStatus = zookeeperService.isZookeeperOnline(currentCluster) ? ServerStatus.ONLINE :
          ServerStatus.OFFLINE;
    } catch (Throwable e) {
      zookeeperException = e;
    }

    InternalClusterMetrics clusterMetrics = metricsBuilder
        .activeControllers(brokersMetrics.getActiveControllers())
        .topicCount(topicsMetrics.getTopicCount())
        .brokerCount(brokersMetrics.getBrokerCount())
        .underReplicatedPartitionCount(topicsMetrics.getUnderReplicatedPartitionCount())
        .inSyncReplicasCount(topicsMetrics.getInSyncReplicasCount())
        .outOfSyncReplicasCount(topicsMetrics.getOutOfSyncReplicasCount())
        .onlinePartitionCount(topicsMetrics.getOnlinePartitionCount())
        .offlinePartitionCount(topicsMetrics.getOfflinePartitionCount())
        .zooKeeperStatus(ClusterUtil.convertToIntServerStatus(zookeeperStatus))
        .version(version)
        .build();

    return currentCluster.toBuilder()
        .version(version)
        .status(ServerStatus.ONLINE)
        .zookeeperStatus(zookeeperStatus)
        .lastZookeeperException(zookeeperException)
        .lastKafkaException(null)
        .metrics(clusterMetrics)
        .topics(topics)
        .brokers(brokersIds)
        .build();
  }
  private InternalClusterMetrics collectTopicsMetrics(Map<String, InternalTopic> topics) {
    int underReplicatedPartitions = 0;
    int inSyncReplicasCount = 0;
    int outOfSyncReplicasCount = 0;
    int onlinePartitionCount = 0;
    int offlinePartitionCount = 0;

    for (InternalTopic topic : topics.values()) {
      underReplicatedPartitions += topic.getUnderReplicatedPartitions();
      inSyncReplicasCount += topic.getInSyncReplicas();
      outOfSyncReplicasCount += (topic.getReplicas() - topic.getInSyncReplicas());
      onlinePartitionCount +=
          topic.getPartitions().values().stream().mapToInt(s -> s.getLeader() == null ? 0 : 1)
              .sum();
      offlinePartitionCount +=
          topic.getPartitions().values().stream().mapToInt(s -> s.getLeader() != null ? 0 : 1)
              .sum();
    }

    return InternalClusterMetrics.builder()
        .underReplicatedPartitionCount(underReplicatedPartitions)
        .inSyncReplicasCount(inSyncReplicasCount)
        .outOfSyncReplicasCount(outOfSyncReplicasCount)
        .onlinePartitionCount(onlinePartitionCount)
        .offlinePartitionCount(offlinePartitionCount)
        .topicCount(topics.size())
        .build();
  }
  private Map<String, InternalTopic> mergeWithConfigs(
      List<InternalTopic> topics, Map<String, List<InternalTopicConfig>> configs) {
    return topics.stream()
        .map(t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build())
        .map(t -> t.toBuilder().cleanUpPolicy(
            CleanupPolicy.fromString(t.getTopicConfigs().stream()
                .filter(config -> config.getName().equals("cleanup.policy"))
                .findFirst()
                .orElseGet(() -> InternalTopicConfig.builder().value("unknown").build())
                .getValue())).build())
        .collect(Collectors.toMap(
            InternalTopic::getName,
            e -> e
        ));
  }
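
  /**
   * Lists all topics (including internal ones) and enriches each with its configs
   * and cleanup policy via {@link #mergeWithConfigs}.
   */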
  @SneakyThrows
  private Mono<List<InternalTopic>> getTopicsData(AdminClient adminClient) {
    return ClusterUtil.toMono(adminClient.listTopics(LIST_TOPICS_OPTIONS).names())
        .flatMap(topics -> getTopicsData(adminClient, topics).collectList());
  }

  private Flux<InternalTopic> getTopicsData(AdminClient adminClient, Collection<String> topics) {
    final Mono<Map<String, List<InternalTopicConfig>>> configsMono =
        loadTopicsConfig(adminClient, topics);

    return ClusterUtil.toMono(adminClient.describeTopics(topics).all())
        .map(m -> m.values().stream()
            .map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()))
        .flatMap(internalTopics -> configsMono
            .map(configs -> mergeWithConfigs(internalTopics, configs).values()))
        .flatMapMany(Flux::fromIterable);
  }
  private Mono<InternalClusterMetrics> getClusterMetrics(AdminClient client) {
    return ClusterUtil.toMono(client.describeCluster().nodes())
        .flatMap(brokers ->
            ClusterUtil.toMono(client.describeCluster().controller()).map(
                c -> {
                  InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder =
                      InternalClusterMetrics.builder();
                  metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
                  return metricsBuilder.build();
                }
            )
        );
  }
  @SneakyThrows
  private Mono<String> createTopic(AdminClient adminClient, NewTopic newTopic) {
    return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic)).all(),
        newTopic.name());
  }

  @SneakyThrows
  public Mono<InternalTopic> createTopic(AdminClient adminClient,
                                         Mono<TopicCreation> topicCreation) {
    return topicCreation.flatMap(
        topicData -> {
          NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(),
              topicData.getReplicationFactor().shortValue());
          newTopic.configs(topicData.getConfigs());
          return createTopic(adminClient, newTopic).map(v -> topicData);
        })
        .onErrorResume(t -> Mono.error(new TopicMetadataException(t.getMessage())))
        .flatMap(
            topicData ->
                getTopicsData(adminClient, Collections.singleton(topicData.getName()))
                    .next()
        ).switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")))
        .flatMap(t ->
            loadTopicsConfig(adminClient, Collections.singletonList(t.getName()))
                .map(c -> mergeWithConfigs(Collections.singletonList(t), c))
                .map(m -> m.values().iterator().next())
        );
  }
  public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicCreation> topicCreation) {
    return getOrCreateAdminClient(cluster)
        .flatMap(ac -> createTopic(ac.getAdminClient(), topicCreation));
  }
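
  /**
   * Deletes the given topic and completes only after the broker acknowledges the deletion,
   * so failures propagate to the subscriber.
   */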
  public Mono<Void> deleteTopic(KafkaCluster cluster, String topicName) {
    return getOrCreateAdminClient(cluster)
        .map(ExtendedAdminClient::getAdminClient)
        // Await the deletion future; the previous .map(...).then() dropped it,
        // so broker-side errors were silently ignored.
        .flatMap(adminClient ->
            ClusterUtil.toMono(adminClient.deleteTopics(List.of(topicName)).all()));
  }
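
  /**
   * Returns the cached admin client for the cluster, creating and caching one on first use.
   */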
  @SneakyThrows
  public Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
    return Mono.justOrEmpty(adminClientCache.get(cluster.getName()))
        .switchIfEmpty(createAdminClient(cluster))
        .map(e -> adminClientCache.computeIfAbsent(cluster.getName(), key -> e));
  }
  public Mono<ExtendedAdminClient> createAdminClient(KafkaCluster kafkaCluster) {
    return Mono.fromSupplier(() -> {
      Properties properties = new Properties();
      properties.putAll(kafkaCluster.getProperties());
      properties
          .put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
      properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
      return AdminClient.create(properties);
    }).flatMap(ExtendedAdminClient::extendedAdminClient);
  }
  @SneakyThrows
  private Mono<Map<String, List<InternalTopicConfig>>> loadTopicsConfig(
      AdminClient adminClient, Collection<String> topicNames) {
    List<ConfigResource> resources = topicNames.stream()
        .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
        .collect(Collectors.toList());

    return ClusterUtil.toMono(adminClient.describeConfigs(resources).all())
        .map(configs ->
            configs.entrySet().stream().map(
                c -> Tuples.of(
                    c.getKey().name(),
                    c.getValue().entries().stream().map(ClusterUtil::mapToInternalTopicConfig)
                        .collect(Collectors.toList())
                )
            ).collect(Collectors.toMap(
                Tuple2::getT1,
                Tuple2::getT2
            ))
        );
  }
  public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(
      KafkaCluster cluster) {
    return getOrCreateAdminClient(cluster).flatMap(ac ->
        ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all())
            .flatMap(s ->
                getConsumerGroupsInternal(
                    cluster,
                    s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList()))
            )
    );
  }

  public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(
      KafkaCluster cluster, List<String> groupIds) {
    return getOrCreateAdminClient(cluster).flatMap(ac ->
        ClusterUtil.toMono(
            ac.getAdminClient().describeConsumerGroups(groupIds).all()
        ).map(Map::values)
    ).flatMap(descriptions ->
        Flux.fromIterable(descriptions)
            .parallel()
            .flatMap(d ->
                groupMetadata(cluster, d.groupId())
                    .map(offsets -> ClusterUtil.convertToInternalConsumerGroup(d, offsets))
            )
            .sequential()
            .collectList()
    );
  }
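
  /**
   * Loads consumer groups (all of them when {@code groupIds} is empty), optionally filters
   * them by topic, and attaches the current end offsets of the partitions each group reads.
   */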
  public Mono<List<InternalConsumerGroup>> getConsumerGroups(
      KafkaCluster cluster, Optional<String> topic, List<String> groupIds) {
    final Mono<List<InternalConsumerGroup>> consumerGroups;

    if (groupIds.isEmpty()) {
      consumerGroups = getConsumerGroupsInternal(cluster);
    } else {
      consumerGroups = getConsumerGroupsInternal(cluster, groupIds);
    }

    return consumerGroups.map(c ->
        c.stream()
            .map(d -> ClusterUtil.filterConsumerGroupTopic(d, topic))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .map(g ->
                g.toBuilder().endOffsets(
                    topicPartitionsEndOffsets(cluster, g.getOffsets().keySet())
                ).build()
            )
            .collect(Collectors.toList())
    );
  }
  public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster cluster,
                                                                    String consumerGroupId) {
    return getOrCreateAdminClient(cluster).map(ac ->
        ac.getAdminClient()
            .listConsumerGroupOffsets(consumerGroupId)
            .partitionsToOffsetAndMetadata()
    ).flatMap(ClusterUtil::toMono);
  }
  public Map<TopicPartition, Long> topicPartitionsEndOffsets(
      KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
    try (KafkaConsumer<Bytes, Bytes> consumer = createConsumer(cluster)) {
      return consumer.endOffsets(topicPartitions);
    }
  }
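
  /**
   * Creates a {@link KafkaConsumer} with raw {@link Bytes} keys and values, pre-configured with
   * the cluster's properties; extra properties passed by the caller override the defaults.
   * A typical use is a short-lived offset lookup, e.g.:
   * <pre>{@code
   * try (var consumer = kafkaService.createConsumer(cluster)) {
   *   consumer.endOffsets(List.of(new TopicPartition("my-topic", 0)));
   * }
   * }</pre>
   * ("my-topic" is an illustrative name.)
   */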
  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
    return createConsumer(cluster, Map.of());
  }

  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster,
                                                    Map<String, Object> properties) {
    Properties props = new Properties();
    props.putAll(cluster.getProperties());
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID());
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.putAll(properties);
    return new KafkaConsumer<>(props);
  }
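
  /**
   * Applies topic config changes, preferring the incremental AlterConfigs API when the broker
   * supports it and falling back to the legacy (deprecated) alterConfigs call otherwise,
   * then re-reads the topic to return its updated state.
   */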
  @SneakyThrows
  public Mono<InternalTopic> updateTopic(KafkaCluster cluster, String topicName,
                                         TopicUpdate topicUpdate) {
    ConfigResource topicCr = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    return getOrCreateAdminClient(cluster)
        .flatMap(ac -> {
          if (ac.getSupportedFeatures()
              .contains(ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
            return incrementalAlterConfig(topicUpdate, topicCr, ac)
                .flatMap(c -> getUpdatedTopic(ac, topicName));
          } else {
            return alterConfig(topicUpdate, topicCr, ac)
                .flatMap(c -> getUpdatedTopic(ac, topicName));
          }
        });
  }
  private Mono<InternalTopic> getUpdatedTopic(ExtendedAdminClient ac, String topicName) {
    return getTopicsData(ac.getAdminClient())
        .map(s -> s.stream()
            .filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow());
  }
  private Mono<String> incrementalAlterConfig(TopicUpdate topicUpdate, ConfigResource topicCr,
                                              ExtendedAdminClient ac) {
    List<AlterConfigOp> listOp = topicUpdate.getConfigs().entrySet().stream()
        .flatMap(cfg -> Stream.of(new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()),
            AlterConfigOp.OpType.SET))).collect(Collectors.toList());
    return ClusterUtil.toMono(
        ac.getAdminClient().incrementalAlterConfigs(Collections.singletonMap(topicCr, listOp))
            .all(), topicCr.name());
  }

  @SuppressWarnings("deprecation")
  private Mono<String> alterConfig(TopicUpdate topicUpdate, ConfigResource topicCr,
                                   ExtendedAdminClient ac) {
    List<ConfigEntry> configEntries = topicUpdate.getConfigs().entrySet().stream()
        .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue())))
        .collect(Collectors.toList());
    Config config = new Config(configEntries);
    Map<ConfigResource, Config> map = Collections.singletonMap(topicCr, config);
    return ClusterUtil.toMono(ac.getAdminClient().alterConfigs(map).all(), topicCr.name());
  }
  private InternalTopic mergeWithStats(InternalTopic topic,
                                       Map<String, LongSummaryStatistics> topics,
                                       Map<TopicPartition, LongSummaryStatistics> partitions) {
    final LongSummaryStatistics stats = topics.get(topic.getName());

    return topic.toBuilder()
        .segmentSize(stats.getSum())
        .segmentCount(stats.getCount())
        .partitions(
            topic.getPartitions().entrySet().stream().map(e ->
                Tuples.of(e.getKey(), mergeWithStats(topic.getName(), e.getValue(), partitions))
            ).collect(Collectors.toMap(
                Tuple2::getT1,
                Tuple2::getT2
            ))
        ).build();
  }

  private InternalPartition mergeWithStats(String topic, InternalPartition partition,
                                           Map<TopicPartition, LongSummaryStatistics> partitions) {
    final LongSummaryStatistics stats =
        partitions.get(new TopicPartition(topic, partition.getPartition()));

    return partition.toBuilder()
        .segmentSize(stats.getSum())
        .segmentCount(stats.getCount())
        .build();
  }
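
  /**
   * Collects per-replica log-dir sizes from every broker and aggregates them into segment
   * size/count statistics per partition, per topic, per broker, and cluster-wide.
   */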
  private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac,
                                                            InternalClusterMetrics clusterMetrics,
                                                            List<InternalTopic> internalTopics) {
    List<String> names =
        internalTopics.stream().map(InternalTopic::getName).collect(Collectors.toList());
    return ClusterUtil.toMono(ac.describeTopics(names).all()).flatMap(topic ->
        ClusterUtil.toMono(ac.describeCluster().nodes()).flatMap(nodes ->
            ClusterUtil.toMono(
                ac.describeLogDirs(nodes.stream().map(Node::id).collect(Collectors.toList())).all())
                .map(log -> {
                  final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
                      log.entrySet().stream().flatMap(b ->
                          b.getValue().entrySet().stream().flatMap(topicMap ->
                              topicMap.getValue().replicaInfos.entrySet().stream()
                                  .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
                          )
                      ).collect(Collectors.toList());

                  final Map<TopicPartition, LongSummaryStatistics> partitionStats =
                      topicPartitions.stream().collect(
                          Collectors.groupingBy(
                              Tuple2::getT2,
                              Collectors.summarizingLong(Tuple3::getT3)
                          )
                      );

                  final Map<String, LongSummaryStatistics> topicStats =
                      topicPartitions.stream().collect(
                          Collectors.groupingBy(
                              t -> t.getT2().topic(),
                              Collectors.summarizingLong(Tuple3::getT3)
                          )
                      );

                  final Map<Integer, LongSummaryStatistics> brokerStats =
                      topicPartitions.stream().collect(
                          Collectors.groupingBy(
                              Tuple2::getT1,
                              Collectors.summarizingLong(Tuple3::getT3)
                          )
                      );

                  final LongSummaryStatistics summary =
                      topicPartitions.stream().collect(Collectors.summarizingLong(Tuple3::getT3));

                  final Map<String, InternalTopic> resultTopics = internalTopics.stream().map(e ->
                      Tuples.of(e.getName(), mergeWithStats(e, topicStats, partitionStats))
                  ).collect(Collectors.toMap(
                      Tuple2::getT1,
                      Tuple2::getT2
                  ));

                  final Map<Integer, InternalBrokerDiskUsage> resultBrokers =
                      brokerStats.entrySet().stream().map(e ->
                          Tuples.of(e.getKey(), InternalBrokerDiskUsage.builder()
                              .segmentSize(e.getValue().getSum())
                              .segmentCount(e.getValue().getCount())
                              .build()
                          )
                      ).collect(Collectors.toMap(
                          Tuple2::getT1,
                          Tuple2::getT2
                      ));

                  return InternalSegmentSizeDto.builder()
                      .clusterMetricsWithSegmentSize(
                          clusterMetrics.toBuilder()
                              .segmentSize(summary.getSum())
                              .segmentCount(summary.getCount())
                              .internalBrokerDiskUsage(resultBrokers)
                              .build()
                      )
                      .internalTopicWithSegmentSize(resultTopics).build();
                })
        )
    );
  }
  public List<Metric> getJmxMetric(String clusterName, Node node) {
    return clustersStorage.getClusterByName(clusterName)
        .filter(c -> c.getJmxPort() != null)
        .filter(c -> c.getJmxPort() > 0)
        .map(c -> jmxClusterUtil.getJmxMetrics(c.getJmxPort(), node.host()))
        .orElse(Collections.emptyList());
  }
  private Mono<InternalClusterMetrics> fillJmxMetrics(InternalClusterMetrics internalClusterMetrics,
                                                      String clusterName, AdminClient ac) {
    return fillBrokerMetrics(internalClusterMetrics, clusterName, ac)
        .map(this::calculateClusterMetrics);
  }

  private Mono<InternalClusterMetrics> fillBrokerMetrics(
      InternalClusterMetrics internalClusterMetrics, String clusterName, AdminClient ac) {
    return ClusterUtil.toMono(ac.describeCluster().nodes())
        .flatMapIterable(nodes -> nodes)
        .map(broker ->
            Map.of(broker.id(), InternalBrokerMetrics.builder()
                .metrics(getJmxMetric(clusterName, broker)).build())
        )
        .collectList()
        .map(s -> internalClusterMetrics.toBuilder()
            .internalBrokerMetrics(ClusterUtil.toSingleMap(s.stream())).build());
  }
  private InternalClusterMetrics calculateClusterMetrics(
      InternalClusterMetrics internalClusterMetrics) {
    final List<Metric> metrics = internalClusterMetrics.getInternalBrokerMetrics().values().stream()
        .flatMap(b -> b.getMetrics().stream())
        .collect(
            Collectors.groupingBy(
                Metric::getCanonicalName,
                Collectors.reducing(jmxClusterUtil::reduceJmxMetrics)
            )
        ).values().stream()
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toList());
    final InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder =
        internalClusterMetrics.toBuilder().metrics(metrics);
    metricsBuilder.bytesInPerSec(findTopicMetrics(
        metrics, JmxMetricsName.BytesInPerSec, JmxMetricsValueName.FiveMinuteRate
    ));
    metricsBuilder.bytesOutPerSec(findTopicMetrics(
        metrics, JmxMetricsName.BytesOutPerSec, JmxMetricsValueName.FiveMinuteRate
    ));
    return metricsBuilder.build();
  }
  private Map<String, BigDecimal> findTopicMetrics(List<Metric> metrics, JmxMetricsName metricsName,
                                                   JmxMetricsValueName valueName) {
    return metrics.stream().filter(m -> metricsName.name().equals(m.getName()))
        .filter(m -> m.getParams().containsKey("topic"))
        .filter(m -> m.getValue().containsKey(valueName.name()))
        .map(m -> Tuples.of(
            m.getParams().get("topic"),
            m.getValue().get(valueName.name())
        )).collect(Collectors.groupingBy(
            Tuple2::getT1,
            Collectors.reducing(BigDecimal.ZERO, Tuple2::getT2, BigDecimal::add)
        ));
  }
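
  /**
   * Reads the beginning and end offsets for every partition of the topic and returns the
   * partitions enriched with those min/max offsets; an empty map is returned if the lookup fails.
   */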
  public Map<Integer, InternalPartition> getTopicPartitions(KafkaCluster c, InternalTopic topic) {
    var tps = topic.getPartitions().values().stream()
        .map(t -> new TopicPartition(topic.getName(), t.getPartition()))
        .collect(Collectors.toList());
    Map<Integer, InternalPartition> partitions =
        topic.getPartitions().values().stream().collect(Collectors.toMap(
            InternalPartition::getPartition,
            tp -> tp
        ));

    try (var consumer = createConsumer(c)) {
      final Map<TopicPartition, Long> earliest = consumer.beginningOffsets(tps);
      final Map<TopicPartition, Long> latest = consumer.endOffsets(tps);

      return tps.stream()
          .map(tp -> partitions.get(tp.partition()).toBuilder()
              .offsetMin(Optional.ofNullable(earliest.get(tp)).orElse(0L))
              .offsetMax(Optional.ofNullable(latest.get(tp)).orElse(0L))
              .build()
          ).collect(Collectors.toMap(
              InternalPartition::getPartition,
              tp -> tp
          ));
    } catch (Exception e) {
      return Collections.emptyMap();
    }
  }
  public Mono<Void> deleteTopicMessages(KafkaCluster cluster, Map<TopicPartition, Long> offsets) {
    var records = offsets.entrySet().stream()
        .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    return getOrCreateAdminClient(cluster).map(ExtendedAdminClient::getAdminClient)
        // Await the delete-records future instead of dropping it, so broker errors propagate.
        .flatMap(ac -> ClusterUtil.toMono(ac.deleteRecords(records).all()));
  }
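
  /**
   * Serializes the message with the cluster's configured {@link RecordSerDe} and produces it,
   * completing the returned {@link Mono} with the broker's {@link RecordMetadata}. Closing the
   * producer in try-with-resources is safe here because close() waits for in-flight sends.
   */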
  public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic,
                                          CreateTopicMessage msg) {
    RecordSerDe serde =
        deserializationService.getRecordDeserializerForCluster(cluster);

    Properties properties = new Properties();
    properties.putAll(cluster.getProperties());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
      final ProducerRecord<byte[], byte[]> producerRecord = serde.serialize(
          topic,
          msg.getKey(),
          msg.getContent(),
          msg.getPartition()
      );

      CompletableFuture<RecordMetadata> cf = new CompletableFuture<>();
      producer.send(producerRecord, (metadata, exception) -> {
        if (exception != null) {
          cf.completeExceptionally(exception);
        } else {
          cf.complete(metadata);
        }
      });
      return Mono.fromFuture(cf);
    }
  }
  private Mono<InternalTopic> increaseTopicPartitions(AdminClient adminClient,
                                                      String topicName,
                                                      Map<String, NewPartitions> newPartitionsMap
  ) {
    return ClusterUtil.toMono(adminClient.createPartitions(newPartitionsMap).all(), topicName)
        .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next());
  }
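
  /**
   * Increases the partition count of a topic. Validation rejects the request when the topic
   * already has at least the requested number of partitions, since Kafka cannot shrink them.
   */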
  public Mono<InternalTopic> increaseTopicPartitions(
      KafkaCluster cluster,
      String topicName,
      PartitionsIncrease partitionsIncrease) {
    return getOrCreateAdminClient(cluster)
        .flatMap(ac -> {
          Integer actualCount = cluster.getTopics().get(topicName).getPartitionCount();
          Integer requestedCount = partitionsIncrease.getTotalPartitionsCount();

          if (requestedCount < actualCount) {
            return Mono.error(
                new ValidationException(String.format(
                    "Topic currently has %s partitions, which is higher than the requested %s.",
                    actualCount, requestedCount)));
          }
          if (requestedCount.equals(actualCount)) {
            return Mono.error(
                new ValidationException(
                    String.format("Topic already has %s partitions.", actualCount)));
          }

          Map<String, NewPartitions> newPartitionsMap = Collections.singletonMap(
              topicName,
              NewPartitions.increaseTo(partitionsIncrease.getTotalPartitionsCount())
          );
          return increaseTopicPartitions(ac.getAdminClient(), topicName, newPartitionsMap);
        });
  }
  private Mono<InternalTopic> changeReplicationFactor(
      AdminClient adminClient,
      String topicName,
      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments
  ) {
    return ClusterUtil.toMono(adminClient
        .alterPartitionReassignments(reassignments).all(), topicName)
        .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next());
  }
  /**
   * Changes the topic replication factor. Works on brokers of version 5.4.x and higher.
   */
  public Mono<InternalTopic> changeReplicationFactor(
      KafkaCluster cluster,
      String topicName,
      ReplicationFactorChange replicationFactorChange) {
    return getOrCreateAdminClient(cluster)
        .flatMap(ac -> {
          Integer actual = cluster.getTopics().get(topicName).getReplicationFactor();
          Integer requested = replicationFactorChange.getTotalReplicationFactor();
          Integer brokersCount = cluster.getMetrics().getBrokerCount();

          if (requested.equals(actual)) {
            return Mono.error(
                new ValidationException(
                    String.format("Topic already has replicationFactor %s.", actual)));
          }
          if (requested > brokersCount) {
            return Mono.error(
                new ValidationException(
                    String.format("Requested replication factor %s is greater than the broker count %s.",
                        requested, brokersCount)));
          }

          return changeReplicationFactor(ac.getAdminClient(), topicName,
              getPartitionsReassignments(cluster, topicName,
                  replicationFactorChange));
        });
  }
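
  /**
   * Builds the reassignment map for the requested replication factor change. When increasing,
   * replicas are added to the least-loaded brokers first; when decreasing, replicas are removed
   * from the most-loaded brokers first, never touching a partition's current leader.
   */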
  private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsReassignments(
      KafkaCluster cluster,
      String topicName,
      ReplicationFactorChange replicationFactorChange) {
    // Current assignment map (partition number -> list of brokers)
    Map<Integer, List<Integer>> currentAssignment = getCurrentAssignment(cluster, topicName);
    // Brokers map (broker id -> number of replicas it hosts)
    Map<Integer, Integer> brokersUsage = getBrokersMap(cluster, currentAssignment);
    int currentReplicationFactor = cluster.getTopics().get(topicName).getReplicationFactor();

    // If we need to increase the replication factor
    if (replicationFactorChange.getTotalReplicationFactor() > currentReplicationFactor) {
      // For each partition
      for (var assignmentList : currentAssignment.values()) {
        // Get the brokers list sorted by usage, least used first
        var brokers = brokersUsage.entrySet().stream()
            .sorted(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());

        // Iterate over the brokers and add them to the assignment
        // until the partition's replica count reaches the requested replication factor
        for (Integer broker : brokers) {
          if (!assignmentList.contains(broker)) {
            assignmentList.add(broker);
            brokersUsage.merge(broker, 1, Integer::sum);
          }
          if (assignmentList.size() == replicationFactorChange.getTotalReplicationFactor()) {
            break;
          }
        }

        if (assignmentList.size() != replicationFactorChange.getTotalReplicationFactor()) {
          throw new ValidationException("Something went wrong while adding replicas");
        }
      }

    // If we need to decrease the replication factor
    } else if (replicationFactorChange.getTotalReplicationFactor() < currentReplicationFactor) {
      for (Map.Entry<Integer, List<Integer>> assignmentEntry : currentAssignment.entrySet()) {
        var partition = assignmentEntry.getKey();
        var brokers = assignmentEntry.getValue();

        // Get the brokers list sorted by usage in reverse order, most used first
        var brokersUsageList = brokersUsage.entrySet().stream()
            .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());

        // Iterate over the brokers and remove them from the assignment
        // until the partition's replica count reaches the requested replication factor
        for (Integer broker : brokersUsageList) {
          // Skip the broker if it is the leader of the partition
          if (!cluster.getTopics().get(topicName).getPartitions().get(partition).getLeader()
              .equals(broker)) {
            brokers.remove(broker);
            brokersUsage.merge(broker, -1, Integer::sum);
          }
          if (brokers.size() == replicationFactorChange.getTotalReplicationFactor()) {
            break;
          }
        }

        if (brokers.size() != replicationFactorChange.getTotalReplicationFactor()) {
          throw new ValidationException("Something went wrong while removing replicas");
        }
      }
    } else {
      throw new ValidationException("Replication factor already equals the requested value");
    }

    // Return the result map
    return currentAssignment.entrySet().stream().collect(Collectors.toMap(
        e -> new TopicPartition(topicName, e.getKey()),
        e -> Optional.of(new NewPartitionReassignment(e.getValue()))
    ));
  }
  private Map<Integer, List<Integer>> getCurrentAssignment(KafkaCluster cluster, String topicName) {
    return cluster.getTopics().get(topicName).getPartitions().values().stream()
        .collect(Collectors.toMap(
            InternalPartition::getPartition,
            p -> p.getReplicas().stream()
                .map(InternalReplica::getBroker)
                .collect(Collectors.toList())
        ));
  }
  private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
                                              Map<Integer, List<Integer>> currentAssignment) {
    Map<Integer, Integer> result = cluster.getBrokers().stream()
        .collect(Collectors.toMap(
            c -> c,
            c -> 0
        ));
    currentAssignment.values().forEach(brokers -> brokers
        .forEach(broker -> result.put(broker, result.get(broker) + 1)));
    return result;
  }
}