Refactor for mono (#28)

* starting mono refactor

* topics and cluster refactored to the Mono way instead of plain getters

* fixed careless mistakes

* param

* changed collectTopicData method to work correctly

* refactored metrics to copy-on-write methodology

* metrics params updated

* fixed silly mistake

* Let's think immutable

* immutable

* dumb mistakes fixed

* changed to immutable

* added new mappers

* changed to immutable

* typo was fixed

* imports were cleared

* Refactored

* imports were optimized

Co-authored-by: Roman Nedzvetskiy <roman@Romans-MacBook-Pro.local>
Co-authored-by: German Osin <german.osin@gmail.com>
Roman Nedzvetskiy 2020-04-24 11:15:58 +03:00 committed by GitHub
parent ef1edba34b
commit b2465775e5
20 changed files with 535 additions and 408 deletions

@@ -1,12 +1,15 @@
package com.provectus.kafka.ui.cluster;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.service.MetricsUpdateService;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.Map;
@Component
@RequiredArgsConstructor
@@ -19,8 +22,11 @@ public class ClustersMetricsScheduler {
@Scheduled(fixedRate = 30000)
public void updateMetrics() {
for (KafkaCluster kafkaCluster : clustersStorage.getKafkaClusters()) {
metricsUpdateService.updateMetrics(kafkaCluster);
}
Flux.fromIterable(clustersStorage.getKafkaClustersMap().entrySet())
.subscribeOn(Schedulers.parallel())
.map(Map.Entry::getValue)
.flatMap(metricsUpdateService::updateMetrics)
.doOnNext(s -> clustersStorage.setKafkaCluster(s.getId(), s))
.subscribe();
}
}
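
A minimal standalone sketch of the same Reactor pattern (only reactor-core assumed; a plain ConcurrentHashMap stands in for ClustersStorage):

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SchedulerSketch {
    public static void main(String[] args) throws InterruptedException {
        Map<String, String> storage = new ConcurrentHashMap<>(Map.of("local", "stale"));
        Flux.fromIterable(storage.keySet())
                .subscribeOn(Schedulers.parallel())             // refresh off the scheduling thread
                .flatMap(SchedulerSketch::refresh)              // clusters update concurrently
                .doOnNext(s -> storage.put(s.split(" ")[0], s)) // write the fresh snapshot back
                .subscribe();
        Thread.sleep(100);                                      // toy example only: wait for the async pipeline
        System.out.println(storage);                            // {local=local refreshed}
    }

    private static Mono<String> refresh(String name) {
        return Mono.just(name + " refreshed");
    }
}

Because flatMap interleaves results, the write-back in doOnNext must be thread-safe, which is why ClustersStorage below switches to a ConcurrentHashMap.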

@@ -0,0 +1,13 @@
package com.provectus.kafka.ui.cluster.mapper;
import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
import com.provectus.kafka.ui.model.BrokersMetrics;
import org.mapstruct.Mapper;
@Mapper(componentModel = "spring")
public interface BrokersMetricsMapper {
InternalClusterMetrics toBrokersMetricsDto (BrokersMetrics brokersMetrics);
BrokersMetrics toBrokersMetrics (InternalClusterMetrics brokersMetrics);
}

@@ -0,0 +1,12 @@
package com.provectus.kafka.ui.cluster.mapper;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.model.Cluster;
import org.mapstruct.Mapper;
@Mapper(componentModel = "spring")
public interface ClusterDtoMapper {
KafkaCluster toInternalCluster(Cluster cluster);
Cluster toClusterDto(KafkaCluster cluster);
}

@@ -1,23 +1,28 @@
package com.provectus.kafka.ui.cluster.mapper;
import com.provectus.kafka.ui.cluster.config.ClustersProperties;
import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
import com.provectus.kafka.ui.cluster.model.InternalTopic;
import com.provectus.kafka.ui.cluster.model.InternalTopicConfig;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.model.*;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
@Mapper
public abstract class ClusterMapper {
@Mapper(componentModel = "spring")
public interface ClusterMapper {
@Mapping(source = "name", target = "cluster.name")
@Mapping(target = "brokersMetrics", ignore = true)
@Mapping(target = "cluster", ignore = true)
@Mapping(target = "lastKafkaException", ignore = true)
@Mapping(target = "lastZookeeperException", ignore = true)
@Mapping(target = "topicConfigsMap", ignore = true)
@Mapping(target = "topicDetailsMap", ignore = true)
@Mapping(target = "topics", ignore = true)
@Mapping(target = "zkClient", ignore = true)
@Mapping(target = "zookeeperStatus", ignore = true)
@Mapping(target = "adminClient", ignore = true)
public abstract KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
@Mapping(target = "brokerCount", source = "metrics.brokerCount")
@Mapping(target = "onlinePartitionCount", source = "metrics.onlinePartitionCount")
@Mapping(target = "topicCount", source = "metrics.topicCount")
@Mapping(target = "bytesInPerSec", source = "metrics.bytesInPerSec")
@Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec")
Cluster toCluster(KafkaCluster cluster);
BrokersMetrics toBrokerMetrics(InternalClusterMetrics metrics);
Topic toTopic(InternalTopic topic);
TopicDetails toTopicDetails(InternalTopic topic);
TopicConfig toTopicConfig(InternalTopicConfig topic);
}
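
With componentModel = "spring", MapStruct generates the implementation as a Spring bean, so the mapper is constructor-injected instead of being fetched through Mappers.getMapper. A usage sketch (the consuming class is hypothetical):

import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.model.Cluster;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class ClusterView {                     // hypothetical consumer
    private final ClusterMapper clusterMapper; // generated impl injected by Spring

    public Cluster render(KafkaCluster cluster) {
        return clusterMapper.toCluster(cluster);
    }
}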

@@ -7,13 +7,16 @@ import org.mapstruct.factory.Mappers;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@Component
@RequiredArgsConstructor
public class ClustersStorage {
private final Map<String, KafkaCluster> kafkaClusters = new HashMap<>();
private final Map<String, KafkaCluster> kafkaClusters = new ConcurrentHashMap<>();
private final ClustersProperties clusterProperties;
@@ -33,7 +36,15 @@ public class ClustersStorage {
return kafkaClusters.values();
}
public KafkaCluster getClusterByName(String clusterName) {
return kafkaClusters.get(clusterName);
public Optional<KafkaCluster> getClusterByName(String clusterName) {
return Optional.ofNullable(kafkaClusters.get(clusterName));
}
public void setKafkaCluster(String key, KafkaCluster kafkaCluster) {
this.kafkaClusters.put(key, kafkaCluster);
}
public Map<String, KafkaCluster> getKafkaClustersMap() {
return kafkaClusters;
}
}
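
Callers of getClusterByName now handle absence explicitly instead of null-checking. A sketch (the cluster name "local" is just a sample):

String summary = clustersStorage.getClusterByName("local") // Optional<KafkaCluster>
        .map(c -> c.getName() + " is registered")
        .orElse("no cluster named local");                 // absence is a value, not a null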

@@ -0,0 +1,25 @@
package com.provectus.kafka.ui.cluster.model;
import lombok.Builder;
import lombok.Data;
@Data
@Builder(toBuilder = true)
public class InternalClusterMetrics {
private final int brokerCount;
private final int topicCount;
private final int activeControllers;
private final int uncleanLeaderElectionCount;
private final int onlinePartitionCount;
private final int underReplicatedPartitionCount;
private final int offlinePartitionCount;
private final int inSyncReplicasCount;
private final int outOfSyncReplicasCount;
//TODO: find way to fill
private final int bytesInPerSec;
private final int bytesOutPerSec;
//TODO: find way to fill
private final int segmentSize;
private final int segmentCount;
}
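
Builder(toBuilder = true) is what the commit message calls the copy-on-write methodology: aggregation derives a new immutable object instead of resetting counters in place. A sketch (the two local variables are assumptions, mirroring buildFromData in KafkaService below):

InternalClusterMetrics merged = clusterMetrics.toBuilder()   // copies all fields
        .underReplicatedPartitionCount(topicsMetrics.getUnderReplicatedPartitionCount())
        .onlinePartitionCount(topicsMetrics.getOnlinePartitionCount())
        .build();                                            // clusterMetrics itself stays untouched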

@@ -0,0 +1,16 @@
package com.provectus.kafka.ui.cluster.model;
import lombok.Builder;
import lombok.Data;
import java.util.List;
@Data
@Builder
public class InternalPartition {
private final int partition;
private final Integer leader;
private final List<InternalReplica> replicas;
private final int inSyncReplicasCount;
private final int replicasCount;
}

@@ -0,0 +1,14 @@
package com.provectus.kafka.ui.cluster.model;
import lombok.Builder;
import lombok.Data;
import lombok.RequiredArgsConstructor;
@Data
@Builder
@RequiredArgsConstructor
public class InternalReplica {
private final int broker;
private final boolean leader;
private final boolean inSync;
}

@@ -0,0 +1,25 @@
package com.provectus.kafka.ui.cluster.model;
import lombok.Builder;
import lombok.Data;
import java.util.List;
@Data
@Builder(toBuilder = true)
public class InternalTopic {
private final String name;
private final boolean internal;
private final List<InternalPartition> partitions;
private final List<InternalTopicConfig> topicConfigs;
private final int replicas;
private final int partitionCount;
private final int inSyncReplicas;
private final int replicationFactor;
private final int underReplicatedPartitions;
//TODO: find way to fill
private final int segmentSize;
private final int segmentCount;
}

@@ -0,0 +1,13 @@
package com.provectus.kafka.ui.cluster.model;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
public class InternalTopicConfig {
private final String name;
private final String value;
private final String defaultValue;
}

@@ -1,49 +1,26 @@
package com.provectus.kafka.ui.cluster.model;
import com.provectus.kafka.ui.model.*;
import lombok.AccessLevel;
import com.provectus.kafka.ui.model.ServerStatus;
import lombok.Builder;
import lombok.Data;
import lombok.experimental.FieldDefaults;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.admin.AdminClient;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Data
@FieldDefaults(level = AccessLevel.PRIVATE)
@Builder(toBuilder = true)
public class KafkaCluster {
String id = "";
String name;
String jmxHost;
String jmxPort;
String bootstrapServers;
String zookeeper;
private final String id = "";
private final String name;
private final String jmxHost;
private final String jmxPort;
private final String bootstrapServers;
private final String zookeeper;
private final ServerStatus status;
private final ServerStatus zookeeperStatus;
private final InternalClusterMetrics metrics;
private final Map<String, InternalTopic> topics;
private final Throwable lastKafkaException;
private final Throwable lastZookeeperException;
Cluster cluster = new Cluster();
BrokersMetrics brokersMetrics = new BrokersMetrics();
List<Topic> topics = new ArrayList<>();
private Map<String, TopicDetails> topicDetailsMap = new ConcurrentHashMap<>();
private Map<String, List<TopicConfig>> topicConfigsMap = new ConcurrentHashMap<>();
ZkClient zkClient;
AdminClient adminClient;
ServerStatus zookeeperStatus = ServerStatus.OFFLINE;
Exception lastKafkaException;
Exception lastZookeeperException;
public TopicDetails getOrCreateTopicDetails(String key) {
var topicDetails = topicDetailsMap.get(key);
if(topicDetails == null) {
topicDetailsMap.putIfAbsent(key, new TopicDetails());
topicDetails = topicDetailsMap.get(key);
}
return topicDetails;
}
}
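
With every field final, a state change produces a new snapshot; this sketch mirrors the onErrorResume branch in KafkaService further down ('cluster' and 'e' as in that branch):

KafkaCluster offline = cluster.toBuilder()
        .status(ServerStatus.OFFLINE)
        .lastKafkaException(e)
        .build();              // the original instance stays visible, unchanged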

@@ -1,16 +0,0 @@
package com.provectus.kafka.ui.cluster.model;
import lombok.Data;
@Data
public class KafkaMetrics {
Double bytesInPerSec;
Double bytesOutPerSec;
Integer brokersCount;
Integer topicCount;
Integer activeControllerCount;
Integer onlinePartitionCount;
Integer offlinePartitionCount;
Integer underReplicatedPartitions;
}

@@ -1,19 +1,18 @@
package com.provectus.kafka.ui.cluster.service;
import com.provectus.kafka.ui.cluster.mapper.ClusterMapper;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.kafka.KafkaService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@Service
@@ -21,55 +20,53 @@ import java.util.stream.Collectors;
public class ClusterService {
private final ClustersStorage clustersStorage;
private final ClusterMapper clusterMapper;
private final KafkaService kafkaService;
public Mono<ResponseEntity<Flux<Cluster>>> getClusters() {
List<Cluster> clusters = clustersStorage.getKafkaClusters()
public List<Cluster> getClusters() {
return clustersStorage.getKafkaClusters()
.stream()
.map(KafkaCluster::getCluster)
.map(clusterMapper::toCluster)
.collect(Collectors.toList());
return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusters)));
}
public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(String name) {
KafkaCluster cluster = clustersStorage.getClusterByName(name);
if (cluster == null) return null;
return Mono.just(ResponseEntity.ok(cluster.getBrokersMetrics()));
public Optional<BrokersMetrics> getBrokersMetrics(String name) {
return clustersStorage.getClusterByName(name)
.map(KafkaCluster::getMetrics)
.map(clusterMapper::toBrokerMetrics);
}
public Mono<ResponseEntity<Flux<Topic>>> getTopics(String name) {
KafkaCluster cluster = clustersStorage.getClusterByName(name);
if (cluster == null) return null;
return Mono.just(ResponseEntity.ok(Flux.fromIterable(cluster.getTopics())));
public List<Topic> getTopics(String name) {
return clustersStorage.getClusterByName(name)
.map( c ->
c.getTopics().values().stream()
.map(clusterMapper::toTopic)
.collect(Collectors.toList())
).orElse(Collections.emptyList());
}
public Mono<ResponseEntity<TopicDetails>> getTopicDetails(String name, String topicName) {
KafkaCluster cluster = clustersStorage.getClusterByName(name);
if (cluster == null) return null;
return Mono.just(ResponseEntity.ok(cluster.getOrCreateTopicDetails(topicName)));
public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
return clustersStorage.getClusterByName(name).flatMap(
c -> Optional.ofNullable(c.getTopics().get(topicName))
).map(clusterMapper::toTopicDetails);
}
public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(String name, String topicName) {
KafkaCluster cluster = clustersStorage.getClusterByName(name);
if (cluster == null) return null;
return Mono.just(ResponseEntity.ok(Flux.fromIterable(cluster.getTopicConfigsMap().get(topicName))));
public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
return clustersStorage.getClusterByName(name).flatMap(
c -> Optional.ofNullable(c.getTopics().get(topicName))
).map( t -> t.getTopicConfigs().stream().map(clusterMapper::toTopicConfig).collect(Collectors.toList()));
}
public Mono<ResponseEntity<Topic>> createTopic(String name, Mono<TopicFormData> topicFormData) {
KafkaCluster cluster = clustersStorage.getClusterByName(name);
if (cluster == null) return null;
return kafkaService.createTopic(cluster, topicFormData);
public Mono<Topic> createTopic(String name, Mono<TopicFormData> topicFormData) {
return clustersStorage.getClusterByName(name).map(
cluster -> kafkaService.createTopic(cluster, topicFormData)
).orElse(Mono.empty()).map(clusterMapper::toTopic);
}
@SneakyThrows
public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup (String clusterName) {
var cluster = clustersStorage.getClusterByName(clusterName);
return ClusterUtil.toMono(cluster.getAdminClient().listConsumerGroups().all())
.flatMap(s -> ClusterUtil.toMono(cluster.getAdminClient()
.describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
.map(s -> s.values().stream()
.map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()))
.map(s -> ResponseEntity.ok(Flux.fromIterable(s)));
public Mono<List<ConsumerGroup>> getConsumerGroups(String clusterName) {
return clustersStorage.getClusterByName(clusterName)
.map(kafkaService::getConsumerGroups)
.orElse(Mono.empty());
}
}
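
The service now returns plain domain values (List, Optional, Mono) and leaves HTTP concerns to the controller. Caller side, with sample names and an assumed logger:

clusterService.getTopicDetails("local", "orders")            // Optional<TopicDetails>
        .ifPresent(details -> log.debug("found: {}", details));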

@@ -5,8 +5,8 @@ import com.provectus.kafka.ui.kafka.KafkaService;
import com.provectus.kafka.ui.zookeeper.ZookeeperService;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
@RequiredArgsConstructor
@@ -14,12 +14,9 @@ import org.springframework.stereotype.Service;
public class MetricsUpdateService {
private final KafkaService kafkaService;
private final ZookeeperService zookeeperService;
@Async
public void updateMetrics(KafkaCluster kafkaCluster) {
log.debug("Start getting metrics for kafkaCluster: " + kafkaCluster.getName());
kafkaService.loadClusterMetrics(kafkaCluster);
zookeeperService.checkZookeeperStatus(kafkaCluster);
public Mono<KafkaCluster> updateMetrics(KafkaCluster kafkaCluster) {
log.debug("Start getting metrics for kafkaCluster: {}", kafkaCluster);
return kafkaService.getUpdatedCluster(kafkaCluster);
}
}

@@ -1,13 +1,18 @@
package com.provectus.kafka.ui.cluster.util;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.cluster.model.*;
import com.provectus.kafka.ui.model.ConsumerGroup;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import reactor.core.publisher.Mono;
import java.util.HashSet;
import java.util.Set;
import java.util.List;
import java.util.stream.Collectors;
import static com.provectus.kafka.ui.kafka.KafkaConstants.TOPIC_DEFAULT_CONFIGS;
import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
public class ClusterUtil {
@@ -23,12 +28,72 @@ public class ClusterUtil {
public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, KafkaCluster cluster) {
ConsumerGroup consumerGroup = new ConsumerGroup();
consumerGroup.setClusterId(cluster.getCluster().getId());
consumerGroup.setClusterId(cluster.getId());
consumerGroup.setConsumerGroupId(c.groupId());
consumerGroup.setNumConsumers(c.members().size());
Set<String> topics = new HashSet<>();
c.members().forEach(s1 -> s1.assignment().topicPartitions().forEach(s2 -> topics.add(s2.topic())));
consumerGroup.setNumTopics(topics.size());
int numTopics = c.members().stream().mapToInt( m -> m.assignment().topicPartitions().size()).sum();
consumerGroup.setNumTopics(numTopics);
return consumerGroup;
}
public static InternalTopicConfig mapToInternalTopicConfig(ConfigEntry configEntry) {
InternalTopicConfig.InternalTopicConfigBuilder builder = InternalTopicConfig.builder()
.name(configEntry.name())
.value(configEntry.value());
if (configEntry.name().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
builder.defaultValue(configEntry.value());
} else {
builder.defaultValue(TOPIC_DEFAULT_CONFIGS.get(configEntry.name()));
}
return builder.build();
}
public static InternalTopic mapToInternalTopic(TopicDescription topicDescription) {
var topic = InternalTopic.builder();
topic.internal(topicDescription.isInternal());
topic.name(topicDescription.name());
List<InternalPartition> partitions = topicDescription.partitions().stream().map(
partition -> {
var partitionDto = InternalPartition.builder();
partitionDto.leader(partition.leader().id());
partitionDto.partition(partition.partition());
partitionDto.inSyncReplicasCount(partition.isr().size());
partitionDto.replicasCount(partition.replicas().size());
List<InternalReplica> replicas = partition.replicas().stream().map(
r -> new InternalReplica(r.id(), partition.leader().id()!=r.id(), partition.isr().contains(r)))
.collect(Collectors.toList());
partitionDto.replicas(replicas);
return partitionDto.build();
})
.collect(Collectors.toList());
int urpCount = partitions.stream()
.flatMap(partition -> partition.getReplicas().stream())
.filter(InternalReplica::isInSync).mapToInt(e -> 1)
.sum();
int inSyncReplicasCount = partitions.stream()
.mapToInt(InternalPartition::getInSyncReplicasCount)
.sum();
int replicasCount = partitions.stream()
.mapToInt(InternalPartition::getReplicasCount)
.sum();
topic.partitions(partitions);
topic.replicas(replicasCount);
topic.partitionCount(topicDescription.partitions().size());
topic.inSyncReplicas(inSyncReplicasCount);
topic.replicationFactor(
topicDescription.partitions().size() > 0 ?
topicDescription.partitions().get(0).replicas().size() : 0
);
topic.underReplicatedPartitions(urpCount);
return topic.build();
}
}
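
The diff leans on ClusterUtil.toMono(...) throughout, but the helper itself sits in an unchanged hunk. A plausible adapter from KafkaFuture to Mono looks like this (a sketch, not necessarily the project's exact code):

public static <T> Mono<T> toMono(KafkaFuture<T> future) {
    return Mono.create(sink -> future.whenComplete((result, error) -> {
        if (error != null) {
            sink.error(error);    // propagate the Kafka-side failure
        } else {
            sink.success(result); // complete with the future's value
        }
    }));
}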

@@ -1,270 +1,249 @@
package com.provectus.kafka.ui.kafka;
import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
import com.provectus.kafka.ui.cluster.model.InternalTopic;
import com.provectus.kafka.ui.cluster.model.InternalTopicConfig;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import com.provectus.kafka.ui.model.*;
import com.provectus.kafka.ui.cluster.util.ClusterUtil;
import com.provectus.kafka.ui.model.ConsumerGroup;
import com.provectus.kafka.ui.model.ServerStatus;
import com.provectus.kafka.ui.model.TopicFormData;
import com.provectus.kafka.ui.zookeeper.ZookeeperService;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import java.util.*;
import static com.provectus.kafka.ui.kafka.KafkaConstants.*;
import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
@Log4j2
public class KafkaService {
@SneakyThrows
@Async
public void loadClusterMetrics(KafkaCluster kafkaCluster) {
log.debug("Start getting Kafka metrics for cluster: " + kafkaCluster.getName());
boolean isConnected = false;
if (kafkaCluster.getAdminClient() != null) {
isConnected = isAdminClientConnected(kafkaCluster);
}
if (kafkaCluster.getAdminClient() == null || !isConnected) {
isConnected = createAdminClient(kafkaCluster);
}
if (!isConnected) {
kafkaCluster.getCluster().setStatus(ServerStatus.OFFLINE);
return;
}
kafkaCluster.getCluster().setId(kafkaCluster.getId());
kafkaCluster.getCluster().setStatus(ServerStatus.ONLINE);
loadMetrics(kafkaCluster);
loadTopicsData(kafkaCluster);
}
private static final ListTopicsOptions LIST_TOPICS_OPTIONS = new ListTopicsOptions().listInternal(true);
private final ZookeeperService zookeeperService;
private final Map<String, AdminClient> adminClientCache = new ConcurrentHashMap<>();
@SneakyThrows
public Mono<ResponseEntity<Topic>> createTopic(KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
return topicFormData.flatMap(
topicData -> {
AdminClient adminClient = cluster.getAdminClient();
NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(), topicData.getReplicationFactor().shortValue());
newTopic.configs(topicData.getConfigs());
createTopic(adminClient, newTopic);
DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(Collections.singletonList(topicData.getName()));
Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
var entry = topicDescriptionFuturesMap.entrySet().iterator().next();
var topicDescription = getTopicDescription(entry);
if (topicDescription == null) return Mono.error(new RuntimeException("Can't find created topic"));
Topic topic = collectTopicData(cluster, topicDescription);
cluster.getTopics().add(topic);
return Mono.just(new ResponseEntity<>(topic, HttpStatus.CREATED));
}
public Mono<KafkaCluster> getUpdatedCluster(KafkaCluster cluster) {
return getOrCreateAdminClient(cluster).flatMap(
ac -> getClusterMetrics(ac).flatMap( clusterMetrics ->
getTopicsData(ac).flatMap( topics ->
loadTopicsConfig(ac, topics.stream().map(InternalTopic::getName).collect(Collectors.toList()))
.map( configs -> mergeWithConfigs(topics, configs) )
).map( topics -> buildFromData(cluster, clusterMetrics, topics))
)
).onErrorResume(
e -> Mono.just(cluster.toBuilder()
.status(ServerStatus.OFFLINE)
.lastKafkaException(e)
.build())
);
}
@SneakyThrows
private String getClusterId(KafkaCluster kafkaCluster) {
return kafkaCluster.getAdminClient().describeCluster().clusterId().get();
}
private KafkaCluster buildFromData(KafkaCluster currentCluster, InternalClusterMetrics brokersMetrics, Map<String, InternalTopic> topics) {
private boolean createAdminClient(KafkaCluster kafkaCluster) {
InternalClusterMetrics.InternalClusterMetricsBuilder metricsBuilder = brokersMetrics.toBuilder();
InternalClusterMetrics topicsMetrics = collectTopicsMetrics(topics);
ServerStatus zookeeperStatus = ServerStatus.OFFLINE;
Throwable zookeeperException = null;
try {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
kafkaCluster.setAdminClient(AdminClient.create(properties));
kafkaCluster.setId(getClusterId(kafkaCluster));
kafkaCluster.getCluster().setId(kafkaCluster.getId());
return true;
} catch (Exception e) {
log.error(e);
kafkaCluster.setLastKafkaException(e);
return false;
zookeeperStatus = zookeeperService.isZookeeperOnline(currentCluster) ? ServerStatus.ONLINE : ServerStatus.OFFLINE;
} catch (Throwable e) {
zookeeperException = e;
}
InternalClusterMetrics clusterMetrics = metricsBuilder
.activeControllers(brokersMetrics.getActiveControllers())
.brokerCount(brokersMetrics.getBrokerCount())
.underReplicatedPartitionCount(topicsMetrics.getUnderReplicatedPartitionCount())
.inSyncReplicasCount(topicsMetrics.getInSyncReplicasCount())
.outOfSyncReplicasCount(topicsMetrics.getOutOfSyncReplicasCount())
.onlinePartitionCount(topicsMetrics.getOnlinePartitionCount())
.offlinePartitionCount(topicsMetrics.getOfflinePartitionCount()).build();
return currentCluster.toBuilder()
.status(ServerStatus.ONLINE)
.zookeeperStatus(zookeeperStatus)
.lastZookeeperException(zookeeperException)
.lastKafkaException(null)
.metrics(clusterMetrics)
.topics(topics)
.build();
}
private boolean isAdminClientConnected(KafkaCluster kafkaCluster) {
try {
getClusterId(kafkaCluster);
private InternalClusterMetrics collectTopicsMetrics(Map<String,InternalTopic> topics) {
return true;
} catch (Exception e) {
log.error(e);
kafkaCluster.setLastKafkaException(e);
int underReplicatedPartitions = 0;
int inSyncReplicasCount = 0;
int outOfSyncReplicasCount = 0;
int onlinePartitionCount = 0;
int offlinePartitionCount = 0;
return false;
for (InternalTopic topic : topics.values()) {
underReplicatedPartitions += topic.getUnderReplicatedPartitions();
inSyncReplicasCount += topic.getInSyncReplicas();
outOfSyncReplicasCount += (topic.getReplicas() - topic.getInSyncReplicas());
onlinePartitionCount += topic.getPartitions().stream().mapToInt(s -> s.getLeader() == null ? 0 : 1).sum();
offlinePartitionCount += topic.getPartitions().stream().mapToInt(s -> s.getLeader() != null ? 0 : 1).sum();
}
return InternalClusterMetrics.builder()
.underReplicatedPartitionCount(underReplicatedPartitions)
.inSyncReplicasCount(inSyncReplicasCount)
.outOfSyncReplicasCount(outOfSyncReplicasCount)
.onlinePartitionCount(onlinePartitionCount)
.offlinePartitionCount(offlinePartitionCount)
.build();
}
private Map<String, InternalTopic> mergeWithConfigs(List<InternalTopic> topics, Map<String, List<InternalTopicConfig>> configs) {
return topics.stream().map(
t -> t.toBuilder().topicConfigs(configs.get(t.getName())).build()
).collect(Collectors.toMap(
InternalTopic::getName,
e -> e
));
}
@SneakyThrows
private void loadTopicsData(KafkaCluster kafkaCluster) {
AdminClient adminClient = kafkaCluster.getAdminClient();
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
listTopicsOptions.listInternal(true);
var topicListings = adminClient.listTopics(listTopicsOptions).names().get();
kafkaCluster.getCluster().setTopicCount(topicListings.size());
DescribeTopicsResult topicDescriptionsWrapper = adminClient.describeTopics(topicListings);
Map<String, KafkaFuture<TopicDescription>> topicDescriptionFuturesMap = topicDescriptionsWrapper.values();
List<Topic> foundTopics = new ArrayList<>();
resetMetrics(kafkaCluster);
for (var entry : topicDescriptionFuturesMap.entrySet()) {
var topicDescription = getTopicDescription(entry);
if (topicDescription == null) continue;
Topic topic = collectTopicData(kafkaCluster, topicDescription);
foundTopics.add(topic);
}
kafkaCluster.setTopics(foundTopics);
private Mono<List<InternalTopic>> getTopicsData(AdminClient adminClient) {
return ClusterUtil.toMono(adminClient.listTopics(LIST_TOPICS_OPTIONS).names())
.flatMap(topics -> ClusterUtil.toMono(adminClient.describeTopics(topics).all()))
.map( m -> m.values().stream().map(ClusterUtil::mapToInternalTopic).collect(Collectors.toList()));
}
private void resetMetrics(KafkaCluster kafkaCluster) {
kafkaCluster.getBrokersMetrics().setOnlinePartitionCount(0);
kafkaCluster.getBrokersMetrics().setOfflinePartitionCount(0);
kafkaCluster.getBrokersMetrics().setUnderReplicatedPartitionCount(0);
kafkaCluster.getBrokersMetrics().setInSyncReplicasCount(0);
kafkaCluster.getBrokersMetrics().setOutOfSyncReplicasCount(0);
private Mono<InternalClusterMetrics> getClusterMetrics(AdminClient client) {
return ClusterUtil.toMono(client.describeCluster().nodes())
.flatMap(brokers ->
ClusterUtil.toMono(client.describeCluster().controller()).map(
c -> {
InternalClusterMetrics.InternalClusterMetricsBuilder builder = InternalClusterMetrics.builder();
builder.brokerCount(brokers.size()).activeControllers(c != null ? 1 : 0);
// TODO: fill bytes in/out metrics
List<Integer> brokerIds = brokers.stream().map(Node::id).collect(Collectors.toList());
return builder.build();
}
)
);
}
private Topic collectTopicData(KafkaCluster kafkaCluster, TopicDescription topicDescription) {
var topic = new Topic();
topic.setInternal(topicDescription.isInternal());
topic.setName(topicDescription.name());
int inSyncReplicasCount = 0, replicasCount = 0;
List<Partition> partitions = new ArrayList<>();
int urpCount = 0;
for (TopicPartitionInfo partition : topicDescription.partitions()) {
var partitionDto = new Partition();
partitionDto.setLeader(partition.leader().id());
partitionDto.setPartition(partition.partition());
List<Replica> replicas = new ArrayList<>();
boolean isUrp = false;
for (Node replicaNode : partition.replicas()) {
var replica = new Replica();
replica.setBroker(replicaNode.id());
replica.setLeader(partition.leader() != null && partition.leader().id() == replicaNode.id());
replica.setInSync(partition.isr().contains(replicaNode));
if (!replica.getInSync()) {
isUrp = true;
}
replicas.add(replica);
inSyncReplicasCount += partition.isr().size();
replicasCount += partition.replicas().size();
}
if (isUrp) {
urpCount++;
}
partitionDto.setReplicas(replicas);
partitions.add(partitionDto);
if (partition.leader() != null) {
kafkaCluster.getBrokersMetrics().setOnlinePartitionCount(kafkaCluster.getBrokersMetrics().getOnlinePartitionCount() + 1);
} else {
kafkaCluster.getBrokersMetrics().setOfflinePartitionCount(kafkaCluster.getBrokersMetrics().getOfflinePartitionCount() + 1);
}
}
kafkaCluster.getCluster().setOnlinePartitionCount(kafkaCluster.getBrokersMetrics().getOnlinePartitionCount());
kafkaCluster.getBrokersMetrics().setUnderReplicatedPartitionCount(
kafkaCluster.getBrokersMetrics().getUnderReplicatedPartitionCount() + urpCount);
kafkaCluster.getBrokersMetrics().setInSyncReplicasCount(
kafkaCluster.getBrokersMetrics().getInSyncReplicasCount() + inSyncReplicasCount);
kafkaCluster.getBrokersMetrics().setOutOfSyncReplicasCount(
kafkaCluster.getBrokersMetrics().getOutOfSyncReplicasCount() + (replicasCount - inSyncReplicasCount));
topic.setPartitions(partitions);
TopicDetails topicDetails = kafkaCluster.getOrCreateTopicDetails(topicDescription.name());
topicDetails.setReplicas(replicasCount);
topicDetails.setPartitionCount(topicDescription.partitions().size());
topicDetails.setInSyncReplicas(inSyncReplicasCount);
topicDetails.setReplicationFactor(topicDescription.partitions().size() > 0
? topicDescription.partitions().get(0).replicas().size()
: null);
topicDetails.setUnderReplicatedPartitions(urpCount);
loadTopicConfig(kafkaCluster, topicDescription.name());
return topic;
}
private TopicDescription getTopicDescription(Map.Entry<String, KafkaFuture<TopicDescription>> entry) {
try {
return entry.getValue().get();
} catch (Exception e) {
log.error("Can't get topic with name: " + entry.getKey(), e);
return null;
}
}
private void loadMetrics(KafkaCluster kafkaCluster) throws InterruptedException, java.util.concurrent.ExecutionException {
AdminClient adminClient = kafkaCluster.getAdminClient();
int brokerCount = adminClient.describeCluster().nodes().get().size();
kafkaCluster.getCluster().setBrokerCount(brokerCount);
kafkaCluster.getBrokersMetrics().setBrokerCount(brokerCount);
kafkaCluster.getBrokersMetrics().setActiveControllers(adminClient.describeCluster().controller().get() != null ? 1 : 0);
for (Map.Entry<MetricName, ? extends Metric> metricNameEntry : adminClient.metrics().entrySet()) {
if (metricNameEntry.getKey().name().equals(IN_BYTE_PER_SEC_METRIC)
&& metricNameEntry.getKey().description().equals(IN_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
kafkaCluster.getCluster().setBytesInPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
}
if (metricNameEntry.getKey().name().equals(OUT_BYTE_PER_SEC_METRIC)
&& metricNameEntry.getKey().description().equals(OUT_BYTE_PER_SEC_METRIC_DESCRIPTION)) {
kafkaCluster.getCluster().setBytesOutPerSec((int) Math.round((double) metricNameEntry.getValue().metricValue()));
}
}
public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicFormData> topicFormData) {
AdminClient adminClient = this.createAdminClient(cluster);
return this.createTopic(adminClient, topicFormData);
}
@SneakyThrows
private void loadTopicConfig(KafkaCluster kafkaCluster, String topicName) {
AdminClient adminClient = kafkaCluster.getAdminClient();
Set<ConfigResource> resources = Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topicName));
final Map<ConfigResource, Config> configs = adminClient.describeConfigs(resources).all().get();
if (configs.isEmpty()) return;
Collection<ConfigEntry> entries = configs.values().iterator().next().entries();
List<TopicConfig> topicConfigs = new ArrayList<>();
for (ConfigEntry entry : entries) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setName(entry.name());
topicConfig.setValue(entry.value());
if (topicConfig.getName().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
topicConfig.setDefaultValue(topicConfig.getValue());
} else {
topicConfig.setDefaultValue(TOPIC_DEFAULT_CONFIGS.get(entry.name()));
}
topicConfigs.add(topicConfig);
}
kafkaCluster.getTopicConfigsMap().put(topicName, topicConfigs);
public Mono<InternalTopic> createTopic(AdminClient adminClient, Mono<TopicFormData> topicFormData) {
return topicFormData.flatMap(
topicData -> {
NewTopic newTopic = new NewTopic(topicData.getName(), topicData.getPartitions(), topicData.getReplicationFactor().shortValue());
newTopic.configs(topicData.getConfigs());
return createTopic(adminClient, newTopic).map( v -> topicData);
}).flatMap(topicData -> {
var tdw = adminClient.describeTopics(Collections.singletonList(topicData.getName()));
return getTopicDescription(tdw.values().get(topicData.getName()), topicData.getName());
})
.switchIfEmpty(Mono.error(new RuntimeException("Can't find created topic")))
.map(ClusterUtil::mapToInternalTopic)
.flatMap( t ->
loadTopicsConfig(adminClient, Collections.singletonList(t.getName()))
.map( c -> mergeWithConfigs(Collections.singletonList(t), c))
.map( m -> m.values().iterator().next())
);
}
@SneakyThrows
private void createTopic(AdminClient adminClient, NewTopic newTopic) {
adminClient.createTopics(Collections.singletonList(newTopic))
.values()
.values()
.iterator()
.next()
.get();
private Mono<String> getClusterId(AdminClient adminClient) {
return ClusterUtil.toMono(adminClient.describeCluster().clusterId());
}
public Mono<AdminClient> getOrCreateAdminClient(KafkaCluster cluster) {
AdminClient adminClient = adminClientCache.computeIfAbsent(
cluster.getId(),
(id) -> createAdminClient(cluster)
);
return isAdminClientConnected(adminClient);
}
public AdminClient createAdminClient(KafkaCluster kafkaCluster) {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers());
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
return AdminClient.create(properties);
}
private Mono<AdminClient> isAdminClientConnected(AdminClient adminClient) {
return getClusterId(adminClient).map( r -> adminClient);
}
private Mono<TopicDescription> getTopicDescription(KafkaFuture<TopicDescription> entry, String topicName) {
return ClusterUtil.toMono(entry)
.onErrorResume(e -> {
log.error("Can't get topic with name: " + topicName);
return Mono.empty();
});
}
@SneakyThrows
private Mono<Map<String, List<InternalTopicConfig>>> loadTopicsConfig(AdminClient adminClient, List<String> topicNames) {
List<ConfigResource> resources = topicNames.stream()
.map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
.collect(Collectors.toList());
return ClusterUtil.toMono(adminClient.describeConfigs(resources).all())
.map(configs ->
configs.entrySet().stream().map(
c -> Tuples.of(
c.getKey().name(),
c.getValue().entries().stream().map(ClusterUtil::mapToInternalTopicConfig).collect(Collectors.toList())
)
).collect(Collectors.toMap(
Tuple2::getT1,
Tuple2::getT2
))
);
}
public Mono<List<ConsumerGroup>> getConsumerGroups(KafkaCluster cluster) {
var adminClient = this.createAdminClient(cluster);
return ClusterUtil.toMono(adminClient.listConsumerGroups().all())
.flatMap(s -> ClusterUtil.toMono(adminClient
.describeConsumerGroups(s.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).all()))
.map(s -> s.values().stream()
.map(c -> ClusterUtil.convertToConsumerGroup(c, cluster)).collect(Collectors.toList()));
}
@SneakyThrows
private Mono<Void> createTopic(AdminClient adminClient, NewTopic newTopic) {
return ClusterUtil.toMono(adminClient.createTopics(Collections.singletonList(newTopic))
.values()
.values()
.iterator()
.next());
}
}
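
Usage sketch: a single call now drives the whole refresh and yields a new immutable snapshot, which the scheduler at the top of the diff writes back into storage:

kafkaService.getUpdatedCluster(cluster)      // Mono<KafkaCluster>
        .subscribe(updated -> clustersStorage.setKafkaCluster(updated.getId(), updated));

Note that getOrCreateAdminClient caches one AdminClient per cluster id via computeIfAbsent, so repeated refreshes reuse the same client.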

@@ -4,7 +4,7 @@ import com.provectus.kafka.ui.api.ApiClustersApi;
import com.provectus.kafka.ui.cluster.service.ClusterService;
import com.provectus.kafka.ui.model.*;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
@@ -22,41 +22,60 @@ public class MetricsRestController implements ApiClustersApi {
@Override
public Mono<ResponseEntity<Flux<Cluster>>> getClusters(ServerWebExchange exchange) {
return clusterService.getClusters();
return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getClusters())));
}
@Override
public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(String clusterId, ServerWebExchange exchange) {
return clusterService.getBrokersMetrics(clusterId);
return Mono.just(
clusterService.getBrokersMetrics(clusterId)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build())
);
}
@Override
public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterId, ServerWebExchange exchange) {
return clusterService.getTopics(clusterId);
return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterId))));
}
@Override
public Mono<ResponseEntity<TopicDetails>> getTopicDetails(String clusterId, String topicName, ServerWebExchange exchange) {
return clusterService.getTopicDetails(clusterId, topicName);
return Mono.just(
clusterService.getTopicDetails(clusterId, topicName)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build())
);
}
@Override
public Mono<ResponseEntity<Flux<TopicConfig>>> getTopicConfigs(String clusterId, String topicName, ServerWebExchange exchange) {
return clusterService.getTopicConfigs(clusterId, topicName);
return Mono.just(
clusterService.getTopicConfigs(clusterId, topicName)
.map(Flux::fromIterable)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build())
);
}
@Override
public Mono<ResponseEntity<Topic>> createTopic(String clusterId, @Valid Mono<TopicFormData> topicFormData, ServerWebExchange exchange) {
return clusterService.createTopic(clusterId, topicFormData);
return clusterService.createTopic(clusterId, topicFormData)
.map(s -> new ResponseEntity<>(s, HttpStatus.OK))
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
}
@Override
public Mono<ResponseEntity<Flux<Broker>>> getBrokers(String clusterId, ServerWebExchange exchange) {
//TODO: ????
return Mono.just(ResponseEntity.ok(Flux.fromIterable(new ArrayList<>())));
}
@Override
public Mono<ResponseEntity<Flux<ConsumerGroup>>> getConsumerGroup(String clusterName, ServerWebExchange exchange) {
return clusterService.getConsumerGroup(clusterName);
return clusterService.getConsumerGroups(clusterName)
.map(Flux::fromIterable)
.map(ResponseEntity::ok)
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); // TODO: check behaviour on cluster not found and empty groups list
}
}
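
The controller repeats the same Optional-to-ResponseEntity mapping several times; a small private helper (hypothetical, not in the diff) would condense it:

private static <T> Mono<ResponseEntity<T>> fromOptional(Optional<T> maybe) {
    return Mono.just(maybe.map(ResponseEntity::ok)
            .orElse(ResponseEntity.notFound().build())); // 404 when the value is absent
}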

@@ -1,10 +0,0 @@
package com.provectus.kafka.ui.zookeeper;
public final class ZooKeeperConstants {
private ZooKeeperConstants() {}
public static int ONLINE = 1;
public static int OFFLINE = 0;
}

@@ -1,60 +1,43 @@
package com.provectus.kafka.ui.zookeeper;
import com.provectus.kafka.ui.cluster.model.ClustersStorage;
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
@RequiredArgsConstructor
@Log4j2
public class ZookeeperService {
@Async
public void checkZookeeperStatus(KafkaCluster kafkaCluster) {
log.debug("Start getting Zookeeper metrics for kafkaCluster: " + kafkaCluster.getName());
boolean isConnected = false;
if (kafkaCluster.getZkClient() != null) {
isConnected = isZkClientConnected(kafkaCluster);
}
if (kafkaCluster.getZkClient() == null || !isConnected) {
isConnected = createZookeeperConnection(kafkaCluster);
}
private final Map<String, ZkClient> cachedZkClient = new HashMap<>();
if (!isConnected) {
kafkaCluster.getBrokersMetrics().setZooKeeperStatus(ZooKeeperConstants.OFFLINE);
return;
public boolean isZookeeperOnline(KafkaCluster kafkaCluster) {
var isConnected = false;
var zkClient = getOrCreateZkClient(kafkaCluster);
log.debug("Start getting Zookeeper metrics for kafkaCluster: {}", kafkaCluster.getName());
if (zkClient != null) {
isConnected = isZkClientConnected(zkClient);
}
kafkaCluster.getBrokersMetrics().setZooKeeperStatus(ZooKeeperConstants.ONLINE);
return isConnected;
}
private boolean createZookeeperConnection(KafkaCluster kafkaCluster) {
try {
kafkaCluster.setZkClient(new ZkClient(kafkaCluster.getZookeeper(), 1000));
return true;
} catch (Exception e) {
log.error(e);
kafkaCluster.setLastZookeeperException(e);
return false;
}
private boolean isZkClientConnected(ZkClient zkClient) {
zkClient.getChildren("/brokers/ids");
return true;
}
private boolean isZkClientConnected(KafkaCluster kafkaCluster) {
private ZkClient getOrCreateZkClient (KafkaCluster cluster) {
try {
kafkaCluster.getZkClient().getChildren("/brokers/ids");
return true;
return cachedZkClient.getOrDefault(cluster.getName(), new ZkClient(cluster.getZookeeper(), 1000));
} catch (Exception e) {
log.error(e);
kafkaCluster.setLastZookeeperException(e);
return false;
log.error("Error while creating zookeeper client for cluster {}", cluster.getName());
return null;
}
}
}
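
One caveat in getOrCreateZkClient: Map.getOrDefault evaluates its default eagerly and never stores it, so a new ZkClient is built on every call and the cache stays empty. If caching is the intent, computeIfAbsent does it (a sketch, with a ConcurrentHashMap to stay safe under the parallel scheduler):

private final Map<String, ZkClient> cachedZkClient = new ConcurrentHashMap<>();

private ZkClient getOrCreateZkClient(KafkaCluster cluster) {
    try {
        return cachedZkClient.computeIfAbsent(
                cluster.getName(),
                name -> new ZkClient(cluster.getZookeeper(), 1000)); // built once, then reused
    } catch (Exception e) {
        log.error("Error while creating zookeeper client for cluster {}", cluster.getName());
        return null;
    }
}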

@@ -228,16 +228,12 @@ components:
    BrokersMetrics:
      type: object
      properties:
        brokerCount:
          type: integer
        zooKeeperStatus:
          type: integer
        activeControllers:
          type: integer
        uncleanLeaderElectionCount:
          type: integer
        onlinePartitionCount:
          type: integer
        underReplicatedPartitionCount:
          type: integer
        offlinePartitionCount: