Merge branch 'master' of github.com:provectus/kafka-ui into ISSUE-3732_quotas_apis

This commit is contained in:
iliax 2023-08-11 16:50:12 +04:00
commit 13fcdced8d
13 changed files with 167 additions and 180 deletions

View file

@ -2,12 +2,19 @@ package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.exception.ClusterNotFoundException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ClustersStorage;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import org.springframework.beans.factory.annotation.Autowired;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
public abstract class AbstractController {
private ClustersStorage clustersStorage;
protected ClustersStorage clustersStorage;
protected AccessControlService accessControlService;
protected AuditService auditService;
protected KafkaCluster getCluster(String name) {
return clustersStorage.getClusterByName(name)
@ -15,8 +22,26 @@ public abstract class AbstractController {
String.format("Cluster with name '%s' not found", name)));
}
protected Mono<Void> validateAccess(AccessContext context) {
return accessControlService.validateAccess(context);
}
protected void audit(AccessContext acxt, Signal<?> sig) {
auditService.audit(acxt, sig);
}
@Autowired
public void setClustersStorage(ClustersStorage clustersStorage) {
this.clustersStorage = clustersStorage;
}
@Autowired
public void setAccessControlService(AccessControlService accessControlService) {
this.accessControlService = accessControlService;
}
@Autowired
public void setAuditService(AuditService auditService) {
this.auditService = auditService;
}
}

View file

@ -11,8 +11,6 @@ import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.service.acl.AclsService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.resource.PatternType;
@ -29,8 +27,6 @@ import reactor.core.publisher.Mono;
public class AclsController extends AbstractController implements AclsApi {
private final AclsService aclsService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
@ -41,11 +37,11 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("createAcl")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@ -58,11 +54,11 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("deleteAcl")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@ -88,12 +84,12 @@ public class AclsController extends AbstractController implements AclsApi {
var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType);
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
Mono.just(
ResponseEntity.ok(
aclsService.listAcls(getCluster(clusterName), filter)
.map(ClusterMapper::toKafkaAclDto)))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -104,11 +100,11 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("getAclAsCsv")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
aclsService.getAclAsCsvString(getCluster(clusterName))
.map(ResponseEntity::ok)
.flatMap(Mono::just)
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
);
}
@ -120,10 +116,10 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("syncAclsCsv")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(csvMono)
.flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@ -137,10 +133,10 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("createConsumerAcl")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(createConsumerAclDto)
.flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@ -154,10 +150,10 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("createProducerAcl")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(createProducerAclDto)
.flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@ -171,10 +167,10 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("createStreamAppAcl")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(createStreamAppAclDto)
.flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
}

View file

