AclsController.java 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.AclsApi;
  3. import com.provectus.kafka.ui.mapper.ClusterMapper;
  4. import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
  5. import com.provectus.kafka.ui.model.CreateProducerAclDTO;
  6. import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
  7. import com.provectus.kafka.ui.model.KafkaAclDTO;
  8. import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
  9. import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
  10. import com.provectus.kafka.ui.model.rbac.AccessContext;
  11. import com.provectus.kafka.ui.model.rbac.permission.AclAction;
  12. import com.provectus.kafka.ui.service.acl.AclsService;
  13. import java.util.Optional;
  14. import lombok.RequiredArgsConstructor;
  15. import org.apache.kafka.common.resource.PatternType;
  16. import org.apache.kafka.common.resource.ResourcePatternFilter;
  17. import org.apache.kafka.common.resource.ResourceType;
  18. import org.springframework.http.ResponseEntity;
  19. import org.springframework.web.bind.annotation.RestController;
  20. import org.springframework.web.server.ServerWebExchange;
  21. import reactor.core.publisher.Flux;
  22. import reactor.core.publisher.Mono;
  23. @RestController
  24. @RequiredArgsConstructor
  25. public class AclsController extends AbstractController implements AclsApi {
  26. private final AclsService aclsService;
  27. @Override
  28. public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
  29. ServerWebExchange exchange) {
  30. AccessContext context = AccessContext.builder()
  31. .cluster(clusterName)
  32. .aclActions(AclAction.EDIT)
  33. .operationName("createAcl")
  34. .build();
  35. return validateAccess(context)
  36. .then(kafkaAclDto)
  37. .map(ClusterMapper::toAclBinding)
  38. .flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
  39. .doOnEach(sig -> audit(context, sig))
  40. .thenReturn(ResponseEntity.ok().build());
  41. }
  42. @Override
  43. public Mono<ResponseEntity<Void>> deleteAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
  44. ServerWebExchange exchange) {
  45. AccessContext context = AccessContext.builder()
  46. .cluster(clusterName)
  47. .aclActions(AclAction.EDIT)
  48. .operationName("deleteAcl")
  49. .build();
  50. return validateAccess(context)
  51. .then(kafkaAclDto)
  52. .map(ClusterMapper::toAclBinding)
  53. .flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
  54. .doOnEach(sig -> audit(context, sig))
  55. .thenReturn(ResponseEntity.ok().build());
  56. }
  57. @Override
  58. public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
  59. KafkaAclResourceTypeDTO resourceTypeDto,
  60. String resourceName,
  61. KafkaAclNamePatternTypeDTO namePatternTypeDto,
  62. ServerWebExchange exchange) {
  63. AccessContext context = AccessContext.builder()
  64. .cluster(clusterName)
  65. .aclActions(AclAction.VIEW)
  66. .operationName("listAcls")
  67. .build();
  68. var resourceType = Optional.ofNullable(resourceTypeDto)
  69. .map(ClusterMapper::mapAclResourceTypeDto)
  70. .orElse(ResourceType.ANY);
  71. var namePatternType = Optional.ofNullable(namePatternTypeDto)
  72. .map(ClusterMapper::mapPatternTypeDto)
  73. .orElse(PatternType.ANY);
  74. var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType);
  75. return validateAccess(context).then(
  76. Mono.just(
  77. ResponseEntity.ok(
  78. aclsService.listAcls(getCluster(clusterName), filter)
  79. .map(ClusterMapper::toKafkaAclDto)))
  80. ).doOnEach(sig -> audit(context, sig));
  81. }
  82. @Override
  83. public Mono<ResponseEntity<String>> getAclAsCsv(String clusterName, ServerWebExchange exchange) {
  84. AccessContext context = AccessContext.builder()
  85. .cluster(clusterName)
  86. .aclActions(AclAction.VIEW)
  87. .operationName("getAclAsCsv")
  88. .build();
  89. return validateAccess(context).then(
  90. aclsService.getAclAsCsvString(getCluster(clusterName))
  91. .map(ResponseEntity::ok)
  92. .flatMap(Mono::just)
  93. .doOnEach(sig -> audit(context, sig))
  94. );
  95. }
  96. @Override
  97. public Mono<ResponseEntity<Void>> syncAclsCsv(String clusterName, Mono<String> csvMono, ServerWebExchange exchange) {
  98. AccessContext context = AccessContext.builder()
  99. .cluster(clusterName)
  100. .aclActions(AclAction.EDIT)
  101. .operationName("syncAclsCsv")
  102. .build();
  103. return validateAccess(context)
  104. .then(csvMono)
  105. .flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
  106. .doOnEach(sig -> audit(context, sig))
  107. .thenReturn(ResponseEntity.ok().build());
  108. }
  109. @Override
  110. public Mono<ResponseEntity<Void>> createConsumerAcl(String clusterName,
  111. Mono<CreateConsumerAclDTO> createConsumerAclDto,
  112. ServerWebExchange exchange) {
  113. AccessContext context = AccessContext.builder()
  114. .cluster(clusterName)
  115. .aclActions(AclAction.EDIT)
  116. .operationName("createConsumerAcl")
  117. .build();
  118. return validateAccess(context)
  119. .then(createConsumerAclDto)
  120. .flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req))
  121. .doOnEach(sig -> audit(context, sig))
  122. .thenReturn(ResponseEntity.ok().build());
  123. }
  124. @Override
  125. public Mono<ResponseEntity<Void>> createProducerAcl(String clusterName,
  126. Mono<CreateProducerAclDTO> createProducerAclDto,
  127. ServerWebExchange exchange) {
  128. AccessContext context = AccessContext.builder()
  129. .cluster(clusterName)
  130. .aclActions(AclAction.EDIT)
  131. .operationName("createProducerAcl")
  132. .build();
  133. return validateAccess(context)
  134. .then(createProducerAclDto)
  135. .flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req))
  136. .doOnEach(sig -> audit(context, sig))
  137. .thenReturn(ResponseEntity.ok().build());
  138. }
  139. @Override
  140. public Mono<ResponseEntity<Void>> createStreamAppAcl(String clusterName,
  141. Mono<CreateStreamAppAclDTO> createStreamAppAclDto,
  142. ServerWebExchange exchange) {
  143. AccessContext context = AccessContext.builder()
  144. .cluster(clusterName)
  145. .aclActions(AclAction.EDIT)
  146. .operationName("createStreamAppAcl")
  147. .build();
  148. return validateAccess(context)
  149. .then(createStreamAppAclDto)
  150. .flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req))
  151. .doOnEach(sig -> audit(context, sig))
  152. .thenReturn(ResponseEntity.ok().build());
  153. }
  154. }