AclsController.java 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.AclsApi;
  3. import com.provectus.kafka.ui.mapper.ClusterMapper;
  4. import com.provectus.kafka.ui.model.KafkaAclDTO;
  5. import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
  6. import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
  7. import com.provectus.kafka.ui.model.rbac.AccessContext;
  8. import com.provectus.kafka.ui.model.rbac.permission.AclAction;
  9. import com.provectus.kafka.ui.service.acl.AclsService;
  10. import com.provectus.kafka.ui.service.audit.AuditService;
  11. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  12. import java.util.Optional;
  13. import lombok.RequiredArgsConstructor;
  14. import org.apache.kafka.common.resource.PatternType;
  15. import org.apache.kafka.common.resource.ResourcePatternFilter;
  16. import org.apache.kafka.common.resource.ResourceType;
  17. import org.springframework.http.ResponseEntity;
  18. import org.springframework.web.bind.annotation.RestController;
  19. import org.springframework.web.server.ServerWebExchange;
  20. import reactor.core.publisher.Flux;
  21. import reactor.core.publisher.Mono;
  22. @RestController
  23. @RequiredArgsConstructor
  24. public class AclsController extends AbstractController implements AclsApi {
  25. private final AclsService aclsService;
  26. private final AccessControlService accessControlService;
  27. private final AuditService auditService;
  28. @Override
  29. public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
  30. ServerWebExchange exchange) {
  31. AccessContext context = AccessContext.builder()
  32. .cluster(clusterName)
  33. .aclActions(AclAction.EDIT)
  34. .operationName("createAcl")
  35. .build();
  36. return accessControlService.validateAccess(context)
  37. .then(kafkaAclDto)
  38. .map(ClusterMapper::toAclBinding)
  39. .flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
  40. .doOnEach(sig -> auditService.audit(context, sig))
  41. .thenReturn(ResponseEntity.ok().build());
  42. }
  43. @Override
  44. public Mono<ResponseEntity<Void>> deleteAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
  45. ServerWebExchange exchange) {
  46. AccessContext context = AccessContext.builder()
  47. .cluster(clusterName)
  48. .aclActions(AclAction.EDIT)
  49. .operationName("deleteAcl")
  50. .build();
  51. return accessControlService.validateAccess(context)
  52. .then(kafkaAclDto)
  53. .map(ClusterMapper::toAclBinding)
  54. .flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
  55. .doOnEach(sig -> auditService.audit(context, sig))
  56. .thenReturn(ResponseEntity.ok().build());
  57. }
  58. @Override
  59. public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
  60. KafkaAclResourceTypeDTO resourceTypeDto,
  61. String resourceName,
  62. KafkaAclNamePatternTypeDTO namePatternTypeDto,
  63. ServerWebExchange exchange) {
  64. AccessContext context = AccessContext.builder()
  65. .cluster(clusterName)
  66. .aclActions(AclAction.VIEW)
  67. .operationName("listAcls")
  68. .build();
  69. var resourceType = Optional.ofNullable(resourceTypeDto)
  70. .map(ClusterMapper::mapAclResourceTypeDto)
  71. .orElse(ResourceType.ANY);
  72. var namePatternType = Optional.ofNullable(namePatternTypeDto)
  73. .map(ClusterMapper::mapPatternTypeDto)
  74. .orElse(PatternType.ANY);
  75. var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType);
  76. return accessControlService.validateAccess(context).then(
  77. Mono.just(
  78. ResponseEntity.ok(
  79. aclsService.listAcls(getCluster(clusterName), filter)
  80. .map(ClusterMapper::toKafkaAclDto)))
  81. ).doOnEach(sig -> auditService.audit(context, sig));
  82. }
  83. @Override
  84. public Mono<ResponseEntity<String>> getAclAsCsv(String clusterName, ServerWebExchange exchange) {
  85. AccessContext context = AccessContext.builder()
  86. .cluster(clusterName)
  87. .aclActions(AclAction.VIEW)
  88. .operationName("getAclAsCsv")
  89. .build();
  90. return accessControlService.validateAccess(context).then(
  91. aclsService.getAclAsCsvString(getCluster(clusterName))
  92. .map(ResponseEntity::ok)
  93. .flatMap(Mono::just)
  94. .doOnEach(sig -> auditService.audit(context, sig))
  95. );
  96. }
  97. @Override
  98. public Mono<ResponseEntity<Void>> syncAclsCsv(String clusterName, Mono<String> csvMono, ServerWebExchange exchange) {
  99. AccessContext context = AccessContext.builder()
  100. .cluster(clusterName)
  101. .aclActions(AclAction.EDIT)
  102. .operationName("syncAclsCsv")
  103. .build();
  104. return accessControlService.validateAccess(context)
  105. .then(csvMono)
  106. .flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
  107. .doOnEach(sig -> auditService.audit(context, sig))
  108. .thenReturn(ResponseEntity.ok().build());
  109. }
  110. }