package com.provectus.kafka.ui.service; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.ConsumerGroup; import com.provectus.kafka.ui.model.CreateTopicMessage; import com.provectus.kafka.ui.model.ExtendedAdminClient; import com.provectus.kafka.ui.model.InternalBrokerDiskUsage; import com.provectus.kafka.ui.model.InternalBrokerMetrics; import com.provectus.kafka.ui.model.InternalClusterMetrics; import com.provectus.kafka.ui.model.InternalPartition; import com.provectus.kafka.ui.model.InternalReplica; import com.provectus.kafka.ui.model.InternalSegmentSizeDto; import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.InternalTopicConfig; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.Metric; import com.provectus.kafka.ui.model.PartitionsIncrease; import com.provectus.kafka.ui.model.ReplicationFactorChange; import com.provectus.kafka.ui.model.ServerStatus; import com.provectus.kafka.ui.model.TopicConsumerGroups; import com.provectus.kafka.ui.model.TopicCreation; import com.provectus.kafka.ui.model.TopicUpdate; import com.provectus.kafka.ui.serde.DeserializationService; import com.provectus.kafka.ui.serde.RecordSerDe; import com.provectus.kafka.ui.util.ClusterUtil; import com.provectus.kafka.ui.util.JmxClusterUtil; import com.provectus.kafka.ui.util.JmxMetricsName; import com.provectus.kafka.ui.util.JmxMetricsValueName; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.LongSummaryStatistics; import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.ConsumerGroupListing; import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.NewPartitionReassignment; import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.RecordsToDelete; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.BytesDeserializer; import org.apache.kafka.common.utils.Bytes; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import reactor.util.function.Tuple3; import reactor.util.function.Tuples; @Service @RequiredArgsConstructor @Log4j2 public class KafkaService { private static final ListTopicsOptions LIST_TOPICS_OPTIONS = new ListTopicsOptions().listInternal(true); private final ZookeeperService zookeeperService; private final Map adminClientCache = new ConcurrentHashMap<>(); private final JmxClusterUtil jmxClusterUtil; private final ClustersStorage clustersStorage; private final DeserializationService deserializationService; @Setter // used in tests @Value("${kafka.admin-client-timeout}") private int clientTimeout; public KafkaCluster getUpdatedCluster(KafkaCluster cluster, InternalTopic updatedTopic) { final Map topics = new HashMap<>(cluster.getTopics()); topics.put(updatedTopic.getName(), updatedTopic); return cluster.toBuilder().topics(topics).build(); } public KafkaCluster getUpdatedCluster(KafkaCluster cluster, String topicToDelete) { final Map topics = new HashMap<>(cluster.getTopics()); topics.remove(topicToDelete); return cluster.toBuilder().topics(topics).build(); } @SneakyThrows public Mono getUpdatedCluster(KafkaCluster cluster) { return getOrCreateAdminClient(cluster) .flatMap( ac -> ClusterUtil.getClusterVersion(ac.getAdminClient()).flatMap( version -> getClusterMetrics(ac.getAdminClient()) .flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient())) .flatMap(clusterMetrics -> getTopicsData(ac.getAdminClient()).flatMap(it -> updateSegmentMetrics(ac.getAdminClient(), clusterMetrics, it) ).map(segmentSizeDto -> buildFromData(cluster, version, segmentSizeDto)) ) ) ).onErrorResume( e -> Mono.just(cluster.toBuilder() .status(ServerStatus.OFFLINE) .lastKafkaException(e) .build()) ); } private KafkaCluster buildFromData(KafkaCluster currentCluster, String version, InternalSegmentSizeDto segmentSizeDto) { var topics = segmentSizeDto.getInternalTopicWithSegmentSize(); var brokersMetrics = segmentSizeDto.getClusterMetricsWithSegmentSize(); var brokersIds = new ArrayList<>(brokersMetrics.getInternalBrokerMetrics().keySet()); InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = brokersMetrics.toBuilder(); InternalClusterMetrics topicsMetrics = collectTopicsMetrics(topics); ServerStatus zookeeperStatus = ServerStatus.OFFLINE; Throwable zookeeperException = null; try { zookeeperStatus = zookeeperService.isZookeeperOnline(currentCluster) ? ServerStatus.ONLINE : ServerStatus.OFFLINE; } catch (Throwable e) { zookeeperException = e; } InternalClusterMetrics clusterMetrics = metricsBuilder .activeControllers(brokersMetrics.getActiveControllers()) .topicCount(topicsMetrics.getTopicCount()) .brokerCount(brokersMetrics.getBrokerCount()) .underReplicatedPartitionCount(topicsMetrics.getUnderReplicatedPartitionCount()) .inSyncReplicasCount(topicsMetrics.getInSyncReplicasCount()) .outOfSyncReplicasCount(topicsMetrics.getOutOfSyncReplicasCount()) .onlinePartitionCount(topicsMetrics.getOnlinePartitionCount()) .offlinePartitionCount(topicsMetrics.getOfflinePartitionCount()) .zooKeeperStatus(ClusterUtil.convertToIntServerStatus(zookeeperStatus)) .build(); return currentCluster.toBuilder() .version(version) .status(ServerStatus.ONLINE) .zookeeperStatus(zookeeperStatus) .lastZookeeperException(zookeeperException) .lastKafkaException(null) .metrics(clusterMetrics) .topics(topics) .brokers(brokersIds) .build(); } private InternalClusterMetrics collectTopicsMetrics(Map topics) { int underReplicatedPartitions = 0; int inSyncReplicasCount = 0; int outOfSyncReplicasCount = 0; int onlinePartitionCount = 0; int offlinePartitionCount = 0; for (InternalTopic topic : topics.values()) { underReplicatedPartitions += topic.getUnderReplicatedPartitions(); inSyncReplicasCount += topic.getInSyncReplicas(); outOfSyncReplicasCount += (topic.getReplicas() - topic.getInSyncReplicas()); onlinePartitionCount += topic.getPartitions().values().stream().mapToInt(s -> s.getLeader() == null ? 0 : 1) .sum(); offlinePartitionCount += topic.getPartitions().values().stream().mapToInt(s -> s.getLeader() != null ? 0 : 1) .sum(); } return InternalClusterMetrics.builder() .underReplicatedPartitionCount(underReplicatedPartitions) .inSyncReplicasCount(inSyncReplicasCount) .outOfSyncReplicasCount(outOfSyncReplicasCount) .onlinePartitionCount(onlinePartitionCount) .offlinePartitionCount(offlinePartitionCount) .topicCount(topics.size()) .build(); } private Map mergeWithConfigs( List topics, Map> configs) { return topics.stream().map( t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build() ).collect(Collectors.toMap( InternalTopic::getName, e -> e )); } @SneakyThrows private Mono> getTopicsData(AdminClient adminClient) { return ClusterUtil.toMono(adminClient.listTopics(LIST_TOPICS_OPTIONS).names()) .flatMap(topics -> getTopicsData(adminClient, topics).collectList()); } private Flux getTopicsData(AdminClient adminClient, Collection topics) { final Mono>> configsMono = loadTopicsConfig(adminClient, topics); return ClusterUtil.toMono(adminClient.describeTopics(topics).all()).map( m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()) ).flatMap(internalTopics -> configsMono.map(configs -> mergeWithConfigs(internalTopics, configs).values() )).flatMapMany(Flux::fromIterable); } private Mono getClusterMetrics(AdminClient client) { return ClusterUtil.toMono(client.describeCluster().nodes()) .flatMap(brokers -> ClusterUtil.toMono(client.describeCluster().controller()).map( c -> { InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = InternalClusterMetrics.builder(); metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0); return metricsBuilder.build(); } ) ); } @SneakyThrows private Mono createTopic(AdminClient adminClient, NewTopic newTopic) { return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic)).all(), newTopic.name()); } @SneakyThrows public Mono createTopic(AdminClient adminClient, Mono topicCreation) { return topicCreation.flatMap( topicData -> { NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(), topicData.getReplicationFactor().shortValue()); newTopic.configs(topicData.getConfigs()); return createTopic(adminClient, newTopic).map(v -> topicData); }).flatMap( topicData -> getTopicsData(adminClient, Collections.singleton(topicData.getName())) .next() ).switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic"))) .flatMap(t -> loadTopicsConfig(adminClient, Collections.singletonList(t.getName())) .map(c -> mergeWithConfigs(Collections.singletonList(t), c)) .map(m -> m.values().iterator().next()) ); } public Mono createTopic(KafkaCluster cluster, Mono topicCreation) { return getOrCreateAdminClient(cluster) .flatMap(ac -> createTopic(ac.getAdminClient(), topicCreation)); } public Mono deleteTopic(KafkaCluster cluster, String topicName) { return getOrCreateAdminClient(cluster) .map(ExtendedAdminClient::getAdminClient) .map(adminClient -> adminClient.deleteTopics(List.of(topicName))) .then(); } @SneakyThrows public Mono getOrCreateAdminClient(KafkaCluster cluster) { return Mono.justOrEmpty(adminClientCache.get(cluster.getName())) .switchIfEmpty(createAdminClient(cluster)) .map(e -> adminClientCache.computeIfAbsent(cluster.getName(), key -> e)); } public Mono createAdminClient(KafkaCluster kafkaCluster) { return Mono.fromSupplier(() -> { Properties properties = new Properties(); properties.putAll(kafkaCluster.getProperties()); properties .put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers()); properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout); return AdminClient.create(properties); }).flatMap(ExtendedAdminClient::extendedAdminClient); } @SneakyThrows private Mono>> loadTopicsConfig( AdminClient adminClient, Collection topicNames) { List resources = topicNames.stream() .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName)) .collect(Collectors.toList()); return ClusterUtil.toMono(adminClient.describeConfigs(resources).all()) .map(configs -> configs.entrySet().stream().map( c -> Tuples.of( c.getKey().name(), c.getValue().entries().stream().map(ClusterUtil::mapToInternalTopicConfig) .collect(Collectors.toList()) ) ).collect(Collectors.toMap( Tuple2::getT1, Tuple2::getT2 )) ); } public Mono> getConsumerGroupsInternal( KafkaCluster cluster) { return getOrCreateAdminClient(cluster).flatMap(ac -> ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all()) .flatMap(s -> ClusterUtil.toMono( ac.getAdminClient().describeConsumerGroups( s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList()) ).all() ).map(Map::values) ) ); } public Mono> getConsumerGroups(KafkaCluster cluster) { return getConsumerGroupsInternal(cluster) .map(c -> c.stream().map(ClusterUtil::convertToConsumerGroup).collect(Collectors.toList())); } public Mono getTopicConsumerGroups(KafkaCluster cluster, String topic) { final Map endOffsets = topicEndOffsets(cluster, topic); return getConsumerGroupsInternal(cluster) .flatMapIterable(c -> c.stream() .map(d -> ClusterUtil.filterConsumerGroupTopic(d, topic)) .filter(Optional::isPresent) .map(Optional::get) .map(d -> groupMetadata(cluster, d.groupId()) .flatMapIterable(meta -> d.members().stream().flatMap(m -> ClusterUtil.convertToConsumerTopicPartitionDetails( m, meta, endOffsets, d.groupId() ).stream() ).collect(Collectors.toList()) ) ).collect(Collectors.toList()) ).flatMap(f -> f).collectList().map(l -> new TopicConsumerGroups().consumers(l)); } public Mono> groupMetadata(KafkaCluster cluster, String consumerGroupId) { return getOrCreateAdminClient(cluster).map(ac -> ac.getAdminClient() .listConsumerGroupOffsets(consumerGroupId) .partitionsToOffsetAndMetadata() ).flatMap(ClusterUtil::toMono); } public Map topicEndOffsets( KafkaCluster cluster, String topic) { try (KafkaConsumer consumer = createConsumer(cluster)) { final List topicPartitions = consumer.partitionsFor(topic).stream() .map(i -> new TopicPartition(i.topic(), i.partition())) .collect(Collectors.toList()); return consumer.endOffsets(topicPartitions); } } public Map topicPartitionsEndOffsets( KafkaCluster cluster, Collection topicPartitions) { try (KafkaConsumer consumer = createConsumer(cluster)) { return consumer.endOffsets(topicPartitions); } } public KafkaConsumer createConsumer(KafkaCluster cluster) { return createConsumer(cluster, Map.of()); } public KafkaConsumer createConsumer(KafkaCluster cluster, Map properties) { Properties props = new Properties(); props.putAll(cluster.getProperties()); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID()); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.putAll(properties); return new KafkaConsumer<>(props); } @SneakyThrows public Mono updateTopic(KafkaCluster cluster, String topicName, TopicUpdate topicUpdate) { ConfigResource topicCr = new ConfigResource(ConfigResource.Type.TOPIC, topicName); return getOrCreateAdminClient(cluster) .flatMap(ac -> { if (ac.getSupportedFeatures() .contains(ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) { return incrementalAlterConfig(topicUpdate, topicCr, ac) .flatMap(c -> getUpdatedTopic(ac, topicName)); } else { return alterConfig(topicUpdate, topicCr, ac) .flatMap(c -> getUpdatedTopic(ac, topicName)); } }); } private Mono getUpdatedTopic(ExtendedAdminClient ac, String topicName) { return getTopicsData(ac.getAdminClient()) .map(s -> s.stream() .filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow()); } private Mono incrementalAlterConfig(TopicUpdate topicUpdate, ConfigResource topicCr, ExtendedAdminClient ac) { List listOp = topicUpdate.getConfigs().entrySet().stream() .flatMap(cfg -> Stream.of(new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()), AlterConfigOp.OpType.SET))).collect(Collectors.toList()); return ClusterUtil.toMono( ac.getAdminClient().incrementalAlterConfigs(Collections.singletonMap(topicCr, listOp)) .all(), topicCr.name()); } @SuppressWarnings("deprecation") private Mono alterConfig(TopicUpdate topicUpdate, ConfigResource topicCr, ExtendedAdminClient ac) { List configEntries = topicUpdate.getConfigs().entrySet().stream() .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue()))) .collect(Collectors.toList()); Config config = new Config(configEntries); Map map = Collections.singletonMap(topicCr, config); return ClusterUtil.toMono(ac.getAdminClient().alterConfigs(map).all(), topicCr.name()); } private InternalTopic mergeWithStats(InternalTopic topic, Map topics, Map partitions) { final LongSummaryStatistics stats = topics.get(topic.getName()); return topic.toBuilder() .segmentSize(stats.getSum()) .segmentCount(stats.getCount()) .partitions( topic.getPartitions().entrySet().stream().map(e -> Tuples.of(e.getKey(), mergeWithStats(topic.getName(), e.getValue(), partitions)) ).collect(Collectors.toMap( Tuple2::getT1, Tuple2::getT2 )) ).build(); } private InternalPartition mergeWithStats(String topic, InternalPartition partition, Map partitions) { final LongSummaryStatistics stats = partitions.get(new TopicPartition(topic, partition.getPartition())); return partition.toBuilder() .segmentSize(stats.getSum()) .segmentCount(stats.getCount()) .build(); } private Mono updateSegmentMetrics(AdminClient ac, InternalClusterMetrics clusterMetrics, List internalTopics) { List names = internalTopics.stream().map(InternalTopic::getName).collect(Collectors.toList()); return ClusterUtil.toMono(ac.describeTopics(names).all()).flatMap(topic -> ClusterUtil.toMono(ac.describeCluster().nodes()).flatMap(nodes -> ClusterUtil.toMono( ac.describeLogDirs(nodes.stream().map(Node::id).collect(Collectors.toList())).all()) .map(log -> { final List> topicPartitions = log.entrySet().stream().flatMap(b -> b.getValue().entrySet().stream().flatMap(topicMap -> topicMap.getValue().replicaInfos.entrySet().stream() .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size)) ) ).collect(Collectors.toList()); final Map partitionStats = topicPartitions.stream().collect( Collectors.groupingBy( Tuple2::getT2, Collectors.summarizingLong(Tuple3::getT3) ) ); final Map topicStats = topicPartitions.stream().collect( Collectors.groupingBy( t -> t.getT2().topic(), Collectors.summarizingLong(Tuple3::getT3) ) ); final Map brokerStats = topicPartitions.stream().collect( Collectors.groupingBy( Tuple2::getT1, Collectors.summarizingLong(Tuple3::getT3) ) ); final LongSummaryStatistics summary = topicPartitions.stream().collect(Collectors.summarizingLong(Tuple3::getT3)); final Map resultTopics = internalTopics.stream().map(e -> Tuples.of(e.getName(), mergeWithStats(e, topicStats, partitionStats)) ).collect(Collectors.toMap( Tuple2::getT1, Tuple2::getT2 )); final Map resultBrokers = brokerStats.entrySet().stream().map(e -> Tuples.of(e.getKey(), InternalBrokerDiskUsage.builder() .segmentSize(e.getValue().getSum()) .segmentCount(e.getValue().getCount()) .build() ) ).collect(Collectors.toMap( Tuple2::getT1, Tuple2::getT2 )); return InternalSegmentSizeDto.builder() .clusterMetricsWithSegmentSize( clusterMetrics.toBuilder() .segmentSize(summary.getSum()) .segmentCount(summary.getCount()) .internalBrokerDiskUsage(resultBrokers) .build() ) .internalTopicWithSegmentSize(resultTopics).build(); }) ) ); } public List getJmxMetric(String clusterName, Node node) { return clustersStorage.getClusterByName(clusterName) .filter(c -> c.getJmxPort() != null) .filter(c -> c.getJmxPort() > 0) .map(c -> jmxClusterUtil.getJmxMetrics(c.getJmxPort(), node.host())) .orElse(Collections.emptyList()); } private Mono fillJmxMetrics(InternalClusterMetrics internalClusterMetrics, String clusterName, AdminClient ac) { return fillBrokerMetrics(internalClusterMetrics, clusterName, ac) .map(this::calculateClusterMetrics); } private Mono fillBrokerMetrics( InternalClusterMetrics internalClusterMetrics, String clusterName, AdminClient ac) { return ClusterUtil.toMono(ac.describeCluster().nodes()) .flatMapIterable(nodes -> nodes) .map(broker -> Map.of(broker.id(), InternalBrokerMetrics.builder() .metrics(getJmxMetric(clusterName, broker)).build()) ) .collectList() .map(s -> internalClusterMetrics.toBuilder() .internalBrokerMetrics(ClusterUtil.toSingleMap(s.stream())).build()); } private InternalClusterMetrics calculateClusterMetrics( InternalClusterMetrics internalClusterMetrics) { final List metrics = internalClusterMetrics.getInternalBrokerMetrics().values().stream() .flatMap(b -> b.getMetrics().stream()) .collect( Collectors.groupingBy( Metric::getCanonicalName, Collectors.reducing(jmxClusterUtil::reduceJmxMetrics) ) ).values().stream() .filter(Optional::isPresent) .map(Optional::get) .collect(Collectors.toList()); final InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = internalClusterMetrics.toBuilder().metrics(metrics); metricsBuilder.bytesInPerSec(findTopicMetrics( metrics, JmxMetricsName.BytesInPerSec, JmxMetricsValueName.FiveMinuteRate )); metricsBuilder.bytesOutPerSec(findTopicMetrics( metrics, JmxMetricsName.BytesOutPerSec, JmxMetricsValueName.FiveMinuteRate )); return metricsBuilder.build(); } private Map findTopicMetrics(List metrics, JmxMetricsName metricsName, JmxMetricsValueName valueName) { return metrics.stream().filter(m -> metricsName.name().equals(m.getName())) .filter(m -> m.getParams().containsKey("topic")) .filter(m -> m.getValue().containsKey(valueName.name())) .map(m -> Tuples.of( m.getParams().get("topic"), m.getValue().get(valueName.name()) )).collect(Collectors.groupingBy( Tuple2::getT1, Collectors.reducing(BigDecimal.ZERO, Tuple2::getT2, BigDecimal::add) )); } public Map getTopicPartitions(KafkaCluster c, InternalTopic topic) { var tps = topic.getPartitions().values().stream() .map(t -> new TopicPartition(topic.getName(), t.getPartition())) .collect(Collectors.toList()); Map partitions = topic.getPartitions().values().stream().collect(Collectors.toMap( InternalPartition::getPartition, tp -> tp )); try (var consumer = createConsumer(c)) { final Map earliest = consumer.beginningOffsets(tps); final Map latest = consumer.endOffsets(tps); return tps.stream() .map(tp -> partitions.get(tp.partition()).toBuilder() .offsetMin(Optional.ofNullable(earliest.get(tp)).orElse(0L)) .offsetMax(Optional.ofNullable(latest.get(tp)).orElse(0L)) .build() ).collect(Collectors.toMap( InternalPartition::getPartition, tp -> tp )); } catch (Exception e) { return Collections.emptyMap(); } } public Mono deleteTopicMessages(KafkaCluster cluster, Map offsets) { var records = offsets.entrySet().stream() .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue()))) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); return getOrCreateAdminClient(cluster).map(ExtendedAdminClient::getAdminClient) .map(ac -> ac.deleteRecords(records)).then(); } public Mono sendMessage(KafkaCluster cluster, String topic, CreateTopicMessage msg) { RecordSerDe serde = deserializationService.getRecordDeserializerForCluster(cluster); Properties properties = new Properties(); properties.putAll(cluster.getProperties()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); try (KafkaProducer producer = new KafkaProducer<>(properties)) { final ProducerRecord producerRecord = serde.serialize( topic, msg.getKey(), msg.getContent(), msg.getPartition() ); CompletableFuture cf = new CompletableFuture<>(); producer.send(producerRecord, (metadata, exception) -> { if (exception != null) { cf.completeExceptionally(exception); } else { cf.complete(metadata); } }); return Mono.fromFuture(cf); } } private Mono increaseTopicPartitions(AdminClient adminClient, String topicName, Map newPartitionsMap ) { return ClusterUtil.toMono(adminClient.createPartitions(newPartitionsMap).all(), topicName) .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next()); } public Mono increaseTopicPartitions( KafkaCluster cluster, String topicName, PartitionsIncrease partitionsIncrease) { return getOrCreateAdminClient(cluster) .flatMap(ac -> { Integer actualCount = cluster.getTopics().get(topicName).getPartitionCount(); Integer requestedCount = partitionsIncrease.getTotalPartitionsCount(); if (requestedCount < actualCount) { return Mono.error( new ValidationException(String.format( "Topic currently has %s partitions, which is higher than the requested %s.", actualCount, requestedCount))); } if (requestedCount.equals(actualCount)) { return Mono.error( new ValidationException( String.format("Topic already has %s partitions.", actualCount))); } Map newPartitionsMap = Collections.singletonMap( topicName, NewPartitions.increaseTo(partitionsIncrease.getTotalPartitionsCount()) ); return increaseTopicPartitions(ac.getAdminClient(), topicName, newPartitionsMap); }); } private Mono changeReplicationFactor( AdminClient adminClient, String topicName, Map> reassignments ) { return ClusterUtil.toMono(adminClient .alterPartitionReassignments(reassignments).all(), topicName) .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next()); } /** * Change topic replication factor, works on brokers versions 5.4.x and higher */ public Mono changeReplicationFactor( KafkaCluster cluster, String topicName, ReplicationFactorChange replicationFactorChange) { return getOrCreateAdminClient(cluster) .flatMap(ac -> { Integer actual = cluster.getTopics().get(topicName).getReplicationFactor(); Integer requested = replicationFactorChange.getTotalReplicationFactor(); Integer brokersCount = cluster.getMetrics().getBrokerCount(); if (requested.equals(actual)) { return Mono.error( new ValidationException( String.format("Topic already has replicationFactor %s.", actual))); } if (requested > brokersCount) { return Mono.error( new ValidationException( String.format("Requested replication factor %s more than brokers count %s.", requested, brokersCount))); } return changeReplicationFactor(ac.getAdminClient(), topicName, getPartitionsReassignments(cluster, topicName, replicationFactorChange)); }); } private Map> getPartitionsReassignments( KafkaCluster cluster, String topicName, ReplicationFactorChange replicationFactorChange) { // Current assignment map (Partition number -> List of brokers) Map> currentAssignment = getCurrentAssignment(cluster, topicName); // Brokers map (Broker id -> count) Map brokersUsage = getBrokersMap(cluster, currentAssignment); int currentReplicationFactor = cluster.getTopics().get(topicName).getReplicationFactor(); // If we should to increase Replication factor if (replicationFactorChange.getTotalReplicationFactor() > currentReplicationFactor) { // For each partition for (var assignmentList : currentAssignment.values()) { // Get brokers list sorted by usage var brokers = brokersUsage.entrySet().stream() .sorted(Map.Entry.comparingByValue()) .map(Map.Entry::getKey) .collect(Collectors.toList()); // Iterate brokers and try to add them in assignment // while (partition replicas count != requested replication factor) for (Integer broker : brokers) { if (!assignmentList.contains(broker)) { assignmentList.add(broker); brokersUsage.merge(broker, 1, Integer::sum); } if (assignmentList.size() == replicationFactorChange.getTotalReplicationFactor()) { break; } } if (assignmentList.size() != replicationFactorChange.getTotalReplicationFactor()) { throw new ValidationException("Something went wrong during adding replicas"); } } // If we should to decrease Replication factor } else if (replicationFactorChange.getTotalReplicationFactor() < currentReplicationFactor) { for (Map.Entry> assignmentEntry : currentAssignment.entrySet()) { var partition = assignmentEntry.getKey(); var brokers = assignmentEntry.getValue(); // Get brokers list sorted by usage in reverse order var brokersUsageList = brokersUsage.entrySet().stream() .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder())) .map(Map.Entry::getKey) .collect(Collectors.toList()); // Iterate brokers and try to remove them from assignment // while (partition replicas count != requested replication factor) for (Integer broker : brokersUsageList) { // Check is the broker the leader of partition if (!cluster.getTopics().get(topicName).getPartitions().get(partition).getLeader() .equals(broker)) { brokers.remove(broker); brokersUsage.merge(broker, -1, Integer::sum); } if (brokers.size() == replicationFactorChange.getTotalReplicationFactor()) { break; } } if (brokers.size() != replicationFactorChange.getTotalReplicationFactor()) { throw new ValidationException("Something went wrong during removing replicas"); } } } else { throw new ValidationException("Replication factor already equals requested"); } // Return result map return currentAssignment.entrySet().stream().collect(Collectors.toMap( e -> new TopicPartition(topicName, e.getKey()), e -> Optional.of(new NewPartitionReassignment(e.getValue())) )); } private Map> getCurrentAssignment(KafkaCluster cluster, String topicName) { return cluster.getTopics().get(topicName).getPartitions().values().stream() .collect(Collectors.toMap( InternalPartition::getPartition, p -> p.getReplicas().stream() .map(InternalReplica::getBroker) .collect(Collectors.toList()) )); } private Map getBrokersMap(KafkaCluster cluster, Map> currentAssignment) { Map result = cluster.getBrokers().stream() .collect(Collectors.toMap( c -> c, c -> 0 )); currentAssignment.values().forEach(brokers -> brokers .forEach(broker -> result.put(broker, result.get(broker) + 1))); return result; } }