ApplicationConfigController.java 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package com.provectus.kafka.ui.controller;
  2. import static com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction.EDIT;
  3. import static com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction.VIEW;
  4. import com.provectus.kafka.ui.api.ApplicationConfigApi;
  5. import com.provectus.kafka.ui.config.ClustersProperties;
  6. import com.provectus.kafka.ui.model.ApplicationConfigDTO;
  7. import com.provectus.kafka.ui.model.ApplicationConfigPropertiesDTO;
  8. import com.provectus.kafka.ui.model.ApplicationConfigValidationDTO;
  9. import com.provectus.kafka.ui.model.ApplicationInfoDTO;
  10. import com.provectus.kafka.ui.model.ClusterConfigValidationDTO;
  11. import com.provectus.kafka.ui.model.RestartRequestDTO;
  12. import com.provectus.kafka.ui.model.UploadedFileInfoDTO;
  13. import com.provectus.kafka.ui.model.rbac.AccessContext;
  14. import com.provectus.kafka.ui.service.ApplicationInfoService;
  15. import com.provectus.kafka.ui.service.KafkaClusterFactory;
  16. import com.provectus.kafka.ui.service.audit.AuditService;
  17. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  18. import com.provectus.kafka.ui.util.ApplicationRestarter;
  19. import com.provectus.kafka.ui.util.DynamicConfigOperations;
  20. import com.provectus.kafka.ui.util.DynamicConfigOperations.PropertiesStructure;
  21. import java.util.Map;
  22. import javax.annotation.Nullable;
  23. import lombok.RequiredArgsConstructor;
  24. import lombok.extern.slf4j.Slf4j;
  25. import org.mapstruct.Mapper;
  26. import org.mapstruct.factory.Mappers;
  27. import org.springframework.http.ResponseEntity;
  28. import org.springframework.http.codec.multipart.FilePart;
  29. import org.springframework.http.codec.multipart.Part;
  30. import org.springframework.web.bind.annotation.RestController;
  31. import org.springframework.web.server.ServerWebExchange;
  32. import reactor.core.publisher.Flux;
  33. import reactor.core.publisher.Mono;
  34. import reactor.util.function.Tuple2;
  35. import reactor.util.function.Tuples;
  36. @Slf4j
  37. @RestController
  38. @RequiredArgsConstructor
  39. public class ApplicationConfigController implements ApplicationConfigApi {
  40. private static final PropertiesMapper MAPPER = Mappers.getMapper(PropertiesMapper.class);
  41. @Mapper
  42. interface PropertiesMapper {
  43. PropertiesStructure fromDto(ApplicationConfigPropertiesDTO dto);
  44. ApplicationConfigPropertiesDTO toDto(PropertiesStructure propertiesStructure);
  45. }
  46. private final AccessControlService accessControlService;
  47. private final DynamicConfigOperations dynamicConfigOperations;
  48. private final ApplicationRestarter restarter;
  49. private final KafkaClusterFactory kafkaClusterFactory;
  50. private final ApplicationInfoService applicationInfoService;
  51. private final AuditService auditService;
  52. @Override
  53. public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExchange exchange) {
  54. return Mono.just(applicationInfoService.getApplicationInfo()).map(ResponseEntity::ok);
  55. }
  56. @Override
  57. public Mono<ResponseEntity<ApplicationConfigDTO>> getCurrentConfig(ServerWebExchange exchange) {
  58. var context = AccessContext.builder()
  59. .applicationConfigActions(VIEW)
  60. .operationName("getCurrentConfig")
  61. .build();
  62. return accessControlService.validateAccess(context)
  63. .then(Mono.fromSupplier(() -> ResponseEntity.ok(
  64. new ApplicationConfigDTO()
  65. .properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
  66. )))
  67. .doOnEach(sig -> auditService.audit(context, sig));
  68. }
  69. @Override
  70. public Mono<ResponseEntity<Void>> restartWithConfig(Mono<RestartRequestDTO> restartRequestDto,
  71. ServerWebExchange exchange) {
  72. var context = AccessContext.builder()
  73. .applicationConfigActions(EDIT)
  74. .operationName("restartWithConfig")
  75. .build();
  76. return accessControlService.validateAccess(context)
  77. .then(restartRequestDto)
  78. .<ResponseEntity<Void>>map(dto -> {
  79. dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
  80. restarter.requestRestart();
  81. return ResponseEntity.ok().build();
  82. })
  83. .doOnEach(sig -> auditService.audit(context, sig));
  84. }
  85. @Override
  86. public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(Flux<Part> fileFlux,
  87. ServerWebExchange exchange) {
  88. var context = AccessContext.builder()
  89. .applicationConfigActions(EDIT)
  90. .operationName("uploadConfigRelatedFile")
  91. .build();
  92. return accessControlService.validateAccess(context)
  93. .then(fileFlux.single())
  94. .flatMap(file ->
  95. dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
  96. .map(path -> new UploadedFileInfoDTO().location(path.toString()))
  97. .map(ResponseEntity::ok))
  98. .doOnEach(sig -> auditService.audit(context, sig));
  99. }
  100. @Override
  101. public Mono<ResponseEntity<ApplicationConfigValidationDTO>> validateConfig(Mono<ApplicationConfigDTO> configDto,
  102. ServerWebExchange exchange) {
  103. var context = AccessContext.builder()
  104. .applicationConfigActions(EDIT)
  105. .operationName("validateConfig")
  106. .build();
  107. return accessControlService.validateAccess(context)
  108. .then(configDto)
  109. .flatMap(config -> {
  110. PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());
  111. ClustersProperties clustersProperties = propertiesStructure.getKafka();
  112. return validateClustersConfig(clustersProperties)
  113. .map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
  114. })
  115. .map(ResponseEntity::ok)
  116. .doOnEach(sig -> auditService.audit(context, sig));
  117. }
  118. private Mono<Map<String, ClusterConfigValidationDTO>> validateClustersConfig(
  119. @Nullable ClustersProperties properties) {
  120. if (properties == null || properties.getClusters() == null) {
  121. return Mono.just(Map.of());
  122. }
  123. properties.validateAndSetDefaults();
  124. return Flux.fromIterable(properties.getClusters())
  125. .flatMap(c -> kafkaClusterFactory.validate(c).map(v -> Tuples.of(c.getName(), v)))
  126. .collectMap(Tuple2::getT1, Tuple2::getT2);
  127. }
  128. }