// BrokersController.java (5.3 KB)
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.BrokersApi;
  3. import com.provectus.kafka.ui.mapper.ClusterMapper;
  4. import com.provectus.kafka.ui.model.BrokerConfigDTO;
  5. import com.provectus.kafka.ui.model.BrokerConfigItemDTO;
  6. import com.provectus.kafka.ui.model.BrokerDTO;
  7. import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO;
  8. import com.provectus.kafka.ui.model.BrokerMetricsDTO;
  9. import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
  10. import com.provectus.kafka.ui.model.rbac.AccessContext;
  11. import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
  12. import com.provectus.kafka.ui.service.BrokerService;
  13. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  14. import java.util.List;
  15. import lombok.RequiredArgsConstructor;
  16. import lombok.extern.slf4j.Slf4j;
  17. import org.springframework.http.ResponseEntity;
  18. import org.springframework.web.bind.annotation.RestController;
  19. import org.springframework.web.server.ServerWebExchange;
  20. import reactor.core.publisher.Flux;
  21. import reactor.core.publisher.Mono;
  22. @RestController
  23. @RequiredArgsConstructor
  24. @Slf4j
  25. public class BrokersController extends AbstractController implements BrokersApi {
  26. private final BrokerService brokerService;
  27. private final ClusterMapper clusterMapper;
  28. private final AccessControlService accessControlService;
  29. @Override
  30. public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
  31. ServerWebExchange exchange) {
  32. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  33. .cluster(clusterName)
  34. .build());
  35. var job = brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto);
  36. return validateAccess.thenReturn(ResponseEntity.ok(job));
  37. }
  38. @Override
  39. public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterName, Integer id,
  40. ServerWebExchange exchange) {
  41. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  42. .cluster(clusterName)
  43. .build());
  44. return validateAccess.then(
  45. brokerService.getBrokerMetrics(getCluster(clusterName), id)
  46. .map(clusterMapper::toBrokerMetrics)
  47. .map(ResponseEntity::ok)
  48. .onErrorReturn(ResponseEntity.notFound().build())
  49. );
  50. }
  51. @Override
  52. public Mono<ResponseEntity<Flux<BrokersLogdirsDTO>>> getAllBrokersLogdirs(String clusterName,
  53. List<Integer> brokers,
  54. ServerWebExchange exchange) {
  55. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  56. .cluster(clusterName)
  57. .build());
  58. return validateAccess.thenReturn(ResponseEntity.ok(
  59. brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokers)));
  60. }
  61. @Override
  62. public Mono<ResponseEntity<Flux<BrokerConfigDTO>>> getBrokerConfig(String clusterName,
  63. Integer id,
  64. ServerWebExchange exchange) {
  65. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  66. .cluster(clusterName)
  67. .clusterConfigActions(ClusterConfigAction.VIEW)
  68. .build());
  69. return validateAccess.thenReturn(
  70. ResponseEntity.ok(
  71. brokerService.getBrokerConfig(getCluster(clusterName), id)
  72. .map(clusterMapper::toBrokerConfig))
  73. );
  74. }
  75. @Override
  76. public Mono<ResponseEntity<Void>> updateBrokerTopicPartitionLogDir(String clusterName,
  77. Integer id,
  78. Mono<BrokerLogdirUpdateDTO> brokerLogdir,
  79. ServerWebExchange exchange) {
  80. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  81. .cluster(clusterName)
  82. .clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
  83. .build());
  84. return validateAccess.then(
  85. brokerLogdir
  86. .flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld))
  87. .map(ResponseEntity::ok)
  88. );
  89. }
  90. @Override
  91. public Mono<ResponseEntity<Void>> updateBrokerConfigByName(String clusterName,
  92. Integer id,
  93. String name,
  94. Mono<BrokerConfigItemDTO> brokerConfig,
  95. ServerWebExchange exchange) {
  96. Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
  97. .cluster(clusterName)
  98. .clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
  99. .build());
  100. return validateAccess.then(
  101. brokerConfig
  102. .flatMap(bci -> brokerService.updateBrokerConfigByName(
  103. getCluster(clusterName), id, name, bci.getValue()))
  104. .map(ResponseEntity::ok)
  105. );
  106. }
  107. }