@ -15,8 +15,6 @@ import com.provectus.kafka.ui.model.UploadedFileInfoDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ApplicationInfoService;
import com.provectus.kafka.ui.service.KafkaClusterFactory;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.util.ApplicationRestarter;
import com.provectus.kafka.ui.util.DynamicConfigOperations;
import com.provectus.kafka.ui.util.DynamicConfigOperations.PropertiesStructure;
@ -39,7 +37,7 @@ import reactor.util.function.Tuples;
@Slf4j
@RestController
@RequiredArgsConstructor
public class ApplicationConfigController implements ApplicationConfigApi {
public class ApplicationConfigController extends AbstractController implements ApplicationConfigApi {
private static final PropertiesMapper MAPPER = Mappers.getMapper(PropertiesMapper.class);
@ -51,12 +49,10 @@ public class ApplicationConfigController implements ApplicationConfigApi {
ApplicationConfigPropertiesDTO toDto(PropertiesStructure propertiesStructure);
}
private final AccessControlService accessControlService;
private final DynamicConfigOperations dynamicConfigOperations;
private final ApplicationRestarter restarter;
private final KafkaClusterFactory kafkaClusterFactory;
private final ApplicationInfoService applicationInfoService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExchange exchange) {
@ -69,12 +65,12 @@ public class ApplicationConfigController implements ApplicationConfigApi {
.applicationConfigActions(VIEW)
.operationName("getCurrentConfig")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(Mono.fromSupplier(() -> ResponseEntity.ok(
new ApplicationConfigDTO()
.properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
)))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -84,14 +80,14 @@ public class ApplicationConfigController implements ApplicationConfigApi {
.applicationConfigActions(EDIT)
.operationName("restartWithConfig")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(restartRequestDto)
.<ResponseEntity<Void>>map(dto -> {
dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
restarter.requestRestart();
return ResponseEntity.ok().build();
})
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -101,13 +97,13 @@ public class ApplicationConfigController implements ApplicationConfigApi {
.applicationConfigActions(EDIT)
.operationName("uploadConfigRelatedFile")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(fileFlux.single())
.flatMap(file ->
dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
.map(path -> new UploadedFileInfoDTO().location(path.toString()))
.map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -117,7 +113,7 @@ public class ApplicationConfigController implements ApplicationConfigApi {
.applicationConfigActions(EDIT)
.operationName("validateConfig")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(configDto)
.flatMap(config -> {
PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());
@ -126,7 +122,7 @@ public class ApplicationConfigController implements ApplicationConfigApi {
.map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
})
.map(ResponseEntity::ok)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
private Mono<Map<String, ClusterConfigValidationDTO>> validateClustersConfig(

View file

@ -11,8 +11,6 @@ import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.service.BrokerService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@ -31,9 +29,6 @@ public class BrokersController extends AbstractController implements BrokersApi
private final BrokerService brokerService;
private final ClusterMapper clusterMapper;
private final AuditService auditService;
private final AccessControlService accessControlService;
@Override
public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
ServerWebExchange exchange) {
@ -43,9 +38,9 @@ public class BrokersController extends AbstractController implements BrokersApi
.build();
var job = brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto);
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(job))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -57,14 +52,14 @@ public class BrokersController extends AbstractController implements BrokersApi
.operationParams(Map.of("id", id))
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(
brokerService.getBrokerMetrics(getCluster(clusterName), id)
.map(clusterMapper::toBrokerMetrics)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -80,10 +75,10 @@ public class BrokersController extends AbstractController implements BrokersApi
.operationParams(Map.of("brokerIds", brokerIds))
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(
brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokerIds)))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -97,11 +92,11 @@ public class BrokersController extends AbstractController implements BrokersApi
.operationParams(Map.of("brokerId", id))
.build();
return accessControlService.validateAccess(context).thenReturn(
return validateAccess(context).thenReturn(
ResponseEntity.ok(
brokerService.getBrokerConfig(getCluster(clusterName), id)
.map(clusterMapper::toBrokerConfig))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -116,11 +111,11 @@ public class BrokersController extends AbstractController implements BrokersApi
.operationParams(Map.of("brokerId", id))
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
brokerLogdir
.flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld))
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -136,11 +131,11 @@ public class BrokersController extends AbstractController implements BrokersApi
.operationParams(Map.of("brokerId", id))
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
brokerConfig
.flatMap(bci -> brokerService.updateBrokerConfigByName(
getCluster(clusterName), id, name, bci.getValue()))
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
}

View file

@ -6,8 +6,6 @@ import com.provectus.kafka.ui.model.ClusterMetricsDTO;
import com.provectus.kafka.ui.model.ClusterStatsDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ClusterService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
@ -21,8 +19,6 @@ import reactor.core.publisher.Mono;
@Slf4j
public class ClustersController extends AbstractController implements ClustersApi {
private final ClusterService clusterService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<Flux<ClusterDTO>>> getClusters(ServerWebExchange exchange) {
@ -40,13 +36,13 @@ public class ClustersController extends AbstractController implements ClustersAp
.operationName("getClusterMetrics")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(
clusterService.getClusterMetrics(getCluster(clusterName))
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -57,13 +53,13 @@ public class ClustersController extends AbstractController implements ClustersAp
.operationName("getClusterStats")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(
clusterService.getClusterStats(getCluster(clusterName))
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -75,8 +71,8 @@ public class ClustersController extends AbstractController implements ClustersAp
.operationName("updateClusterInfo")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(clusterService.updateCluster(getCluster(clusterName)).map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
}

View file

@ -19,8 +19,6 @@ import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import com.provectus.kafka.ui.service.ConsumerGroupService;
import com.provectus.kafka.ui.service.OffsetsResetService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
@ -42,8 +40,6 @@ public class ConsumerGroupsController extends AbstractController implements Cons
private final ConsumerGroupService consumerGroupService;
private final OffsetsResetService offsetsResetService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Value("${consumer.groups.page.size:25}")
private int defaultConsumerGroupsPageSize;
@ -59,9 +55,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
.operationName("deleteConsumerGroup")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(consumerGroupService.deleteConsumerGroupById(getCluster(clusterName), id))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@ -76,11 +72,11 @@ public class ConsumerGroupsController extends AbstractController implements Cons
.operationName("getConsumerGroup")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(consumerGroupService.getConsumerGroupDetail(getCluster(clusterName), consumerGroupId)
.map(ConsumerGroupMapper::toDetailsDto)
.map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -104,9 +100,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
.map(ResponseEntity::ok)
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(job)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -125,7 +121,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
.operationName("getConsumerGroupsPage")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
consumerGroupService.getConsumerGroupsPage(
getCluster(clusterName),
Optional.ofNullable(page).filter(i -> i > 0).orElse(1),
@ -136,7 +132,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
)
.map(this::convertPage)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -191,9 +187,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
}
};
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(mono.get())
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}).thenReturn(ResponseEntity.ok().build());
}

View file

@ -18,8 +18,6 @@ import com.provectus.kafka.ui.model.TaskDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.service.KafkaConnectService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
@ -40,8 +38,6 @@ public class KafkaConnectController extends AbstractController implements KafkaC
= Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
private final KafkaConnectService kafkaConnectService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
@ -64,9 +60,9 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationName("getConnectors")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -81,10 +77,10 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationName("createConnector")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -100,10 +96,10 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationName("getConnector")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -119,10 +115,10 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationParams(Map.of("connectorName", connectName))
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@ -150,7 +146,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.sort(comparator);
return Mono.just(ResponseEntity.ok(job))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -166,11 +162,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationName("getConnectorConfig")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService
.getConnectorConfig(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -187,11 +183,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationParams(Map.of("connectorName", connectorName))
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
.map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -214,11 +210,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationParams(Map.of("connectorName", connectorName))
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService
.updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -234,11 +230,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationParams(Map.of("connectorName", connectorName))
.build();
return accessControlService.validateAccess(context).thenReturn(
return validateAccess(context).thenReturn(
ResponseEntity
.ok(kafkaConnectService
.getConnectorTasks(getCluster(clusterName), connectName, connectorName))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -254,11 +250,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationParams(Map.of("connectorName", connectorName))
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService
.restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -272,11 +268,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationName("getConnectorPlugins")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
Mono.just(
ResponseEntity.ok(
kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName)))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

View file

@ -9,9 +9,7 @@ import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.ksql.KsqlServiceV2;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -29,8 +27,6 @@ import reactor.core.publisher.Mono;
public class KsqlController extends AbstractController implements KsqlApi {
private final KsqlServiceV2 ksqlServiceV2;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String clusterName,
@ -44,13 +40,13 @@ public class KsqlController extends AbstractController implements KsqlApi {
.operationName("executeKsql")
.operationParams(command)
.build();
return accessControlService.validateAccess(context).thenReturn(
return validateAccess(context).thenReturn(
new KsqlCommandV2ResponseDTO().pipeId(
ksqlServiceV2.registerCommand(
getCluster(clusterName),
command.getKsql(),
Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of()))))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
)
.map(ResponseEntity::ok);
@ -66,7 +62,7 @@ public class KsqlController extends AbstractController implements KsqlApi {
.operationName("openKsqlResponsePipe")
.build();
return accessControlService.validateAccess(context).thenReturn(
return validateAccess(context).thenReturn(
ResponseEntity.ok(ksqlServiceV2.execute(pipeId)
.map(table -> new KsqlResponseDTO()
.table(
@ -86,9 +82,9 @@ public class KsqlController extends AbstractController implements KsqlApi {
.operationName("listStreams")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName))))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -100,8 +96,8 @@ public class KsqlController extends AbstractController implements KsqlApi {
.operationName("listTables")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName))))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
}

View file

@ -24,8 +24,6 @@ import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import com.provectus.kafka.ui.service.DeserializationService;
import com.provectus.kafka.ui.service.MessagesService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -49,8 +47,6 @@ public class MessagesController extends AbstractController implements MessagesAp
private final MessagesService messagesService;
private final DeserializationService deserializationService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<Void>> deleteTopicMessages(
@ -63,13 +59,13 @@ public class MessagesController extends AbstractController implements MessagesAp
.topicActions(MESSAGES_DELETE)
.build();
return accessControlService.validateAccess(context).<ResponseEntity<Void>>then(
return validateAccess(context).<ResponseEntity<Void>>then(
messagesService.deleteTopicMessages(
getCluster(clusterName),
topicName,
Optional.ofNullable(partitions).orElse(List.of())
).thenReturn(ResponseEntity.ok().build())
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -120,9 +116,9 @@ public class MessagesController extends AbstractController implements MessagesAp
);
var context = contextBuilder.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(job)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -137,11 +133,11 @@ public class MessagesController extends AbstractController implements MessagesAp
.operationName("sendTopicMessages")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
createTopicMessage.flatMap(msg ->
messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
).map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
/**
@ -192,7 +188,7 @@ public class MessagesController extends AbstractController implements MessagesAp
? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
: deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE));
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
Mono.just(dto)
.subscribeOn(Schedulers.boundedElastic())
.map(ResponseEntity::ok)

View file

@ -13,8 +13,6 @@ import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
import com.provectus.kafka.ui.service.SchemaRegistryService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@ -38,8 +36,6 @@ public class SchemasController extends AbstractController implements SchemasApi
private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl();
private final SchemaRegistryService schemaRegistryService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
protected KafkaCluster getCluster(String clusterName) {
@ -61,7 +57,7 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationName("checkSchemaCompatibility")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
newSchemaSubjectMono.flatMap(subjectDTO ->
schemaRegistryService.checksSchemaCompatibility(
getCluster(clusterName),
@ -70,7 +66,7 @@ public class SchemasController extends AbstractController implements SchemasApi
))
.map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -83,7 +79,7 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationName("createNewSchema")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
newSchemaSubjectMono.flatMap(newSubject ->
schemaRegistryService.registerNewSchema(
getCluster(clusterName),
@ -92,7 +88,7 @@ public class SchemasController extends AbstractController implements SchemasApi
)
).map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -105,9 +101,9 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationName("deleteLatestSchema")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
schemaRegistryService.deleteLatestSchemaSubject(getCluster(clusterName), subject)
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}
@ -122,9 +118,9 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationName("deleteSchema")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
schemaRegistryService.deleteSchemaSubjectEntirely(getCluster(clusterName), subject)
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}
@ -139,9 +135,9 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationName("deleteSchemaByVersion")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
schemaRegistryService.deleteSchemaSubjectByVersion(getCluster(clusterName), subjectName, version)
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}
@ -160,9 +156,9 @@ public class SchemasController extends AbstractController implements SchemasApi
schemaRegistryService.getAllVersionsBySubject(getCluster(clusterName), subjectName)
.map(kafkaSrMapper::toDto);
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(schemas))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -185,11 +181,11 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationName("getLatestSchema")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
schemaRegistryService.getLatestSchemaVersionBySubject(getCluster(clusterName), subject)
.map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -203,12 +199,12 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationParams(Map.of("subject", subject, "version", version))
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
schemaRegistryService.getSchemaSubjectByVersion(
getCluster(clusterName), subject, version)
.map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -244,7 +240,7 @@ public class SchemasController extends AbstractController implements SchemasApi
.map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
.map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
}).map(ResponseEntity::ok)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -257,14 +253,14 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationName("updateGlobalSchemaCompatibilityLevel")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
compatibilityLevelMono
.flatMap(compatibilityLevelDTO ->
schemaRegistryService.updateGlobalSchemaCompatibility(
getCluster(clusterName),
kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}
@ -280,7 +276,7 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationParams(Map.of("subject", subject))
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
compatibilityLevelMono
.flatMap(compatibilityLevelDTO ->
schemaRegistryService.updateSchemaCompatibility(
@ -288,7 +284,7 @@ public class SchemasController extends AbstractController implements SchemasApi
subject,
kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}

View file

@ -27,8 +27,6 @@ import com.provectus.kafka.ui.model.TopicsResponseDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.TopicsService;
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@ -53,8 +51,6 @@ public class TopicsController extends AbstractController implements TopicsApi {
private final TopicsService topicsService;
private final TopicAnalysisService topicAnalysisService;
private final ClusterMapper clusterMapper;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<TopicDTO>> createTopic(
@ -67,12 +63,12 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationParams(topicCreation)
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(topicsService.createTopic(getCluster(clusterName), topicCreation))
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.OK))
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
});
}
@ -86,11 +82,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("recreateTopic")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
topicsService.recreateTopic(getCluster(clusterName), topicName)
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -105,11 +101,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationParams(Map.of("newTopicName", newTopicName))
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -123,11 +119,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("deleteTopic")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(
topicsService.deleteTopic(getCluster(clusterName), topicName)
.thenReturn(ResponseEntity.ok().<Void>build())
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@ -142,7 +138,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("getTopicConfigs")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
topicsService.getTopicConfigs(getCluster(clusterName), topicName)
.map(lst -> lst.stream()
.map(InternalTopicConfig::from)
@ -150,7 +146,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
.collect(toList()))
.map(Flux::fromIterable)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -164,11 +160,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("getTopicDetails")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
topicsService.getTopicDetails(getCluster(clusterName), topicName)
.map(clusterMapper::toTopicDetails)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -215,7 +211,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
.pageCount(totalPages));
})
.map(ResponseEntity::ok)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -230,12 +226,12 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("updateTopic")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
topicsService
.updateTopic(getCluster(clusterName), topicName, topicUpdate)
.map(clusterMapper::toTopic)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -250,11 +246,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
.topicActions(VIEW, EDIT)
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
partitionsIncrease.flatMap(partitions ->
topicsService.increaseTopicPartitions(getCluster(clusterName), topicName, partitions)
).map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -270,12 +266,12 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("changeReplicationFactor")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
replicationFactorChange
.flatMap(rfc ->
topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc))
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -288,9 +284,9 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("analyzeTopic")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
topicAnalysisService.analyze(getCluster(clusterName), topicName)
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}
@ -305,9 +301,9 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("cancelTopicAnalysis")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(Mono.fromRunnable(() -> topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName)))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@ -324,11 +320,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("getTopicAnalysis")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
.map(ResponseEntity::ok)
.orElseGet(() -> ResponseEntity.notFound().build()))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
private Comparator<InternalTopic> getComparatorForTopic(

View file

@ -42,8 +42,9 @@ public class SchemaRegistryPaginationTest {
new SchemaRegistryService.SubjectWithCompatibilityLevel(
new SchemaSubject().subject(a.getArgument(1)), Compatibility.FULL)));
this.controller = new SchemasController(schemaRegistryService, new AccessControlServiceMock().getMock(),
mock(AuditService.class));
this.controller = new SchemasController(schemaRegistryService);
this.controller.setAccessControlService(new AccessControlServiceMock().getMock());
this.controller.setAuditService(mock(AuditService.class));
this.controller.setClustersStorage(clustersStorage);
}

View file

@ -45,8 +45,8 @@ class TopicsServicePaginationTest {
private final ClusterMapper clusterMapper = new ClusterMapperImpl();
private final AccessControlService accessControlService = new AccessControlServiceMock().getMock();
private final TopicsController topicsController = new TopicsController(
topicsService, mock(TopicAnalysisService.class), clusterMapper, accessControlService, mock(AuditService.class));
private final TopicsController topicsController =
new TopicsController(topicsService, mock(TopicAnalysisService.class), clusterMapper);
private void init(Map<String, InternalTopic> topicsInCache) {
@ -59,6 +59,8 @@ class TopicsServicePaginationTest {
List<String> lst = a.getArgument(1);
return Mono.just(lst.stream().map(topicsInCache::get).collect(Collectors.toList()));
});
topicsController.setAccessControlService(accessControlService);
topicsController.setAuditService(mock(AuditService.class));
topicsController.setClustersStorage(clustersStorage);
}