ApplicationConfigController.java 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package com.provectus.kafka.ui.controller;
  2. import static com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction.EDIT;
  3. import static com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction.VIEW;
  4. import com.provectus.kafka.ui.api.ApplicationConfigApi;
  5. import com.provectus.kafka.ui.config.ClustersProperties;
  6. import com.provectus.kafka.ui.model.ApplicationConfigDTO;
  7. import com.provectus.kafka.ui.model.ApplicationConfigPropertiesDTO;
  8. import com.provectus.kafka.ui.model.ApplicationConfigValidationDTO;
  9. import com.provectus.kafka.ui.model.ApplicationInfoDTO;
  10. import com.provectus.kafka.ui.model.ClusterConfigValidationDTO;
  11. import com.provectus.kafka.ui.model.RestartRequestDTO;
  12. import com.provectus.kafka.ui.model.UploadedFileInfoDTO;
  13. import com.provectus.kafka.ui.model.rbac.AccessContext;
  14. import com.provectus.kafka.ui.service.ApplicationInfoService;
  15. import com.provectus.kafka.ui.service.KafkaClusterFactory;
  16. import com.provectus.kafka.ui.util.ApplicationRestarter;
  17. import com.provectus.kafka.ui.util.DynamicConfigOperations;
  18. import com.provectus.kafka.ui.util.DynamicConfigOperations.PropertiesStructure;
  19. import java.util.Map;
  20. import javax.annotation.Nullable;
  21. import lombok.RequiredArgsConstructor;
  22. import lombok.extern.slf4j.Slf4j;
  23. import org.mapstruct.Mapper;
  24. import org.mapstruct.factory.Mappers;
  25. import org.springframework.http.ResponseEntity;
  26. import org.springframework.http.codec.multipart.FilePart;
  27. import org.springframework.http.codec.multipart.Part;
  28. import org.springframework.web.bind.annotation.RestController;
  29. import org.springframework.web.server.ServerWebExchange;
  30. import reactor.core.publisher.Flux;
  31. import reactor.core.publisher.Mono;
  32. import reactor.util.function.Tuple2;
  33. import reactor.util.function.Tuples;
  34. @Slf4j
  35. @RestController
  36. @RequiredArgsConstructor
  37. public class ApplicationConfigController extends AbstractController implements ApplicationConfigApi {
  38. private static final PropertiesMapper MAPPER = Mappers.getMapper(PropertiesMapper.class);
  39. @Mapper
  40. interface PropertiesMapper {
  41. PropertiesStructure fromDto(ApplicationConfigPropertiesDTO dto);
  42. ApplicationConfigPropertiesDTO toDto(PropertiesStructure propertiesStructure);
  43. }
  44. private final DynamicConfigOperations dynamicConfigOperations;
  45. private final ApplicationRestarter restarter;
  46. private final KafkaClusterFactory kafkaClusterFactory;
  47. private final ApplicationInfoService applicationInfoService;
  48. @Override
  49. public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExchange exchange) {
  50. return Mono.just(applicationInfoService.getApplicationInfo()).map(ResponseEntity::ok);
  51. }
  52. @Override
  53. public Mono<ResponseEntity<ApplicationConfigDTO>> getCurrentConfig(ServerWebExchange exchange) {
  54. var context = AccessContext.builder()
  55. .applicationConfigActions(VIEW)
  56. .operationName("getCurrentConfig")
  57. .build();
  58. return validateAccess(context)
  59. .then(Mono.fromSupplier(() -> ResponseEntity.ok(
  60. new ApplicationConfigDTO()
  61. .properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
  62. )))
  63. .doOnEach(sig -> audit(context, sig));
  64. }
  65. @Override
  66. public Mono<ResponseEntity<Void>> restartWithConfig(Mono<RestartRequestDTO> restartRequestDto,
  67. ServerWebExchange exchange) {
  68. var context = AccessContext.builder()
  69. .applicationConfigActions(EDIT)
  70. .operationName("restartWithConfig")
  71. .build();
  72. return validateAccess(context)
  73. .then(restartRequestDto)
  74. .<ResponseEntity<Void>>map(dto -> {
  75. dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
  76. restarter.requestRestart();
  77. return ResponseEntity.ok().build();
  78. })
  79. .doOnEach(sig -> audit(context, sig));
  80. }
  81. @Override
  82. public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(Flux<Part> fileFlux,
  83. ServerWebExchange exchange) {
  84. var context = AccessContext.builder()
  85. .applicationConfigActions(EDIT)
  86. .operationName("uploadConfigRelatedFile")
  87. .build();
  88. return validateAccess(context)
  89. .then(fileFlux.single())
  90. .flatMap(file ->
  91. dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
  92. .map(path -> new UploadedFileInfoDTO().location(path.toString()))
  93. .map(ResponseEntity::ok))
  94. .doOnEach(sig -> audit(context, sig));
  95. }
  96. @Override
  97. public Mono<ResponseEntity<ApplicationConfigValidationDTO>> validateConfig(Mono<ApplicationConfigDTO> configDto,
  98. ServerWebExchange exchange) {
  99. var context = AccessContext.builder()
  100. .applicationConfigActions(EDIT)
  101. .operationName("validateConfig")
  102. .build();
  103. return validateAccess(context)
  104. .then(configDto)
  105. .flatMap(config -> {
  106. PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());
  107. ClustersProperties clustersProperties = propertiesStructure.getKafka();
  108. return validateClustersConfig(clustersProperties)
  109. .map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
  110. })
  111. .map(ResponseEntity::ok)
  112. .doOnEach(sig -> audit(context, sig));
  113. }
  114. private Mono<Map<String, ClusterConfigValidationDTO>> validateClustersConfig(
  115. @Nullable ClustersProperties properties) {
  116. if (properties == null || properties.getClusters() == null) {
  117. return Mono.just(Map.of());
  118. }
  119. properties.validateAndSetDefaults();
  120. return Flux.fromIterable(properties.getClusters())
  121. .flatMap(c -> kafkaClusterFactory.validate(c).map(v -> Tuples.of(c.getName(), v)))
  122. .collectMap(Tuple2::getT1, Tuple2::getT2);
  123. }
  124. }