123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- package com.provectus.kafka.ui.controller;
- import static com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction.EDIT;
- import static com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction.VIEW;
- import com.provectus.kafka.ui.api.ApplicationConfigApi;
- import com.provectus.kafka.ui.config.ClustersProperties;
- import com.provectus.kafka.ui.model.ApplicationConfigDTO;
- import com.provectus.kafka.ui.model.ApplicationConfigPropertiesDTO;
- import com.provectus.kafka.ui.model.ApplicationConfigValidationDTO;
- import com.provectus.kafka.ui.model.ApplicationInfoDTO;
- import com.provectus.kafka.ui.model.ClusterConfigValidationDTO;
- import com.provectus.kafka.ui.model.RestartRequestDTO;
- import com.provectus.kafka.ui.model.UploadedFileInfoDTO;
- import com.provectus.kafka.ui.model.rbac.AccessContext;
- import com.provectus.kafka.ui.service.ApplicationInfoService;
- import com.provectus.kafka.ui.service.KafkaClusterFactory;
- import com.provectus.kafka.ui.service.audit.AuditService;
- import com.provectus.kafka.ui.service.rbac.AccessControlService;
- import com.provectus.kafka.ui.util.ApplicationRestarter;
- import com.provectus.kafka.ui.util.DynamicConfigOperations;
- import com.provectus.kafka.ui.util.DynamicConfigOperations.PropertiesStructure;
- import java.util.Map;
- import javax.annotation.Nullable;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.mapstruct.Mapper;
- import org.mapstruct.factory.Mappers;
- import org.springframework.http.ResponseEntity;
- import org.springframework.http.codec.multipart.FilePart;
- import org.springframework.http.codec.multipart.Part;
- import org.springframework.web.bind.annotation.RestController;
- import org.springframework.web.server.ServerWebExchange;
- import reactor.core.publisher.Flux;
- import reactor.core.publisher.Mono;
- import reactor.util.function.Tuple2;
- import reactor.util.function.Tuples;
- @Slf4j
- @RestController
- @RequiredArgsConstructor
- public class ApplicationConfigController implements ApplicationConfigApi {
- private static final PropertiesMapper MAPPER = Mappers.getMapper(PropertiesMapper.class);
- @Mapper
- interface PropertiesMapper {
- PropertiesStructure fromDto(ApplicationConfigPropertiesDTO dto);
- ApplicationConfigPropertiesDTO toDto(PropertiesStructure propertiesStructure);
- }
- private final AccessControlService accessControlService;
- private final DynamicConfigOperations dynamicConfigOperations;
- private final ApplicationRestarter restarter;
- private final KafkaClusterFactory kafkaClusterFactory;
- private final ApplicationInfoService applicationInfoService;
- private final AuditService auditService;
- @Override
- public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExchange exchange) {
- return Mono.just(applicationInfoService.getApplicationInfo()).map(ResponseEntity::ok);
- }
- @Override
- public Mono<ResponseEntity<ApplicationConfigDTO>> getCurrentConfig(ServerWebExchange exchange) {
- var context = AccessContext.builder()
- .applicationConfigActions(VIEW)
- .operationName("getCurrentConfig")
- .build();
- return accessControlService.validateAccess(context)
- .then(Mono.fromSupplier(() -> ResponseEntity.ok(
- new ApplicationConfigDTO()
- .properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
- )))
- .doOnEach(sig -> auditService.audit(context, sig));
- }
- @Override
- public Mono<ResponseEntity<Void>> restartWithConfig(Mono<RestartRequestDTO> restartRequestDto,
- ServerWebExchange exchange) {
- var context = AccessContext.builder()
- .applicationConfigActions(EDIT)
- .operationName("restartWithConfig")
- .build();
- return accessControlService.validateAccess(context)
- .then(restartRequestDto)
- .<ResponseEntity<Void>>map(dto -> {
- dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
- restarter.requestRestart();
- return ResponseEntity.ok().build();
- })
- .doOnEach(sig -> auditService.audit(context, sig));
- }
- @Override
- public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(Flux<Part> fileFlux,
- ServerWebExchange exchange) {
- var context = AccessContext.builder()
- .applicationConfigActions(EDIT)
- .operationName("uploadConfigRelatedFile")
- .build();
- return accessControlService.validateAccess(context)
- .then(fileFlux.single())
- .flatMap(file ->
- dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
- .map(path -> new UploadedFileInfoDTO().location(path.toString()))
- .map(ResponseEntity::ok))
- .doOnEach(sig -> auditService.audit(context, sig));
- }
- @Override
- public Mono<ResponseEntity<ApplicationConfigValidationDTO>> validateConfig(Mono<ApplicationConfigDTO> configDto,
- ServerWebExchange exchange) {
- var context = AccessContext.builder()
- .applicationConfigActions(EDIT)
- .operationName("validateConfig")
- .build();
- return accessControlService.validateAccess(context)
- .then(configDto)
- .flatMap(config -> {
- PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());
- ClustersProperties clustersProperties = propertiesStructure.getKafka();
- return validateClustersConfig(clustersProperties)
- .map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
- })
- .map(ResponseEntity::ok)
- .doOnEach(sig -> auditService.audit(context, sig));
- }
- private Mono<Map<String, ClusterConfigValidationDTO>> validateClustersConfig(
- @Nullable ClustersProperties properties) {
- if (properties == null || properties.getClusters() == null) {
- return Mono.just(Map.of());
- }
- properties.validateAndSetDefaults();
- return Flux.fromIterable(properties.getClusters())
- .flatMap(c -> kafkaClusterFactory.validate(c).map(v -> Tuples.of(c.getName(), v)))
- .collectMap(Tuple2::getT1, Tuple2::getT2);
- }
- }
|