StatisticsService.java

package com.provectus.kafka.ui.service;

import static com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;

import com.provectus.kafka.ui.model.ClusterFeature;
import com.provectus.kafka.ui.model.InternalLogDirStats;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.Metrics;
import com.provectus.kafka.ui.model.ServerStatusDTO;
import com.provectus.kafka.ui.model.Statistics;
import com.provectus.kafka.ui.service.metrics.MetricsCollector;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
@RequiredArgsConstructor
@Slf4j
public class StatisticsService {

  private final MetricsCollector metricsCollector;
  private final AdminClientService adminClientService;
  private final FeatureService featureService;
  private final StatisticsCache cache;
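
  /**
   * Re-collects statistics for the given cluster and replaces the cached entry.
   * getStatistics() never errors (failures are folded into the Statistics object
   * via onErrorResume), so doOnSuccess fires for both healthy and failed runs.
   */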
  public Mono<Statistics> updateCache(KafkaCluster c) {
    return getStatistics(c).doOnSuccess(m -> cache.replace(c, m));
  }
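
  /**
   * Collects a full statistics snapshot: broker metrics, log-dir stats, available
   * features, topic configs and topic descriptions are fetched concurrently via
   * Mono.zip. Any error downgrades the result to an "empty" Statistics carrying
   * only the exception, so a single failure never breaks the cache refresh.
   */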
  private Mono<Statistics> getStatistics(KafkaCluster cluster) {
    return adminClientService.get(cluster).flatMap(ac ->
            ac.describeCluster().flatMap(description ->
                ac.updateInternalStats(description.getController()).then(
                    Mono.zip(
                        List.of(
                            metricsCollector.getBrokerMetrics(cluster, description.getNodes()),
                            getLogDirInfo(description, ac),
                            featureService.getAvailableFeatures(ac, cluster, description),
                            loadTopicConfigs(cluster),
                            describeTopics(cluster)),
                        // zip over a List hands the combinator an untyped Object[],
                        // hence the casts below
                        results ->
                            Statistics.builder()
                                .status(ServerStatusDTO.ONLINE)
                                .clusterDescription(description)
                                .version(ac.getVersion())
                                .metrics((Metrics) results[0])
                                .logDirInfo((InternalLogDirStats) results[1])
                                .features((List<ClusterFeature>) results[2])
                                .topicConfigs((Map<String, List<ConfigEntry>>) results[3])
                                .topicDescriptions((Map<String, TopicDescription>) results[4])
                                .build()
                    ))))
        .doOnError(e ->
            log.error("Failed to collect cluster {} info", cluster.getName(), e))
        .onErrorResume(
            e -> Mono.just(Statistics.empty().toBuilder().lastKafkaException(e).build()));
  }
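
  /**
   * Describes log dirs for every broker in the cluster and wraps the result
   * into InternalLogDirStats.
   */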
  private Mono<InternalLogDirStats> getLogDirInfo(ClusterDescription desc, ReactiveAdminClient ac) {
    var brokerIds = desc.getNodes().stream().map(Node::id).collect(Collectors.toSet());
    return ac.describeLogDirs(brokerIds).map(InternalLogDirStats::new);
  }
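
  // Topic descriptions (partitions, replicas, ISRs) keyed by topic name.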
  private Mono<Map<String, TopicDescription>> describeTopics(KafkaCluster c) {
    return adminClientService.get(c).flatMap(ReactiveAdminClient::describeTopics);
  }
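
  // Per-topic configuration entries keyed by topic name.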
  private Mono<Map<String, List<ConfigEntry>>> loadTopicConfigs(KafkaCluster c) {
    return adminClientService.get(c).flatMap(ReactiveAdminClient::getTopicsConfig);
  }
}