ClustersMetricsScheduler.java 1.0 KB

1234567891011121314151617181920212223242526272829303132
  1. package com.provectus.kafka.ui.cluster;
  2. import com.provectus.kafka.ui.cluster.model.ClustersStorage;
  3. import com.provectus.kafka.ui.cluster.service.MetricsUpdateService;
  4. import lombok.RequiredArgsConstructor;
  5. import lombok.extern.log4j.Log4j2;
  6. import org.springframework.scheduling.annotation.Scheduled;
  7. import org.springframework.stereotype.Component;
  8. import reactor.core.publisher.Flux;
  9. import reactor.core.scheduler.Schedulers;
  10. import java.util.Map;
  11. @Component
  12. @RequiredArgsConstructor
  13. @Log4j2
  14. public class ClustersMetricsScheduler {
  15. private final ClustersStorage clustersStorage;
  16. private final MetricsUpdateService metricsUpdateService;
  17. @Scheduled(fixedRate = 30000)
  18. public void updateMetrics() {
  19. Flux.fromIterable(clustersStorage.getKafkaClustersMap().entrySet())
  20. .subscribeOn(Schedulers.parallel())
  21. .map(Map.Entry::getValue)
  22. .flatMap(metricsUpdateService::updateMetrics)
  23. .doOnNext(s -> clustersStorage.setKafkaCluster(s.getId(), s))
  24. .subscribe();
  25. }
  26. }