ClustersStatisticsScheduler.java 1.0 KB

1234567891011121314151617181920212223242526272829303132
  1. package com.provectus.kafka.ui.service;
  2. import lombok.RequiredArgsConstructor;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.scheduling.annotation.Scheduled;
  5. import org.springframework.stereotype.Component;
  6. import reactor.core.publisher.Flux;
  7. import reactor.core.scheduler.Schedulers;
  8. @Component
  9. @RequiredArgsConstructor
  10. @Slf4j
  11. public class ClustersStatisticsScheduler {
  12. private final ClustersStorage clustersStorage;
  13. private final StatisticsService statisticsService;
  14. @Scheduled(fixedRateString = "${kafka.update-metrics-rate-millis:30000}")
  15. public void updateStatistics() {
  16. Flux.fromIterable(clustersStorage.getKafkaClusters())
  17. .parallel()
  18. .runOn(Schedulers.parallel())
  19. .flatMap(cluster -> {
  20. log.debug("Start getting metrics for kafkaCluster: {}", cluster.getName());
  21. return statisticsService.updateCache(cluster)
  22. .doOnSuccess(m -> log.debug("Metrics updated for cluster: {}", cluster.getName()));
  23. })
  24. .then()
  25. .block();
  26. }
  27. }