StatisticsService.java 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package com.provectus.kafka.ui.service;
  2. import static com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;
  3. import com.provectus.kafka.ui.model.InternalLogDirStats;
  4. import com.provectus.kafka.ui.model.KafkaCluster;
  5. import com.provectus.kafka.ui.model.Metrics;
  6. import com.provectus.kafka.ui.model.ServerStatusDTO;
  7. import com.provectus.kafka.ui.model.Statistics;
  8. import com.provectus.kafka.ui.service.metrics.scrape.ScrapedClusterState;
  9. import java.util.Map;
  10. import java.util.stream.Collectors;
  11. import lombok.RequiredArgsConstructor;
  12. import lombok.extern.slf4j.Slf4j;
  13. import org.springframework.stereotype.Service;
  14. import reactor.core.publisher.Mono;
  15. @Service
  16. @RequiredArgsConstructor
  17. @Slf4j
  18. public class StatisticsService {
  19. private final AdminClientService adminClientService;
  20. private final FeatureService featureService;
  21. private final StatisticsCache cache;
  22. public Mono<Statistics> updateCache(KafkaCluster c) {
  23. return getStatistics(c).doOnSuccess(m -> cache.replace(c, m));
  24. }
  25. private Mono<Statistics> getStatistics(KafkaCluster cluster) {
  26. return adminClientService.get(cluster).flatMap(ac ->
  27. ac.describeCluster()
  28. .flatMap(description ->
  29. ac.updateInternalStats(description.getController())
  30. .then(
  31. Mono.zip(
  32. featureService.getAvailableFeatures(ac, cluster, description),
  33. loadClusterState(description, ac)
  34. ).flatMap(featuresAndState ->
  35. scrapeMetrics(cluster, featuresAndState.getT2(), description)
  36. .map(metrics ->
  37. Statistics.builder()
  38. .status(ServerStatusDTO.ONLINE)
  39. .clusterDescription(description)
  40. .version(ac.getVersion())
  41. .metrics(metrics)
  42. .features(featuresAndState.getT1())
  43. .clusterState(featuresAndState.getT2())
  44. //TODO: RM ->>>
  45. .topicDescriptions(
  46. featuresAndState.getT2().getTopicStates().entrySet().stream()
  47. .collect(Collectors.toMap(
  48. Map.Entry::getKey, e -> e.getValue().description())))
  49. .topicConfigs(
  50. featuresAndState.getT2().getTopicStates().entrySet().stream()
  51. .collect(Collectors.toMap(
  52. Map.Entry::getKey, e -> e.getValue().configs())))
  53. .logDirInfo(InternalLogDirStats.empty())
  54. .build())))))
  55. .doOnError(e ->
  56. log.error("Failed to collect cluster {} info", cluster.getName(), e))
  57. .onErrorResume(
  58. e -> Mono.just(Statistics.empty().toBuilder().lastKafkaException(e).build()));
  59. }
  60. private Mono<ScrapedClusterState> loadClusterState(ClusterDescription clusterDescription,
  61. ReactiveAdminClient ac) {
  62. return ScrapedClusterState.scrape(clusterDescription, ac);
  63. }
  64. private Mono<Metrics> scrapeMetrics(KafkaCluster c,
  65. ScrapedClusterState clusterState,
  66. ClusterDescription clusterDescription) {
  67. return c.getMetricsScrapping().scrape(clusterState, clusterDescription.getNodes());
  68. }
  69. }