ApplicationConfigController.java 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package com.provectus.kafka.ui.controller;
  2. import static com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction.EDIT;
  3. import static com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction.VIEW;
  4. import com.provectus.kafka.ui.api.ApplicationConfigApi;
  5. import com.provectus.kafka.ui.config.ClustersProperties;
  6. import com.provectus.kafka.ui.model.ApplicationConfigDTO;
  7. import com.provectus.kafka.ui.model.ApplicationConfigPropertiesDTO;
  8. import com.provectus.kafka.ui.model.ApplicationConfigValidationDTO;
  9. import com.provectus.kafka.ui.model.ApplicationInfoDTO;
  10. import com.provectus.kafka.ui.model.ClusterConfigValidationDTO;
  11. import com.provectus.kafka.ui.model.RestartRequestDTO;
  12. import com.provectus.kafka.ui.model.UploadedFileInfoDTO;
  13. import com.provectus.kafka.ui.model.rbac.AccessContext;
  14. import com.provectus.kafka.ui.service.ApplicationInfoService;
  15. import com.provectus.kafka.ui.service.KafkaClusterFactory;
  16. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  17. import com.provectus.kafka.ui.util.ApplicationRestarter;
  18. import com.provectus.kafka.ui.util.DynamicConfigOperations;
  19. import com.provectus.kafka.ui.util.DynamicConfigOperations.PropertiesStructure;
  20. import java.util.Map;
  21. import javax.annotation.Nullable;
  22. import lombok.RequiredArgsConstructor;
  23. import lombok.extern.slf4j.Slf4j;
  24. import org.mapstruct.Mapper;
  25. import org.mapstruct.factory.Mappers;
  26. import org.springframework.http.ResponseEntity;
  27. import org.springframework.http.codec.multipart.FilePart;
  28. import org.springframework.http.codec.multipart.Part;
  29. import org.springframework.web.bind.annotation.RestController;
  30. import org.springframework.web.server.ServerWebExchange;
  31. import reactor.core.publisher.Flux;
  32. import reactor.core.publisher.Mono;
  33. import reactor.util.function.Tuple2;
  34. import reactor.util.function.Tuples;
  35. @Slf4j
  36. @RestController
  37. @RequiredArgsConstructor
  38. public class ApplicationConfigController implements ApplicationConfigApi {
  39. private static final PropertiesMapper MAPPER = Mappers.getMapper(PropertiesMapper.class);
  40. @Mapper
  41. interface PropertiesMapper {
  42. PropertiesStructure fromDto(ApplicationConfigPropertiesDTO dto);
  43. ApplicationConfigPropertiesDTO toDto(PropertiesStructure propertiesStructure);
  44. }
  45. private final AccessControlService accessControlService;
  46. private final DynamicConfigOperations dynamicConfigOperations;
  47. private final ApplicationRestarter restarter;
  48. private final KafkaClusterFactory kafkaClusterFactory;
  49. private final ApplicationInfoService applicationInfoService;
  50. @Override
  51. public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExchange exchange) {
  52. return Mono.just(applicationInfoService.getApplicationInfo()).map(ResponseEntity::ok);
  53. }
  54. @Override
  55. public Mono<ResponseEntity<ApplicationConfigDTO>> getCurrentConfig(ServerWebExchange exchange) {
  56. return accessControlService
  57. .validateAccess(
  58. AccessContext.builder()
  59. .applicationConfigActions(VIEW)
  60. .build()
  61. )
  62. .then(Mono.fromSupplier(() -> ResponseEntity.ok(
  63. new ApplicationConfigDTO()
  64. .properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
  65. )));
  66. }
  67. @Override
  68. public Mono<ResponseEntity<Void>> restartWithConfig(Mono<RestartRequestDTO> restartRequestDto,
  69. ServerWebExchange exchange) {
  70. return accessControlService
  71. .validateAccess(
  72. AccessContext.builder()
  73. .applicationConfigActions(EDIT)
  74. .build()
  75. )
  76. .then(restartRequestDto)
  77. .map(dto -> {
  78. dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
  79. restarter.requestRestart();
  80. return ResponseEntity.ok().build();
  81. });
  82. }
  83. @Override
  84. public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(Flux<Part> fileFlux,
  85. ServerWebExchange exchange) {
  86. return accessControlService
  87. .validateAccess(
  88. AccessContext.builder()
  89. .applicationConfigActions(EDIT)
  90. .build()
  91. )
  92. .then(fileFlux.single())
  93. .flatMap(file ->
  94. dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
  95. .map(path -> new UploadedFileInfoDTO().location(path.toString()))
  96. .map(ResponseEntity::ok));
  97. }
  98. @Override
  99. public Mono<ResponseEntity<ApplicationConfigValidationDTO>> validateConfig(Mono<ApplicationConfigDTO> configDto,
  100. ServerWebExchange exchange) {
  101. return configDto
  102. .flatMap(config -> {
  103. PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());
  104. ClustersProperties clustersProperties = propertiesStructure.getKafka();
  105. return validateClustersConfig(clustersProperties)
  106. .map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
  107. })
  108. .map(ResponseEntity::ok);
  109. }
  110. private Mono<Map<String, ClusterConfigValidationDTO>> validateClustersConfig(
  111. @Nullable ClustersProperties properties) {
  112. if (properties == null || properties.getClusters() == null) {
  113. return Mono.just(Map.of());
  114. }
  115. properties.validateAndSetDefaults();
  116. return Flux.fromIterable(properties.getClusters())
  117. .flatMap(c -> kafkaClusterFactory.validate(c).map(v -> Tuples.of(c.getName(), v)))
  118. .collectMap(Tuple2::getT1, Tuple2::getT2);
  119. }
  120. }