1234567891011121314151617181920212223242526272829303132 |
- package com.provectus.kafka.ui.service;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
- import reactor.core.publisher.Flux;
- import reactor.core.scheduler.Schedulers;
- @Component
- @RequiredArgsConstructor
- @Slf4j
- public class ClustersStatisticsScheduler {
- private final ClustersStorage clustersStorage;
- private final StatisticsService statisticsService;
- @Scheduled(fixedRateString = "${kafka.update-metrics-rate-millis:30000}")
- public void updateStatistics() {
- Flux.fromIterable(clustersStorage.getKafkaClusters())
- .parallel()
- .runOn(Schedulers.parallel())
- .flatMap(cluster -> {
- log.debug("Start getting metrics for kafkaCluster: {}", cluster.getName());
- return statisticsService.updateCache(cluster)
- .doOnSuccess(m -> log.debug("Metrics updated for cluster: {}", cluster.getName()));
- })
- .then()
- .block();
- }
- }
|