BrokersController.java 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.BrokersApi;
  3. import com.provectus.kafka.ui.mapper.ClusterMapper;
  4. import com.provectus.kafka.ui.model.BrokerConfigDTO;
  5. import com.provectus.kafka.ui.model.BrokerConfigItemDTO;
  6. import com.provectus.kafka.ui.model.BrokerDTO;
  7. import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO;
  8. import com.provectus.kafka.ui.model.BrokerMetricsDTO;
  9. import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
  10. import com.provectus.kafka.ui.model.rbac.AccessContext;
  11. import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
  12. import com.provectus.kafka.ui.service.BrokerService;
  13. import com.provectus.kafka.ui.service.audit.AuditService;
  14. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  15. import java.util.List;
  16. import java.util.Map;
  17. import javax.annotation.Nullable;
  18. import lombok.RequiredArgsConstructor;
  19. import lombok.extern.slf4j.Slf4j;
  20. import org.springframework.http.ResponseEntity;
  21. import org.springframework.web.bind.annotation.RestController;
  22. import org.springframework.web.server.ServerWebExchange;
  23. import reactor.core.publisher.Flux;
  24. import reactor.core.publisher.Mono;
  25. @RestController
  26. @RequiredArgsConstructor
  27. @Slf4j
  28. public class BrokersController extends AbstractController implements BrokersApi {
  29. private final BrokerService brokerService;
  30. private final ClusterMapper clusterMapper;
  31. private final AuditService auditService;
  32. private final AccessControlService accessControlService;
  33. @Override
  34. public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
  35. ServerWebExchange exchange) {
  36. var context = AccessContext.builder()
  37. .cluster(clusterName)
  38. .operationName("getBrokers")
  39. .build();
  40. var job = brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto);
  41. return accessControlService.validateAccess(context)
  42. .thenReturn(ResponseEntity.ok(job))
  43. .doOnEach(sig -> auditService.audit(context, sig));
  44. }
  45. @Override
  46. public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterName, Integer id,
  47. ServerWebExchange exchange) {
  48. var context = AccessContext.builder()
  49. .cluster(clusterName)
  50. .operationName("getBrokersMetrics")
  51. .operationParams(Map.of("id", id))
  52. .build();
  53. return accessControlService.validateAccess(context)
  54. .then(
  55. brokerService.getBrokerMetrics(getCluster(clusterName), id)
  56. .map(clusterMapper::toBrokerMetrics)
  57. .map(ResponseEntity::ok)
  58. .onErrorReturn(ResponseEntity.notFound().build())
  59. )
  60. .doOnEach(sig -> auditService.audit(context, sig));
  61. }
  62. @Override
  63. public Mono<ResponseEntity<Flux<BrokersLogdirsDTO>>> getAllBrokersLogdirs(String clusterName,
  64. @Nullable List<Integer> brokers,
  65. ServerWebExchange exchange) {
  66. List<Integer> brokerIds = brokers == null ? List.of() : brokers;
  67. var context = AccessContext.builder()
  68. .cluster(clusterName)
  69. .operationName("getAllBrokersLogdirs")
  70. .operationParams(Map.of("brokerIds", brokerIds))
  71. .build();
  72. return accessControlService.validateAccess(context)
  73. .thenReturn(ResponseEntity.ok(
  74. brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokerIds)))
  75. .doOnEach(sig -> auditService.audit(context, sig));
  76. }
  77. @Override
  78. public Mono<ResponseEntity<Flux<BrokerConfigDTO>>> getBrokerConfig(String clusterName,
  79. Integer id,
  80. ServerWebExchange exchange) {
  81. var context = AccessContext.builder()
  82. .cluster(clusterName)
  83. .clusterConfigActions(ClusterConfigAction.VIEW)
  84. .operationName("getBrokerConfig")
  85. .operationParams(Map.of("brokerId", id))
  86. .build();
  87. return accessControlService.validateAccess(context).thenReturn(
  88. ResponseEntity.ok(
  89. brokerService.getBrokerConfig(getCluster(clusterName), id)
  90. .map(clusterMapper::toBrokerConfig))
  91. ).doOnEach(sig -> auditService.audit(context, sig));
  92. }
  93. @Override
  94. public Mono<ResponseEntity<Void>> updateBrokerTopicPartitionLogDir(String clusterName,
  95. Integer id,
  96. Mono<BrokerLogdirUpdateDTO> brokerLogdir,
  97. ServerWebExchange exchange) {
  98. var context = AccessContext.builder()
  99. .cluster(clusterName)
  100. .clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
  101. .operationName("updateBrokerTopicPartitionLogDir")
  102. .operationParams(Map.of("brokerId", id))
  103. .build();
  104. return accessControlService.validateAccess(context).then(
  105. brokerLogdir
  106. .flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld))
  107. .map(ResponseEntity::ok)
  108. ).doOnEach(sig -> auditService.audit(context, sig));
  109. }
  110. @Override
  111. public Mono<ResponseEntity<Void>> updateBrokerConfigByName(String clusterName,
  112. Integer id,
  113. String name,
  114. Mono<BrokerConfigItemDTO> brokerConfig,
  115. ServerWebExchange exchange) {
  116. var context = AccessContext.builder()
  117. .cluster(clusterName)
  118. .clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
  119. .operationName("updateBrokerConfigByName")
  120. .operationParams(Map.of("brokerId", id))
  121. .build();
  122. return accessControlService.validateAccess(context).then(
  123. brokerConfig
  124. .flatMap(bci -> brokerService.updateBrokerConfigByName(
  125. getCluster(clusterName), id, name, bci.getValue()))
  126. .map(ResponseEntity::ok)
  127. ).doOnEach(sig -> auditService.audit(context, sig));
  128. }
  129. }