ClusterService.java 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.mapper.ClusterMapper;
  3. import com.provectus.kafka.ui.model.ClusterDTO;
  4. import com.provectus.kafka.ui.model.ClusterMetricsDTO;
  5. import com.provectus.kafka.ui.model.ClusterStatsDTO;
  6. import com.provectus.kafka.ui.model.InternalClusterState;
  7. import com.provectus.kafka.ui.model.KafkaCluster;
  8. import java.util.List;
  9. import java.util.stream.Collectors;
  10. import lombok.RequiredArgsConstructor;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.springframework.stereotype.Service;
  13. import reactor.core.publisher.Mono;
  14. @Service
  15. @RequiredArgsConstructor
  16. @Slf4j
  17. public class ClusterService {
  18. private final MetricsCache metricsCache;
  19. private final ClustersStorage clustersStorage;
  20. private final ClusterMapper clusterMapper;
  21. private final MetricsService metricsService;
  22. public List<ClusterDTO> getClusters() {
  23. return clustersStorage.getKafkaClusters()
  24. .stream()
  25. .map(c -> clusterMapper.toCluster(new InternalClusterState(c, metricsCache.get(c))))
  26. .collect(Collectors.toList());
  27. }
  28. public Mono<ClusterStatsDTO> getClusterStats(KafkaCluster cluster) {
  29. return Mono.justOrEmpty(
  30. clusterMapper.toClusterStats(
  31. new InternalClusterState(cluster, metricsCache.get(cluster)))
  32. );
  33. }
  34. public Mono<ClusterMetricsDTO> getClusterMetrics(KafkaCluster cluster) {
  35. return Mono.just(
  36. clusterMapper.toClusterMetrics(
  37. metricsCache.get(cluster).getJmxMetrics()));
  38. }
  39. public Mono<ClusterDTO> updateCluster(KafkaCluster cluster) {
  40. return metricsService.updateCache(cluster)
  41. .map(metrics -> clusterMapper.toCluster(new InternalClusterState(cluster, metrics)));
  42. }
  43. }