ISSUE-1128: Add respect to cluster's order in ClustersStorage (#1130)

* add respect to cluster's order in ClustersStorage
* minor cleaning
This commit is contained in:
Ilya Kuramshin 2021-11-26 14:50:32 +03:00 committed by GitHub
parent 53a641cb58
commit d674808f3c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 7 additions and 24 deletions

View file

@ -1,6 +1,5 @@
package com.provectus.kafka.ui.service; package com.provectus.kafka.ui.service;
import java.util.Map;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@ -24,10 +23,9 @@ public class ClustersMetricsScheduler {
initialDelayString = "${kafka.update-metrics-rate-millis:30000}" initialDelayString = "${kafka.update-metrics-rate-millis:30000}"
) )
public void updateMetrics() { public void updateMetrics() {
Flux.fromIterable(clustersStorage.getKafkaClustersMap().entrySet()) Flux.fromIterable(clustersStorage.getKafkaClusters())
.parallel() .parallel()
.runOn(Schedulers.parallel()) .runOn(Schedulers.parallel())
.map(Map.Entry::getValue)
.flatMap(cluster -> { .flatMap(cluster -> {
log.debug("Start getting metrics for kafkaCluster: {}", cluster.getName()); log.debug("Start getting metrics for kafkaCluster: {}", cluster.getName());
return metricsService.updateCache(cluster) return metricsService.updateCache(cluster)

View file

@ -1,33 +1,22 @@
package com.provectus.kafka.ui.service; package com.provectus.kafka.ui.service;
import com.google.common.collect.ImmutableMap;
import com.provectus.kafka.ui.config.ClustersProperties; import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.mapper.ClusterMapper; import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KafkaCluster;
import java.util.Collection; import java.util.Collection;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import org.mapstruct.factory.Mappers;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
@RequiredArgsConstructor
public class ClustersStorage { public class ClustersStorage {
private final Map<String, KafkaCluster> kafkaClusters = new ConcurrentHashMap<>(); private final ImmutableMap<String, KafkaCluster> kafkaClusters;
private final ClustersProperties clusterProperties; public ClustersStorage(ClustersProperties properties, ClusterMapper mapper) {
var builder = ImmutableMap.<String, KafkaCluster>builder();
private final ClusterMapper clusterMapper = Mappers.getMapper(ClusterMapper.class); properties.getClusters().forEach(c -> builder.put(c.getName(), mapper.toKafkaCluster(c)));
this.kafkaClusters = builder.build();
@PostConstruct
public void init() {
for (ClustersProperties.Cluster clusterProperties : clusterProperties.getClusters()) {
KafkaCluster cluster = clusterMapper.toKafkaCluster(clusterProperties);
kafkaClusters.put(clusterProperties.getName(), cluster);
}
} }
public Collection<KafkaCluster> getKafkaClusters() { public Collection<KafkaCluster> getKafkaClusters() {
@ -37,8 +26,4 @@ public class ClustersStorage {
public Optional<KafkaCluster> getClusterByName(String clusterName) { public Optional<KafkaCluster> getClusterByName(String clusterName) {
return Optional.ofNullable(kafkaClusters.get(clusterName)); return Optional.ofNullable(kafkaClusters.get(clusterName));
} }
public Map<String, KafkaCluster> getKafkaClustersMap() {
return kafkaClusters;
}
} }