ClustersController.java 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.ClustersApi;
  3. import com.provectus.kafka.ui.model.ClusterDTO;
  4. import com.provectus.kafka.ui.model.ClusterMetricsDTO;
  5. import com.provectus.kafka.ui.model.ClusterStatsDTO;
  6. import com.provectus.kafka.ui.model.rbac.AccessContext;
  7. import com.provectus.kafka.ui.service.ClusterService;
  8. import lombok.RequiredArgsConstructor;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.springframework.http.ResponseEntity;
  11. import org.springframework.web.bind.annotation.RestController;
  12. import org.springframework.web.server.ServerWebExchange;
  13. import reactor.core.publisher.Flux;
  14. import reactor.core.publisher.Mono;
  15. @RestController
  16. @RequiredArgsConstructor
  17. @Slf4j
  18. public class ClustersController extends AbstractController implements ClustersApi {
  19. private final ClusterService clusterService;
  20. @Override
  21. public Mono<ResponseEntity<Flux<ClusterDTO>>> getClusters(ServerWebExchange exchange) {
  22. Flux<ClusterDTO> job = Flux.fromIterable(clusterService.getClusters())
  23. .filterWhen(accessControlService::isClusterAccessible);
  24. return Mono.just(ResponseEntity.ok(job));
  25. }
  26. @Override
  27. public Mono<ResponseEntity<ClusterMetricsDTO>> getClusterMetrics(String clusterName,
  28. ServerWebExchange exchange) {
  29. AccessContext context = AccessContext.builder()
  30. .cluster(clusterName)
  31. .operationName("getClusterMetrics")
  32. .build();
  33. return validateAccess(context)
  34. .then(
  35. clusterService.getClusterMetrics(getCluster(clusterName))
  36. .map(ResponseEntity::ok)
  37. .onErrorReturn(ResponseEntity.notFound().build())
  38. )
  39. .doOnEach(sig -> audit(context, sig));
  40. }
  41. @Override
  42. public Mono<ResponseEntity<ClusterStatsDTO>> getClusterStats(String clusterName,
  43. ServerWebExchange exchange) {
  44. AccessContext context = AccessContext.builder()
  45. .cluster(clusterName)
  46. .operationName("getClusterStats")
  47. .build();
  48. return validateAccess(context)
  49. .then(
  50. clusterService.getClusterStats(getCluster(clusterName))
  51. .map(ResponseEntity::ok)
  52. .onErrorReturn(ResponseEntity.notFound().build())
  53. )
  54. .doOnEach(sig -> audit(context, sig));
  55. }
  56. @Override
  57. public Mono<ResponseEntity<ClusterDTO>> updateClusterInfo(String clusterName,
  58. ServerWebExchange exchange) {
  59. AccessContext context = AccessContext.builder()
  60. .cluster(clusterName)
  61. .operationName("updateClusterInfo")
  62. .build();
  63. return validateAccess(context)
  64. .then(clusterService.updateCluster(getCluster(clusterName)).map(ResponseEntity::ok))
  65. .doOnEach(sig -> audit(context, sig));
  66. }
  67. }