package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.exception.IllegalEntityStateException;
import com.provectus.kafka.ui.exception.InvalidRequestApiException;
import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.TopicMetadataException;
import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.BrokerLogdirUpdate;
import com.provectus.kafka.ui.model.CleanupPolicy;
import com.provectus.kafka.ui.model.CreateTopicMessage;
import com.provectus.kafka.ui.model.ExtendedAdminClient;
import com.provectus.kafka.ui.model.InternalBrokerConfig;
import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
import com.provectus.kafka.ui.model.InternalBrokerMetrics;
import com.provectus.kafka.ui.model.InternalClusterMetrics;
import com.provectus.kafka.ui.model.InternalConsumerGroup;
import com.provectus.kafka.ui.model.InternalPartition;
import com.provectus.kafka.ui.model.InternalReplica;
import com.provectus.kafka.ui.model.InternalSegmentSizeDto;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.InternalTopicConfig;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.Metric;
import com.provectus.kafka.ui.model.PartitionsIncrease;
import com.provectus.kafka.ui.model.ReplicationFactorChange;
import com.provectus.kafka.ui.model.ServerStatus;
import com.provectus.kafka.ui.model.TopicCreation;
import com.provectus.kafka.ui.model.TopicUpdate;
import com.provectus.kafka.ui.serde.DeserializationService;
import com.provectus.kafka.ui.serde.RecordSerDe;
import com.provectus.kafka.ui.util.ClusterUtil;
import com.provectus.kafka.ui.util.JmxClusterUtil;
import com.provectus.kafka.ui.util.JmxMetricsName;
import com.provectus.kafka.ui.util.JmxMetricsValueName;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;

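/**
 * Service that talks to a Kafka cluster through AdminClient, consumers and producers:
 * cluster/topic metadata refresh, topic and partition management, consumer groups,
 * sending and deleting messages, broker configs and log directories.
 */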
@Service
@RequiredArgsConstructor
@Log4j2
public class KafkaService {

  private static final ListTopicsOptions LIST_TOPICS_OPTIONS =
      new ListTopicsOptions().listInternal(true);

  private final ZookeeperService zookeeperService;
  private final Map<String, ExtendedAdminClient> adminClientCache = new ConcurrentHashMap<>();
  private final JmxClusterUtil jmxClusterUtil;
  private final ClustersStorage clustersStorage;
  private final DeserializationService deserializationService;

  @Setter // used in tests
  @Value("${kafka.admin-client-timeout}")
  private int clientTimeout;

  public KafkaCluster getUpdatedCluster(KafkaCluster cluster, InternalTopic updatedTopic) {
    final Map<String, InternalTopic> topics = new HashMap<>(cluster.getTopics());
    topics.put(updatedTopic.getName(), updatedTopic);
    return cluster.toBuilder().topics(topics).build();
  }

  public KafkaCluster getUpdatedCluster(KafkaCluster cluster, String topicToDelete) {
    final Map<String, InternalTopic> topics = new HashMap<>(cluster.getTopics());
    topics.remove(topicToDelete);
    return cluster.toBuilder().topics(topics).build();
  }

  @SneakyThrows
  public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
    return getOrCreateAdminClient(cluster)
        .flatMap(
            ac -> ClusterUtil.getClusterVersion(ac.getAdminClient()).flatMap(
                version ->
                    getClusterMetrics(ac.getAdminClient())
                        .flatMap(i -> fillJmxMetrics(i, cluster.getName(), ac.getAdminClient()))
                        .flatMap(clusterMetrics ->
                            getTopicsData(ac.getAdminClient()).flatMap(it ->
                                updateSegmentMetrics(ac.getAdminClient(), clusterMetrics, it)
                            ).map(segmentSizeDto -> buildFromData(cluster, version, segmentSizeDto))
                        )
            )
        ).onErrorResume(
            e -> Mono.just(cluster.toBuilder()
                .status(ServerStatus.OFFLINE)
                .lastKafkaException(e)
                .build())
        );
  }

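  /**
   * Assembles an updated {@link KafkaCluster} snapshot from the collected broker and topic
   * metrics, the broker version, and the current Zookeeper status.
   */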
  private KafkaCluster buildFromData(KafkaCluster currentCluster,
                                     String version,
                                     InternalSegmentSizeDto segmentSizeDto) {
    var topics = segmentSizeDto.getInternalTopicWithSegmentSize();
    var brokersMetrics = segmentSizeDto.getClusterMetricsWithSegmentSize();
    var brokersIds = new ArrayList<>(brokersMetrics.getInternalBrokerMetrics().keySet());

    InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder =
        brokersMetrics.toBuilder();

    InternalClusterMetrics topicsMetrics = collectTopicsMetrics(topics);

    ServerStatus zookeeperStatus = ServerStatus.OFFLINE;
    Throwable zookeeperException = null;
    try {
      zookeeperStatus = zookeeperService.isZookeeperOnline(currentCluster)
          ? ServerStatus.ONLINE
          : ServerStatus.OFFLINE;
    } catch (Throwable e) {
      zookeeperException = e;
    }

    InternalClusterMetrics clusterMetrics = metricsBuilder
        .activeControllers(brokersMetrics.getActiveControllers())
        .topicCount(topicsMetrics.getTopicCount())
        .brokerCount(brokersMetrics.getBrokerCount())
        .underReplicatedPartitionCount(topicsMetrics.getUnderReplicatedPartitionCount())
        .inSyncReplicasCount(topicsMetrics.getInSyncReplicasCount())
        .outOfSyncReplicasCount(topicsMetrics.getOutOfSyncReplicasCount())
        .onlinePartitionCount(topicsMetrics.getOnlinePartitionCount())
        .offlinePartitionCount(topicsMetrics.getOfflinePartitionCount())
        .zooKeeperStatus(ClusterUtil.convertToIntServerStatus(zookeeperStatus))
        .version(version)
        .build();

    return currentCluster.toBuilder()
        .version(version)
        .status(ServerStatus.ONLINE)
        .zookeeperStatus(zookeeperStatus)
        .lastZookeeperException(zookeeperException)
        .lastKafkaException(null)
        .metrics(clusterMetrics)
        .topics(topics)
        .brokers(brokersIds)
        .build();
  }

  private InternalClusterMetrics collectTopicsMetrics(Map<String, InternalTopic> topics) {

    int underReplicatedPartitions = 0;
    int inSyncReplicasCount = 0;
    int outOfSyncReplicasCount = 0;
    int onlinePartitionCount = 0;
    int offlinePartitionCount = 0;

    for (InternalTopic topic : topics.values()) {
      underReplicatedPartitions += topic.getUnderReplicatedPartitions();
      inSyncReplicasCount += topic.getInSyncReplicas();
      outOfSyncReplicasCount += (topic.getReplicas() - topic.getInSyncReplicas());
      onlinePartitionCount +=
          topic.getPartitions().values().stream().mapToInt(s -> s.getLeader() == null ? 0 : 1)
              .sum();
      offlinePartitionCount +=
          topic.getPartitions().values().stream().mapToInt(s -> s.getLeader() != null ? 0 : 1)
              .sum();
    }

    return InternalClusterMetrics.builder()
        .underReplicatedPartitionCount(underReplicatedPartitions)
        .inSyncReplicasCount(inSyncReplicasCount)
        .outOfSyncReplicasCount(outOfSyncReplicasCount)
        .onlinePartitionCount(onlinePartitionCount)
        .offlinePartitionCount(offlinePartitionCount)
        .topicCount(topics.size())
        .build();
  }

  private Map<String, InternalTopic> mergeWithConfigs(
      List<InternalTopic> topics, Map<String, List<InternalTopicConfig>> configs) {
    return topics.stream()
        .map(t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build())
        .map(t -> t.toBuilder().cleanUpPolicy(
            CleanupPolicy.fromString(t.getTopicConfigs().stream()
                .filter(config -> config.getName().equals("cleanup.policy"))
                .findFirst()
                .orElseGet(() -> InternalTopicConfig.builder().value("unknown").build())
                .getValue())).build())
        .collect(Collectors.toMap(
            InternalTopic::getName,
            e -> e
        ));
  }

  @SneakyThrows
  private Mono<List<InternalTopic>> getTopicsData(AdminClient adminClient) {
    return ClusterUtil.toMono(adminClient.listTopics(LIST_TOPICS_OPTIONS).names())
        .flatMap(topics -> getTopicsData(adminClient, topics).collectList());
  }

  private Flux<InternalTopic> getTopicsData(AdminClient adminClient, Collection<String> topics) {
    final Mono<Map<String, List<InternalTopicConfig>>> configsMono =
        loadTopicsConfig(adminClient, topics);

    return ClusterUtil.toMono(adminClient.describeTopics(topics).all())
        .map(m -> m.values().stream()
            .map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()))
        .flatMap(internalTopics -> configsMono
            .map(configs -> mergeWithConfigs(internalTopics, configs).values()))
        .flatMapMany(Flux::fromIterable);
  }

  private Mono<InternalClusterMetrics> getClusterMetrics(AdminClient client) {
    return ClusterUtil.toMono(client.describeCluster().nodes())
        .flatMap(brokers ->
            ClusterUtil.toMono(client.describeCluster().controller()).map(
                c -> {
                  InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder =
                      InternalClusterMetrics.builder();
                  metricsBuilder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
                  return metricsBuilder.build();
                }
            )
        );
  }

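  /**
   * Creates a single topic via the AdminClient and completes once creation is acknowledged.
   */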
  @SneakyThrows
  private Mono<String> createTopic(AdminClient adminClient, NewTopic newTopic) {
    return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic)).all(),
        newTopic.name());
  }

  @SneakyThrows
  public Mono<InternalTopic> createTopic(AdminClient adminClient,
                                         Mono<TopicCreation> topicCreation) {
    return topicCreation.flatMap(
        topicData -> {
          NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(),
              topicData.getReplicationFactor().shortValue());
          newTopic.configs(topicData.getConfigs());
          return createTopic(adminClient, newTopic).map(v -> topicData);
        })
        .onErrorResume(t -> Mono.error(new TopicMetadataException(t.getMessage())))
        .flatMap(
            topicData ->
                getTopicsData(adminClient, Collections.singleton(topicData.getName()))
                    .next()
        ).switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")))
        .flatMap(t ->
            loadTopicsConfig(adminClient, Collections.singletonList(t.getName()))
                .map(c -> mergeWithConfigs(Collections.singletonList(t), c))
                .map(m -> m.values().iterator().next())
        );
  }

  public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicCreation> topicCreation) {
    return getOrCreateAdminClient(cluster)
        .flatMap(ac -> createTopic(ac.getAdminClient(), topicCreation));
  }

  public Mono<Void> deleteTopic(KafkaCluster cluster, String topicName) {
    return getOrCreateAdminClient(cluster)
        .map(ExtendedAdminClient::getAdminClient)
        .map(adminClient -> adminClient.deleteTopics(List.of(topicName)))
        .then();
  }

  @SneakyThrows
  public Mono<ExtendedAdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
    return Mono.justOrEmpty(adminClientCache.get(cluster.getName()))
        .switchIfEmpty(createAdminClient(cluster))
        .map(e -> adminClientCache.computeIfAbsent(cluster.getName(), key -> e));
  }

  public Mono<ExtendedAdminClient> createAdminClient(KafkaCluster kafkaCluster) {
    return Mono.fromSupplier(() -> {
      Properties properties = new Properties();
      properties.putAll(kafkaCluster.getProperties());
      properties
          .put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
      properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
      return AdminClient.create(properties);
    }).flatMap(ExtendedAdminClient::extendedAdminClient);
  }

  @SneakyThrows
  private Mono<Map<String, List<InternalTopicConfig>>> loadTopicsConfig(
      AdminClient adminClient, Collection<String> topicNames) {
    List<ConfigResource> resources = topicNames.stream()
        .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
        .collect(Collectors.toList());

    return ClusterUtil.toMono(adminClient.describeConfigs(resources).all())
        .map(configs ->
            configs.entrySet().stream().collect(Collectors.toMap(
                c -> c.getKey().name(),
                c -> c.getValue().entries().stream()
                    .map(ClusterUtil::mapToInternalTopicConfig)
                    .collect(Collectors.toList()))));
  }

  private Mono<Map<String, List<InternalBrokerConfig>>> loadBrokersConfig(
      AdminClient adminClient, List<Integer> brokersIds) {
    List<ConfigResource> resources = brokersIds.stream()
        .map(brokerId ->
            new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
        .collect(Collectors.toList());

    return ClusterUtil.toMono(adminClient.describeConfigs(resources,
        new DescribeConfigsOptions().includeSynonyms(true)).all())
        .map(configs ->
            configs.entrySet().stream().collect(Collectors.toMap(
                c -> c.getKey().name(),
                c -> c.getValue().entries().stream()
                    .map(ClusterUtil::mapToInternalBrokerConfig)
                    .collect(Collectors.toList()))));
  }

  private Mono<List<InternalBrokerConfig>> loadBrokersConfig(
      AdminClient adminClient, Integer brokerId) {
    return loadBrokersConfig(adminClient, Collections.singletonList(brokerId))
        .map(map -> map.values().stream()
            .findFirst()
            .orElseThrow(() -> new IllegalEntityStateException(
                String.format("Config for broker %s not found", brokerId))));
  }

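  /**
   * Returns the configuration of a single broker, failing with NotFoundException
   * when the broker id is not part of the cluster.
   */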
  public Mono<List<InternalBrokerConfig>> getBrokerConfigs(KafkaCluster cluster, Integer brokerId) {
    return getOrCreateAdminClient(cluster)
        .flatMap(adminClient -> {
          if (!cluster.getBrokers().contains(brokerId)) {
            return Mono.error(
                new NotFoundException(String.format("Broker with id %s not found", brokerId)));
          }
          return loadBrokersConfig(adminClient.getAdminClient(), brokerId);
        });
  }

  public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(
      KafkaCluster cluster) {
    return getOrCreateAdminClient(cluster).flatMap(ac ->
        ClusterUtil.toMono(ac.getAdminClient().listConsumerGroups().all())
            .flatMap(s ->
                getConsumerGroupsInternal(
                    cluster,
                    s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList()))
            )
    );
  }

  public Mono<List<InternalConsumerGroup>> getConsumerGroupsInternal(
      KafkaCluster cluster, List<String> groupIds) {

    return getOrCreateAdminClient(cluster).flatMap(ac ->
        ClusterUtil.toMono(
            ac.getAdminClient().describeConsumerGroups(groupIds).all()
        ).map(Map::values)
    ).flatMap(descriptions ->
        Flux.fromIterable(descriptions)
            .parallel()
            .flatMap(d ->
                groupMetadata(cluster, d.groupId())
                    .map(offsets -> ClusterUtil.convertToInternalConsumerGroup(d, offsets))
            )
            .sequential()
            .collectList()
    );
  }

  public Mono<List<InternalConsumerGroup>> getConsumerGroups(
      KafkaCluster cluster, Optional<String> topic, List<String> groupIds) {
    final Mono<List<InternalConsumerGroup>> consumerGroups;

    if (groupIds.isEmpty()) {
      consumerGroups = getConsumerGroupsInternal(cluster);
    } else {
      consumerGroups = getConsumerGroupsInternal(cluster, groupIds);
    }

    return consumerGroups.map(c ->
        c.stream()
            .map(d -> ClusterUtil.filterConsumerGroupTopic(d, topic))
            .filter(Optional::isPresent)
            .map(Optional::get)
            .map(g ->
                g.toBuilder().endOffsets(
                    topicPartitionsEndOffsets(cluster, g.getOffsets().keySet())
                ).build()
            )
            .collect(Collectors.toList())
    );
  }

  public Mono<Map<TopicPartition, OffsetAndMetadata>> groupMetadata(KafkaCluster cluster,
                                                                    String consumerGroupId) {
    return getOrCreateAdminClient(cluster).map(ac ->
        ac.getAdminClient()
            .listConsumerGroupOffsets(consumerGroupId)
            .partitionsToOffsetAndMetadata()
    ).flatMap(ClusterUtil::toMono);
  }

  public Map<TopicPartition, Long> topicPartitionsEndOffsets(
      KafkaCluster cluster, Collection<TopicPartition> topicPartitions) {
    try (KafkaConsumer<Bytes, Bytes> consumer = createConsumer(cluster)) {
      return consumer.endOffsets(topicPartitions);
    }
  }

  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
    return createConsumer(cluster, Map.of());
  }

  public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster,
                                                    Map<String, Object> properties) {
    Properties props = new Properties();
    props.putAll(cluster.getProperties());
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID());
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.putAll(properties);

    return new KafkaConsumer<>(props);
  }

  @SneakyThrows
  public Mono<InternalTopic> updateTopic(KafkaCluster cluster, String topicName,
                                         TopicUpdate topicUpdate) {
    ConfigResource topicCr = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    return getOrCreateAdminClient(cluster)
        .flatMap(ac -> {
          if (ac.getSupportedFeatures()
              .contains(ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
            return incrementalAlterConfig(topicUpdate, topicCr, ac)
                .flatMap(c -> getUpdatedTopic(ac, topicName));
          } else {
            return alterConfig(topicUpdate, topicCr, ac)
                .flatMap(c -> getUpdatedTopic(ac, topicName));
          }
        });
  }

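  /**
   * Re-reads topic data after a config change and returns the refreshed topic.
   */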
  private Mono<InternalTopic> getUpdatedTopic(ExtendedAdminClient ac, String topicName) {
    return getTopicsData(ac.getAdminClient())
        .map(s -> s.stream()
            .filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow());
  }

  private Mono<String> incrementalAlterConfig(TopicUpdate topicUpdate, ConfigResource topicCr,
                                              ExtendedAdminClient ac) {
    List<AlterConfigOp> listOp = topicUpdate.getConfigs().entrySet().stream()
        .flatMap(cfg -> Stream.of(new AlterConfigOp(new ConfigEntry(cfg.getKey(), cfg.getValue()),
            AlterConfigOp.OpType.SET))).collect(Collectors.toList());
    return ClusterUtil.toMono(
        ac.getAdminClient().incrementalAlterConfigs(Collections.singletonMap(topicCr, listOp))
            .all(), topicCr.name());
  }

  @SuppressWarnings("deprecation")
  private Mono<String> alterConfig(TopicUpdate topicUpdate, ConfigResource topicCr,
                                   ExtendedAdminClient ac) {
    List<ConfigEntry> configEntries = topicUpdate.getConfigs().entrySet().stream()
        .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue())))
        .collect(Collectors.toList());
    Config config = new Config(configEntries);
    Map<ConfigResource, Config> map = Collections.singletonMap(topicCr, config);
    return ClusterUtil.toMono(ac.getAdminClient().alterConfigs(map).all(), topicCr.name());
  }

  private InternalTopic mergeWithStats(InternalTopic topic,
                                       Map<String, LongSummaryStatistics> topics,
                                       Map<TopicPartition, LongSummaryStatistics> partitions) {
    final LongSummaryStatistics stats = topics.get(topic.getName());

    return topic.toBuilder()
        .segmentSize(stats.getSum())
        .segmentCount(stats.getCount())
        .partitions(
            topic.getPartitions().entrySet().stream().map(e ->
                Tuples.of(e.getKey(), mergeWithStats(topic.getName(), e.getValue(), partitions))
            ).collect(Collectors.toMap(
                Tuple2::getT1,
                Tuple2::getT2
            ))
        ).build();
  }

  private InternalPartition mergeWithStats(String topic, InternalPartition partition,
                                           Map<TopicPartition, LongSummaryStatistics> partitions) {
    final LongSummaryStatistics stats =
        partitions.get(new TopicPartition(topic, partition.getPartition()));
    return partition.toBuilder()
        .segmentSize(stats.getSum())
        .segmentCount(stats.getCount())
        .build();
  }

  private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac,
                                                            InternalClusterMetrics clusterMetrics,
                                                            List<InternalTopic> internalTopics) {
    List<String> names =
        internalTopics.stream().map(InternalTopic::getName).collect(Collectors.toList());
    return ClusterUtil.toMono(ac.describeTopics(names).all()).flatMap(topic ->
        ClusterUtil.toMono(ac.describeCluster().nodes()).flatMap(nodes ->
            ClusterUtil.toMono(
                ac.describeLogDirs(
                    nodes.stream().map(Node::id).collect(Collectors.toList())).all())
                .map(log -> {
                  final List<Tuple3<Integer, TopicPartition, Long>> topicPartitions =
                      log.entrySet().stream().flatMap(b ->
                          b.getValue().entrySet().stream().flatMap(topicMap ->
                              topicMap.getValue().replicaInfos.entrySet().stream()
                                  .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
                          )
                      ).collect(Collectors.toList());

                  final Map<TopicPartition, LongSummaryStatistics> partitionStats =
                      topicPartitions.stream().collect(
                          Collectors.groupingBy(
                              Tuple2::getT2,
                              Collectors.summarizingLong(Tuple3::getT3)
                          )
                      );

                  final Map<String, LongSummaryStatistics> topicStats =
                      topicPartitions.stream().collect(
                          Collectors.groupingBy(
                              t -> t.getT2().topic(),
                              Collectors.summarizingLong(Tuple3::getT3)
                          )
                      );

                  final Map<Integer, LongSummaryStatistics> brokerStats =
                      topicPartitions.stream().collect(
                          Collectors.groupingBy(
                              Tuple2::getT1,
                              Collectors.summarizingLong(Tuple3::getT3)
                          )
                      );

                  final LongSummaryStatistics summary =
                      topicPartitions.stream().collect(Collectors.summarizingLong(Tuple3::getT3));

                  final Map<String, InternalTopic> resultTopics = internalTopics.stream().map(e ->
                      Tuples.of(e.getName(), mergeWithStats(e, topicStats, partitionStats))
                  ).collect(Collectors.toMap(
                      Tuple2::getT1,
                      Tuple2::getT2
                  ));

                  final Map<Integer, InternalBrokerDiskUsage> resultBrokers =
                      brokerStats.entrySet().stream().map(e ->
                          Tuples.of(e.getKey(), InternalBrokerDiskUsage.builder()
                              .segmentSize(e.getValue().getSum())
                              .segmentCount(e.getValue().getCount())
                              .build()
                          )
                      ).collect(Collectors.toMap(
                          Tuple2::getT1,
                          Tuple2::getT2
                      ));

                  return InternalSegmentSizeDto.builder()
                      .clusterMetricsWithSegmentSize(
                          clusterMetrics.toBuilder()
                              .segmentSize(summary.getSum())
                              .segmentCount(summary.getCount())
                              .internalBrokerDiskUsage(resultBrokers)
                              .build()
                      )
                      .internalTopicWithSegmentSize(resultTopics).build();
                })
        )
    );
  }

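  /**
   * Fetches JMX metrics for the given node, or an empty list when no JMX port is configured.
   */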
  public List<Metric> getJmxMetric(String clusterName, Node node) {
    return clustersStorage.getClusterByName(clusterName)
        .filter(c -> c.getJmxPort() != null)
        .filter(c -> c.getJmxPort() > 0)
        .map(c -> jmxClusterUtil.getJmxMetrics(c.getJmxPort(), node.host()))
        .orElse(Collections.emptyList());
  }

  private Mono<InternalClusterMetrics> fillJmxMetrics(
      InternalClusterMetrics internalClusterMetrics, String clusterName, AdminClient ac) {
    return fillBrokerMetrics(internalClusterMetrics, clusterName, ac)
        .map(this::calculateClusterMetrics);
  }

  private Mono<InternalClusterMetrics> fillBrokerMetrics(
      InternalClusterMetrics internalClusterMetrics, String clusterName, AdminClient ac) {
    return ClusterUtil.toMono(ac.describeCluster().nodes())
        .flatMapIterable(nodes -> nodes)
        .map(broker ->
            Map.of(broker.id(), InternalBrokerMetrics.builder()
                .metrics(getJmxMetric(clusterName, broker)).build())
        )
        .collectList()
        .map(s -> internalClusterMetrics.toBuilder()
            .internalBrokerMetrics(ClusterUtil.toSingleMap(s.stream())).build());
  }

  private InternalClusterMetrics calculateClusterMetrics(
      InternalClusterMetrics internalClusterMetrics) {
    final List<Metric> metrics =
        internalClusterMetrics.getInternalBrokerMetrics().values().stream()
            .flatMap(b -> b.getMetrics().stream())
            .collect(
                Collectors.groupingBy(
                    Metric::getCanonicalName,
                    Collectors.reducing(jmxClusterUtil::reduceJmxMetrics)
                )
            ).values().stream()
            .filter(Optional::isPresent)
            .map(Optional::get)
            .collect(Collectors.toList());
    final InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder =
        internalClusterMetrics.toBuilder().metrics(metrics);
    metricsBuilder.bytesInPerSec(findTopicMetrics(
        metrics, JmxMetricsName.BytesInPerSec, JmxMetricsValueName.FiveMinuteRate
    ));
    metricsBuilder.bytesOutPerSec(findTopicMetrics(
        metrics, JmxMetricsName.BytesOutPerSec, JmxMetricsValueName.FiveMinuteRate
    ));
    return metricsBuilder.build();
  }

  private Map<String, BigDecimal> findTopicMetrics(List<Metric> metrics,
                                                   JmxMetricsName metricsName,
                                                   JmxMetricsValueName valueName) {
    return metrics.stream().filter(m -> metricsName.name().equals(m.getName()))
        .filter(m -> m.getParams().containsKey("topic"))
        .filter(m -> m.getValue().containsKey(valueName.name()))
        .map(m -> Tuples.of(
            m.getParams().get("topic"),
            m.getValue().get(valueName.name())
        )).collect(Collectors.groupingBy(
            Tuple2::getT1,
            Collectors.reducing(BigDecimal.ZERO, Tuple2::getT2, BigDecimal::add)
        ));
  }

  public Map<Integer, InternalPartition> getTopicPartitions(KafkaCluster c, InternalTopic topic) {
    var tps = topic.getPartitions().values().stream()
        .map(t -> new TopicPartition(topic.getName(), t.getPartition()))
        .collect(Collectors.toList());
    Map<Integer, InternalPartition> partitions =
        topic.getPartitions().values().stream().collect(Collectors.toMap(
            InternalPartition::getPartition,
            tp -> tp
        ));

    try (var consumer = createConsumer(c)) {
      final Map<TopicPartition, Long> earliest = consumer.beginningOffsets(tps);
      final Map<TopicPartition, Long> latest = consumer.endOffsets(tps);

      return tps.stream()
          .map(tp ->
              partitions.get(tp.partition()).toBuilder()
                  .offsetMin(Optional.ofNullable(earliest.get(tp)).orElse(0L))
                  .offsetMax(Optional.ofNullable(latest.get(tp)).orElse(0L))
                  .build()
          ).collect(Collectors.toMap(
              InternalPartition::getPartition,
              tp -> tp
          ));
    } catch (Exception e) {
      return Collections.emptyMap();
    }
  }

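  /**
   * Deletes records in the given partitions up to the provided offsets.
   */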
  public Mono<Void> deleteTopicMessages(KafkaCluster cluster, Map<TopicPartition, Long> offsets) {
    var records = offsets.entrySet().stream()
        .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    return getOrCreateAdminClient(cluster).map(ExtendedAdminClient::getAdminClient)
        .map(ac -> ac.deleteRecords(records)).then();
  }

  public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic,
                                          CreateTopicMessage msg) {
    RecordSerDe serde =
        deserializationService.getRecordDeserializerForCluster(cluster);

    Properties properties = new Properties();
    properties.putAll(cluster.getProperties());
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
      final ProducerRecord<byte[], byte[]> producerRecord = serde.serialize(
          topic,
          msg.getKey(),
          msg.getContent(),
          msg.getPartition()
      );

      CompletableFuture<RecordMetadata> cf = new CompletableFuture<>();
      producer.send(producerRecord, (metadata, exception) -> {
        if (exception != null) {
          cf.completeExceptionally(exception);
        } else {
          cf.complete(metadata);
        }
      });
      return Mono.fromFuture(cf);
    }
  }

  private Mono<InternalTopic> increaseTopicPartitions(AdminClient adminClient,
                                                      String topicName,
                                                      Map<String, NewPartitions> newPartitionsMap
  ) {
    return ClusterUtil.toMono(adminClient.createPartitions(newPartitionsMap).all(), topicName)
        .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next());
  }

  public Mono<InternalTopic> increaseTopicPartitions(
      KafkaCluster cluster,
      String topicName,
      PartitionsIncrease partitionsIncrease) {
    return getOrCreateAdminClient(cluster)
        .flatMap(ac -> {
          Integer actualCount = cluster.getTopics().get(topicName).getPartitionCount();
          Integer requestedCount = partitionsIncrease.getTotalPartitionsCount();

          if (requestedCount < actualCount) {
            return Mono.error(
                new ValidationException(String.format(
                    "Topic currently has %s partitions, which is higher than the requested %s.",
                    actualCount, requestedCount)));
          }
          if (requestedCount.equals(actualCount)) {
            return Mono.error(
                new ValidationException(
                    String.format("Topic already has %s partitions.", actualCount)));
          }

          Map<String, NewPartitions> newPartitionsMap = Collections.singletonMap(
              topicName,
              NewPartitions.increaseTo(partitionsIncrease.getTotalPartitionsCount())
          );
          return increaseTopicPartitions(ac.getAdminClient(), topicName, newPartitionsMap);
        });
  }

  private Mono<InternalTopic> changeReplicationFactor(
      AdminClient adminClient,
      String topicName,
      Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments
  ) {
    return ClusterUtil.toMono(adminClient
        .alterPartitionReassignments(reassignments).all(), topicName)
        .flatMap(topic -> getTopicsData(adminClient, Collections.singleton(topic)).next());
  }

  /**
   * Change topic replication factor, works on broker versions 5.4.x and higher.
   */
  public Mono<InternalTopic> changeReplicationFactor(
      KafkaCluster cluster,
      String topicName,
      ReplicationFactorChange replicationFactorChange) {
    return getOrCreateAdminClient(cluster)
        .flatMap(ac -> {
          Integer actual = cluster.getTopics().get(topicName).getReplicationFactor();
          Integer requested = replicationFactorChange.getTotalReplicationFactor();
          Integer brokersCount = cluster.getMetrics().getBrokerCount();

          if (requested.equals(actual)) {
            return Mono.error(
                new ValidationException(
                    String.format("Topic already has replicationFactor %s.", actual)));
          }
          if (requested > brokersCount) {
            return Mono.error(
                new ValidationException(
                    String.format("Requested replication factor %s more than brokers count %s.",
                        requested, brokersCount)));
          }

          return changeReplicationFactor(ac.getAdminClient(), topicName,
              getPartitionsReassignments(cluster, topicName,
                  replicationFactorChange));
        });
  }

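  /**
   * Describes log directories for the requested brokers (all cluster brokers when the list
   * is null or empty); returns an empty result if the request times out.
   */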
  public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> getClusterLogDirs(
      KafkaCluster cluster, List<Integer> reqBrokers) {
    return getOrCreateAdminClient(cluster)
        .map(admin -> {
          List<Integer> brokers = new ArrayList<>(cluster.getBrokers());
          if (reqBrokers != null && !reqBrokers.isEmpty()) {
            brokers.retainAll(reqBrokers);
          }
          return admin.getAdminClient().describeLogDirs(brokers);
        })
        .flatMap(result -> ClusterUtil.toMono(result.all()))
        .onErrorResume(TimeoutException.class, (TimeoutException e) -> {
          log.error("Error during fetching log dirs", e);
          return Mono.just(new HashMap<>());
        });
  }

  private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsReassignments(
      KafkaCluster cluster,
      String topicName,
      ReplicationFactorChange replicationFactorChange) {
    // Current assignment map (Partition number -> List of brokers)
    Map<Integer, List<Integer>> currentAssignment = getCurrentAssignment(cluster, topicName);
    // Brokers map (Broker id -> count)
    Map<Integer, Integer> brokersUsage = getBrokersMap(cluster, currentAssignment);
    int currentReplicationFactor = cluster.getTopics().get(topicName).getReplicationFactor();

    // If we should increase the replication factor
    if (replicationFactorChange.getTotalReplicationFactor() > currentReplicationFactor) {
      // For each partition
      for (var assignmentList : currentAssignment.values()) {
        // Get brokers list sorted by usage
        var brokers = brokersUsage.entrySet().stream()
            .sorted(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());

        // Iterate brokers and try to add them to the assignment
        // while (partition replicas count != requested replication factor)
        for (Integer broker : brokers) {
          if (!assignmentList.contains(broker)) {
            assignmentList.add(broker);
            brokersUsage.merge(broker, 1, Integer::sum);
          }
          if (assignmentList.size() == replicationFactorChange.getTotalReplicationFactor()) {
            break;
          }
        }
        if (assignmentList.size() != replicationFactorChange.getTotalReplicationFactor()) {
          throw new ValidationException("Something went wrong during adding replicas");
        }
      }

    // If we should decrease the replication factor
    } else if (replicationFactorChange.getTotalReplicationFactor() < currentReplicationFactor) {
      for (Map.Entry<Integer, List<Integer>> assignmentEntry : currentAssignment.entrySet()) {
        var partition = assignmentEntry.getKey();
        var brokers = assignmentEntry.getValue();

        // Get brokers list sorted by usage in reverse order
        var brokersUsageList = brokersUsage.entrySet().stream()
            .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());

        // Iterate brokers and try to remove them from the assignment
        // while (partition replicas count != requested replication factor)
        for (Integer broker : brokersUsageList) {
          // Check whether the broker is the leader of the partition
          if (!cluster.getTopics().get(topicName).getPartitions().get(partition).getLeader()
              .equals(broker)) {
            brokers.remove(broker);
            brokersUsage.merge(broker, -1, Integer::sum);
          }
          if (brokers.size() == replicationFactorChange.getTotalReplicationFactor()) {
            break;
          }
        }
        if (brokers.size() != replicationFactorChange.getTotalReplicationFactor()) {
          throw new ValidationException("Something went wrong during removing replicas");
        }
      }
    } else {
      throw new ValidationException("Replication factor already equals requested");
    }

    // Return result map
    return currentAssignment.entrySet().stream().collect(Collectors.toMap(
        e -> new TopicPartition(topicName, e.getKey()),
        e -> Optional.of(new NewPartitionReassignment(e.getValue()))
    ));
  }

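  /**
   * Builds the current assignment map for a topic: partition number -> list of replica broker ids.
   */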
  private Map<Integer, List<Integer>> getCurrentAssignment(KafkaCluster cluster,
                                                           String topicName) {
    return cluster.getTopics().get(topicName).getPartitions().values().stream()
        .collect(Collectors.toMap(
            InternalPartition::getPartition,
            p -> p.getReplicas().stream()
                .map(InternalReplica::getBroker)
                .collect(Collectors.toList())
        ));
  }

  private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
                                              Map<Integer, List<Integer>> currentAssignment) {
    Map<Integer, Integer> result = cluster.getBrokers().stream()
        .collect(Collectors.toMap(
            c -> c,
            c -> 0
        ));
    currentAssignment.values().forEach(brokers -> brokers
        .forEach(broker -> result.put(broker, result.get(broker) + 1)));

    return result;
  }

  public Mono<Void> updateBrokerLogDir(KafkaCluster cluster, Integer broker,
                                       BrokerLogdirUpdate brokerLogDir) {
    return getOrCreateAdminClient(cluster)
        .flatMap(ac -> updateBrokerLogDir(ac, brokerLogDir, broker));
  }

  private Mono<Void> updateBrokerLogDir(ExtendedAdminClient adminMono,
                                        BrokerLogdirUpdate b,
                                        Integer broker) {
    Map<TopicPartitionReplica, String> req = Map.of(
        new TopicPartitionReplica(b.getTopic(), b.getPartition(), broker),
        b.getLogDir());
    return Mono.just(adminMono)
        .map(admin -> admin.getAdminClient().alterReplicaLogDirs(req))
        .flatMap(result -> ClusterUtil.toMono(result.all()))
        .onErrorResume(UnknownTopicOrPartitionException.class,
            e -> Mono.error(new TopicOrPartitionNotFoundException()))
        .onErrorResume(LogDirNotFoundException.class,
            e -> Mono.error(new LogDirNotFoundApiException()))
        .doOnError(log::error);
  }

  public Mono<Void> updateBrokerConfigByName(KafkaCluster cluster,
                                             Integer broker,
                                             String name,
                                             String value) {
    return getOrCreateAdminClient(cluster)
        .flatMap(ac -> updateBrokerConfigByName(ac, broker, name, value));
  }

  private Mono<Void> updateBrokerConfigByName(ExtendedAdminClient admin,
                                              Integer broker,
                                              String name,
                                              String value) {
    ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(broker));
    AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
    return Mono.just(admin)
        .map(a -> a.getAdminClient().incrementalAlterConfigs(Map.of(cr, List.of(op))))
        .flatMap(result -> ClusterUtil.toMono(result.all()))
        .onErrorResume(InvalidRequestException.class,
            e -> Mono.error(new InvalidRequestApiException(e.getMessage())))
        .doOnError(log::error);
  }
}