1234567891011121314151617181920212223242526272829303132 |
- package com.provectus.kafka.ui.cluster;
- import com.provectus.kafka.ui.cluster.model.ClustersStorage;
- import com.provectus.kafka.ui.cluster.service.MetricsUpdateService;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.log4j.Log4j2;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
- import reactor.core.publisher.Flux;
- import reactor.core.scheduler.Schedulers;
- import java.util.Map;
- @Component
- @RequiredArgsConstructor
- @Log4j2
- public class ClustersMetricsScheduler {
- private final ClustersStorage clustersStorage;
- private final MetricsUpdateService metricsUpdateService;
- @Scheduled(fixedRate = 30000)
- public void updateMetrics() {
- Flux.fromIterable(clustersStorage.getKafkaClustersMap().entrySet())
- .subscribeOn(Schedulers.parallel())
- .map(Map.Entry::getValue)
- .flatMap(metricsUpdateService::updateMetrics)
- .doOnNext(s -> clustersStorage.setKafkaCluster(s.getId(), s))
- .subscribe();
- }
- }
|