Merge branch 'master' into ISSUE-4102_audit-log-lvls
Commit 7753e5f2de
16 changed files with 336 additions and 180 deletions
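
This merge pulls the shared access-validation and audit plumbing up into AbstractController: instead of each controller injecting AccessControlService and AuditService and calling accessControlService.validateAccess(context) / auditService.audit(context, sig), controllers now call the inherited validateAccess(context) and audit(context, sig) helpers, and AbstractController receives its collaborators through @Autowired setters. In the hunks below each changed call appears as an old/new pair, e.g. "return accessControlService.validateAccess(context)" followed by "return validateAccess(context)". A minimal sketch of the resulting controller pattern, not part of the diff itself (ExampleController and listThings are hypothetical names, and the AccessContext builder calls are abbreviated for illustration):

// Sketch only: a hypothetical controller using the helpers AbstractController now provides.
import com.provectus.kafka.ui.model.rbac.AccessContext;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
public class ExampleController extends AbstractController {

  public Mono<ResponseEntity<Void>> listThings(String clusterName) {
    var context = AccessContext.builder()        // builder calls abbreviated for illustration
        .operationName("listThings")
        .build();

    return validateAccess(context)               // inherited; replaces accessControlService.validateAccess(context)
        .then(Mono.just(ResponseEntity.ok().<Void>build()))
        .doOnEach(sig -> audit(context, sig));   // inherited; replaces auditService.audit(context, sig)
  }
}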
@@ -2,12 +2,19 @@ package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.exception.ClusterNotFoundException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ClustersStorage;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import org.springframework.beans.factory.annotation.Autowired;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
public abstract class AbstractController {
private ClustersStorage clustersStorage;
protected ClustersStorage clustersStorage;
protected AccessControlService accessControlService;
protected AuditService auditService;
protected KafkaCluster getCluster(String name) {
return clustersStorage.getClusterByName(name)

@@ -15,8 +22,26 @@ public abstract class AbstractController {
String.format("Cluster with name '%s' not found", name)));
}
protected Mono<Void> validateAccess(AccessContext context) {
return accessControlService.validateAccess(context);
}
protected void audit(AccessContext acxt, Signal<?> sig) {
auditService.audit(acxt, sig);
}
@Autowired
public void setClustersStorage(ClustersStorage clustersStorage) {
this.clustersStorage = clustersStorage;
}
@Autowired
public void setAccessControlService(AccessControlService accessControlService) {
this.accessControlService = accessControlService;
}
@Autowired
public void setAuditService(AuditService auditService) {
this.auditService = auditService;
}
}

@@ -11,8 +11,6 @@ import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.service.acl.AclsService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.resource.PatternType;

@@ -29,8 +27,6 @@ import reactor.core.publisher.Mono;
public class AclsController extends AbstractController implements AclsApi {
private final AclsService aclsService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,

@@ -41,11 +37,11 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("createAcl")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

@@ -58,11 +54,11 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("deleteAcl")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

@@ -88,12 +84,12 @@ public class AclsController extends AbstractController implements AclsApi {
var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType);
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
Mono.just(
ResponseEntity.ok(
aclsService.listAcls(getCluster(clusterName), filter)
.map(ClusterMapper::toKafkaAclDto)))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -104,11 +100,11 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("getAclAsCsv")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
aclsService.getAclAsCsvString(getCluster(clusterName))
.map(ResponseEntity::ok)
.flatMap(Mono::just)
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
);
}

@@ -120,10 +116,10 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("syncAclsCsv")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(csvMono)
.flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

@@ -137,10 +133,10 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("createConsumerAcl")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(createConsumerAclDto)
.flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

@@ -154,10 +150,10 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("createProducerAcl")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(createProducerAclDto)
.flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

@@ -171,10 +167,10 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("createStreamAppAcl")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(createStreamAppAclDto)
.flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
}

@@ -15,8 +15,6 @@ import com.provectus.kafka.ui.model.UploadedFileInfoDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ApplicationInfoService;
import com.provectus.kafka.ui.service.KafkaClusterFactory;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.util.ApplicationRestarter;
import com.provectus.kafka.ui.util.DynamicConfigOperations;
import com.provectus.kafka.ui.util.DynamicConfigOperations.PropertiesStructure;

@@ -39,7 +37,7 @@ import reactor.util.function.Tuples;
@Slf4j
@RestController
@RequiredArgsConstructor
public class ApplicationConfigController implements ApplicationConfigApi {
public class ApplicationConfigController extends AbstractController implements ApplicationConfigApi {
private static final PropertiesMapper MAPPER = Mappers.getMapper(PropertiesMapper.class);

@@ -51,12 +49,10 @@ public class ApplicationConfigController implements ApplicationConfigApi {
ApplicationConfigPropertiesDTO toDto(PropertiesStructure propertiesStructure);
}
private final AccessControlService accessControlService;
private final DynamicConfigOperations dynamicConfigOperations;
private final ApplicationRestarter restarter;
private final KafkaClusterFactory kafkaClusterFactory;
private final ApplicationInfoService applicationInfoService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExchange exchange) {

@@ -69,12 +65,12 @@ public class ApplicationConfigController implements ApplicationConfigApi {
.applicationConfigActions(VIEW)
.operationName("getCurrentConfig")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(Mono.fromSupplier(() -> ResponseEntity.ok(
new ApplicationConfigDTO()
.properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
)))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -84,14 +80,14 @@ public class ApplicationConfigController implements ApplicationConfigApi {
.applicationConfigActions(EDIT)
.operationName("restartWithConfig")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(restartRequestDto)
.<ResponseEntity<Void>>map(dto -> {
dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
restarter.requestRestart();
return ResponseEntity.ok().build();
})
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -101,13 +97,13 @@ public class ApplicationConfigController implements ApplicationConfigApi {
.applicationConfigActions(EDIT)
.operationName("uploadConfigRelatedFile")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(fileFlux.single())
.flatMap(file ->
dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
.map(path -> new UploadedFileInfoDTO().location(path.toString()))
.map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -117,7 +113,7 @@ public class ApplicationConfigController implements ApplicationConfigApi {
.applicationConfigActions(EDIT)
.operationName("validateConfig")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(configDto)
.flatMap(config -> {
PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());

@@ -126,7 +122,7 @@ public class ApplicationConfigController implements ApplicationConfigApi {
.map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
})
.map(ResponseEntity::ok)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
private Mono<Map<String, ClusterConfigValidationDTO>> validateClustersConfig(

@@ -11,8 +11,6 @@ import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.service.BrokerService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

@@ -31,9 +29,6 @@ public class BrokersController extends AbstractController implements BrokersApi
private final BrokerService brokerService;
private final ClusterMapper clusterMapper;
private final AuditService auditService;
private final AccessControlService accessControlService;
@Override
public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
ServerWebExchange exchange) {

@@ -43,9 +38,9 @@ public class BrokersController extends AbstractController implements BrokersApi
.build();
var job = brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto);
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(job))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -57,14 +52,14 @@ public class BrokersController extends AbstractController implements BrokersApi
.operationParams(Map.of("id", id))
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(
brokerService.getBrokerMetrics(getCluster(clusterName), id)
.map(clusterMapper::toBrokerMetrics)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -80,10 +75,10 @@ public class BrokersController extends AbstractController implements BrokersApi
.operationParams(Map.of("brokerIds", brokerIds))
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(
brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokerIds)))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -97,11 +92,11 @@ public class BrokersController extends AbstractController implements BrokersApi
.operationParams(Map.of("brokerId", id))
.build();
return accessControlService.validateAccess(context).thenReturn(
return validateAccess(context).thenReturn(
ResponseEntity.ok(
brokerService.getBrokerConfig(getCluster(clusterName), id)
.map(clusterMapper::toBrokerConfig))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -116,11 +111,11 @@ public class BrokersController extends AbstractController implements BrokersApi
.operationParams(Map.of("brokerId", id))
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
brokerLogdir
.flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld))
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -136,11 +131,11 @@ public class BrokersController extends AbstractController implements BrokersApi
.operationParams(Map.of("brokerId", id))
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
brokerConfig
.flatMap(bci -> brokerService.updateBrokerConfigByName(
getCluster(clusterName), id, name, bci.getValue()))
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
}

@@ -6,8 +6,6 @@ import com.provectus.kafka.ui.model.ClusterMetricsDTO;
import com.provectus.kafka.ui.model.ClusterStatsDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ClusterService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;

@@ -21,8 +19,6 @@ import reactor.core.publisher.Mono;
@Slf4j
public class ClustersController extends AbstractController implements ClustersApi {
private final ClusterService clusterService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<Flux<ClusterDTO>>> getClusters(ServerWebExchange exchange) {

@@ -40,13 +36,13 @@ public class ClustersController extends AbstractController implements ClustersAp
.operationName("getClusterMetrics")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(
clusterService.getClusterMetrics(getCluster(clusterName))
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -57,13 +53,13 @@ public class ClustersController extends AbstractController implements ClustersAp
.operationName("getClusterStats")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(
clusterService.getClusterStats(getCluster(clusterName))
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -75,8 +71,8 @@ public class ClustersController extends AbstractController implements ClustersAp
.operationName("updateClusterInfo")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(clusterService.updateCluster(getCluster(clusterName)).map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
}

@@ -19,8 +19,6 @@ import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import com.provectus.kafka.ui.service.ConsumerGroupService;
import com.provectus.kafka.ui.service.OffsetsResetService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

@@ -42,8 +40,6 @@ public class ConsumerGroupsController extends AbstractController implements Cons
private final ConsumerGroupService consumerGroupService;
private final OffsetsResetService offsetsResetService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Value("${consumer.groups.page.size:25}")
private int defaultConsumerGroupsPageSize;

@@ -59,9 +55,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
.operationName("deleteConsumerGroup")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(consumerGroupService.deleteConsumerGroupById(getCluster(clusterName), id))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

@@ -76,11 +72,11 @@ public class ConsumerGroupsController extends AbstractController implements Cons
.operationName("getConsumerGroup")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(consumerGroupService.getConsumerGroupDetail(getCluster(clusterName), consumerGroupId)
.map(ConsumerGroupMapper::toDetailsDto)
.map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -104,9 +100,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
.map(ResponseEntity::ok)
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(job)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -125,7 +121,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
.operationName("getConsumerGroupsPage")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
consumerGroupService.getConsumerGroupsPage(
getCluster(clusterName),
Optional.ofNullable(page).filter(i -> i > 0).orElse(1),

@@ -136,7 +132,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
)
.map(this::convertPage)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -191,9 +187,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
}
};
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(mono.get())
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}).thenReturn(ResponseEntity.ok().build());
}

@@ -18,8 +18,6 @@ import com.provectus.kafka.ui.model.TaskDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.service.KafkaConnectService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;

@@ -40,8 +38,6 @@ public class KafkaConnectController extends AbstractController implements KafkaC
= Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
private final KafkaConnectService kafkaConnectService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,

@@ -64,9 +60,9 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationName("getConnectors")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -81,10 +77,10 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationName("createConnector")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -100,10 +96,10 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationName("getConnector")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -119,10 +115,10 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationParams(Map.of("connectorName", connectName))
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@@ -150,7 +146,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.sort(comparator);
return Mono.just(ResponseEntity.ok(job))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -166,11 +162,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationName("getConnectorConfig")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService
.getConnectorConfig(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -187,11 +183,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationParams(Map.of("connectorName", connectorName))
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
.map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -214,11 +210,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationParams(Map.of("connectorName", connectorName))
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService
.updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -234,11 +230,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationParams(Map.of("connectorName", connectorName))
.build();
return accessControlService.validateAccess(context).thenReturn(
return validateAccess(context).thenReturn(
ResponseEntity
.ok(kafkaConnectService
.getConnectorTasks(getCluster(clusterName), connectName, connectorName))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -254,11 +250,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationParams(Map.of("connectorName", connectorName))
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService
.restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -272,11 +268,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationName("getConnectorPlugins")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
Mono.just(
ResponseEntity.ok(
kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName)))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -9,9 +9,7 @@ import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.ksql.KsqlServiceV2;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@@ -29,8 +27,6 @@ import reactor.core.publisher.Mono;
public class KsqlController extends AbstractController implements KsqlApi {
private final KsqlServiceV2 ksqlServiceV2;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String clusterName,

@@ -44,13 +40,13 @@ public class KsqlController extends AbstractController implements KsqlApi {
.operationName("executeKsql")
.operationParams(command)
.build();
return accessControlService.validateAccess(context).thenReturn(
return validateAccess(context).thenReturn(
new KsqlCommandV2ResponseDTO().pipeId(
ksqlServiceV2.registerCommand(
getCluster(clusterName),
command.getKsql(),
Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of()))))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
)
.map(ResponseEntity::ok);

@@ -66,7 +62,7 @@ public class KsqlController extends AbstractController implements KsqlApi {
.operationName("openKsqlResponsePipe")
.build();
return accessControlService.validateAccess(context).thenReturn(
return validateAccess(context).thenReturn(
ResponseEntity.ok(ksqlServiceV2.execute(pipeId)
.map(table -> new KsqlResponseDTO()
.table(

@@ -86,9 +82,9 @@ public class KsqlController extends AbstractController implements KsqlApi {
.operationName("listStreams")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName))))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -100,8 +96,8 @@ public class KsqlController extends AbstractController implements KsqlApi {
.operationName("listTables")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName))))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
}

@@ -24,8 +24,6 @@ import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import com.provectus.kafka.ui.service.DeserializationService;
import com.provectus.kafka.ui.service.MessagesService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@@ -49,8 +47,6 @@ public class MessagesController extends AbstractController implements MessagesAp
private final MessagesService messagesService;
private final DeserializationService deserializationService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<Void>> deleteTopicMessages(

@@ -63,13 +59,13 @@ public class MessagesController extends AbstractController implements MessagesAp
.topicActions(MESSAGES_DELETE)
.build();
return accessControlService.validateAccess(context).<ResponseEntity<Void>>then(
return validateAccess(context).<ResponseEntity<Void>>then(
messagesService.deleteTopicMessages(
getCluster(clusterName),
topicName,
Optional.ofNullable(partitions).orElse(List.of())
).thenReturn(ResponseEntity.ok().build())
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -120,9 +116,9 @@ public class MessagesController extends AbstractController implements MessagesAp
);
var context = contextBuilder.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(job)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -137,11 +133,11 @@ public class MessagesController extends AbstractController implements MessagesAp
.operationName("sendTopicMessages")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
createTopicMessage.flatMap(msg ->
messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
).map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
/**

@@ -192,7 +188,7 @@ public class MessagesController extends AbstractController implements MessagesAp
? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
: deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE));
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
Mono.just(dto)
.subscribeOn(Schedulers.boundedElastic())
.map(ResponseEntity::ok)

@@ -13,8 +13,6 @@ import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
import com.provectus.kafka.ui.service.SchemaRegistryService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@@ -38,8 +36,6 @@ public class SchemasController extends AbstractController implements SchemasApi
private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl();
private final SchemaRegistryService schemaRegistryService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
protected KafkaCluster getCluster(String clusterName) {

@@ -61,7 +57,7 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationName("checkSchemaCompatibility")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
newSchemaSubjectMono.flatMap(subjectDTO ->
schemaRegistryService.checksSchemaCompatibility(
getCluster(clusterName),

@@ -70,7 +66,7 @@ public class SchemasController extends AbstractController implements SchemasApi
))
.map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -83,7 +79,7 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationName("createNewSchema")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
newSchemaSubjectMono.flatMap(newSubject ->
schemaRegistryService.registerNewSchema(
getCluster(clusterName),

@@ -92,7 +88,7 @@ public class SchemasController extends AbstractController implements SchemasApi
)
).map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -105,9 +101,9 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationName("deleteLatestSchema")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
schemaRegistryService.deleteLatestSchemaSubject(getCluster(clusterName), subject)
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}

@@ -122,9 +118,9 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationName("deleteSchema")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
schemaRegistryService.deleteSchemaSubjectEntirely(getCluster(clusterName), subject)
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}

@@ -139,9 +135,9 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationName("deleteSchemaByVersion")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
schemaRegistryService.deleteSchemaSubjectByVersion(getCluster(clusterName), subjectName, version)
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}

@@ -160,9 +156,9 @@ public class SchemasController extends AbstractController implements SchemasApi
schemaRegistryService.getAllVersionsBySubject(getCluster(clusterName), subjectName)
.map(kafkaSrMapper::toDto);
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(schemas))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -185,11 +181,11 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationName("getLatestSchema")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
schemaRegistryService.getLatestSchemaVersionBySubject(getCluster(clusterName), subject)
.map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -203,12 +199,12 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationParams(Map.of("subject", subject, "version", version))
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
schemaRegistryService.getSchemaSubjectByVersion(
getCluster(clusterName), subject, version)
.map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -244,7 +240,7 @@ public class SchemasController extends AbstractController implements SchemasApi
.map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
.map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
}).map(ResponseEntity::ok)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -257,14 +253,14 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationName("updateGlobalSchemaCompatibilityLevel")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
compatibilityLevelMono
.flatMap(compatibilityLevelDTO ->
schemaRegistryService.updateGlobalSchemaCompatibility(
getCluster(clusterName),
kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}

@@ -280,7 +276,7 @@ public class SchemasController extends AbstractController implements SchemasApi
.operationParams(Map.of("subject", subject))
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
compatibilityLevelMono
.flatMap(compatibilityLevelDTO ->
schemaRegistryService.updateSchemaCompatibility(

@@ -288,7 +284,7 @@ public class SchemasController extends AbstractController implements SchemasApi
subject,
kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}

@@ -27,8 +27,6 @@ import com.provectus.kafka.ui.model.TopicsResponseDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.TopicsService;
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

@@ -53,8 +51,6 @@ public class TopicsController extends AbstractController implements TopicsApi {
private final TopicsService topicsService;
private final TopicAnalysisService topicAnalysisService;
private final ClusterMapper clusterMapper;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<TopicDTO>> createTopic(

@@ -67,12 +63,12 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationParams(topicCreation)
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(topicsService.createTopic(getCluster(clusterName), topicCreation))
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.OK))
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
});
}

@@ -86,11 +82,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("recreateTopic")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
topicsService.recreateTopic(getCluster(clusterName), topicName)
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -105,11 +101,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationParams(Map.of("newTopicName", newTopicName))
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -123,11 +119,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("deleteTopic")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(
topicsService.deleteTopic(getCluster(clusterName), topicName)
.thenReturn(ResponseEntity.ok().<Void>build())
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@@ -142,7 +138,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("getTopicConfigs")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
topicsService.getTopicConfigs(getCluster(clusterName), topicName)
.map(lst -> lst.stream()
.map(InternalTopicConfig::from)

@@ -150,7 +146,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
.collect(toList()))
.map(Flux::fromIterable)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -164,11 +160,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("getTopicDetails")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
topicsService.getTopicDetails(getCluster(clusterName), topicName)
.map(clusterMapper::toTopicDetails)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -215,7 +211,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
.pageCount(totalPages));
})
.map(ResponseEntity::ok)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
@Override

@@ -230,12 +226,12 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("updateTopic")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
topicsService
.updateTopic(getCluster(clusterName), topicName, topicUpdate)
.map(clusterMapper::toTopic)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -250,11 +246,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
.topicActions(VIEW, EDIT)
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
partitionsIncrease.flatMap(partitions ->
topicsService.increaseTopicPartitions(getCluster(clusterName), topicName, partitions)
).map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -270,12 +266,12 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("changeReplicationFactor")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
replicationFactorChange
.flatMap(rfc ->
topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc))
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
@Override

@@ -288,9 +284,9 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("analyzeTopic")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
topicAnalysisService.analyze(getCluster(clusterName), topicName)
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}

@@ -305,9 +301,9 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("cancelTopicAnalysis")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(Mono.fromRunnable(() -> topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName)))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

@@ -324,11 +320,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("getTopicAnalysis")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
.map(ResponseEntity::ok)
.orElseGet(() -> ResponseEntity.notFound().build()))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
private Comparator<InternalTopic> getComparatorForTopic(

@@ -16,6 +16,7 @@ import com.provectus.kafka.ui.serdes.builtin.HexSerde;
import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
import com.provectus.kafka.ui.serdes.builtin.ProtobufRawSerde;
import com.provectus.kafka.ui.serdes.builtin.StringSerde;
import com.provectus.kafka.ui.serdes.builtin.UInt32Serde;
import com.provectus.kafka.ui.serdes.builtin.UInt64Serde;

@@ -50,6 +51,7 @@ public class SerdesInitializer {
.put(Base64Serde.name(), Base64Serde.class)
.put(HexSerde.name(), HexSerde.class)
.put(UuidBinarySerde.name(), UuidBinarySerde.class)
.put(ProtobufRawSerde.name(), ProtobufRawSerde.class)
.build(),
new CustomSerdeLoader()
);

@@ -0,0 +1,59 @@
package com.provectus.kafka.ui.serdes.builtin;
import com.google.protobuf.UnknownFieldSet;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.RecordHeaders;
import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serdes.BuiltInSerde;
import java.util.Map;
import java.util.Optional;
import lombok.SneakyThrows;
public class ProtobufRawSerde implements BuiltInSerde {
public static String name() {
return "ProtobufDecodeRaw";
}
@Override
public Optional<String> getDescription() {
return Optional.empty();
}
@Override
public Optional<SchemaDescription> getSchema(String topic, Target type) {
return Optional.empty();
}
@Override
public boolean canSerialize(String topic, Target type) {
return false;
}
@Override
public boolean canDeserialize(String topic, Target type) {
return true;
}
@Override
public Serializer serializer(String topic, Target type) {
throw new UnsupportedOperationException();
}
@Override
public Deserializer deserializer(String topic, Target type) {
return new Deserializer() {
@SneakyThrows
@Override
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
try {
UnknownFieldSet unknownFields = UnknownFieldSet.parseFrom(data);
return new DeserializeResult(unknownFields.toString(), DeserializeResult.Type.STRING, Map.of());
} catch (Exception e) {
throw new ValidationException(e.getMessage());
}
}
};
}
}

@@ -0,0 +1,108 @@
package com.provectus.kafka.ui.serdes.builtin;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.serde.api.Serde;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class ProtobufRawSerdeTest {
private static final String DUMMY_TOPIC = "dummy-topic";
private ProtobufRawSerde serde;
@BeforeEach
void init() {
serde = new ProtobufRawSerde();
}
@SneakyThrows
ProtobufSchema getSampleSchema() {
return new ProtobufSchema(
"""
syntax = "proto3";
message Message1 {
int32 my_field = 1;
}
"""
);
}
@SneakyThrows
private byte[] getProtobufMessage() {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(getSampleSchema().toDescriptor("Message1"));
builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5);
return builder.build().toByteArray();
}
@Test
void deserializeSimpleMessage() {
var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
.deserialize(null, getProtobufMessage());
assertThat(deserialized.getResult()).isEqualTo("1: 5\n");
}
@Test
void deserializeEmptyMessage() {
var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
.deserialize(null, new byte[0]);
assertThat(deserialized.getResult()).isEqualTo("");
}
@Test
void deserializeInvalidMessage() {
var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE);
assertThatThrownBy(() -> deserializer.deserialize(null, new byte[] { 1, 2, 3 }))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Protocol message contained an invalid tag");
}
@Test
void deserializeNullMessage() {
var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE);
assertThatThrownBy(() -> deserializer.deserialize(null, null))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Cannot read the array length");
}
ProtobufSchema getSampleNestedSchema() {
return new ProtobufSchema(
"""
syntax = "proto3";
message Message2 {
int32 my_nested_field = 1;
}
message Message1 {
int32 my_field = 1;
Message2 my_nested_message = 2;
}
"""
);
}
@SneakyThrows
private byte[] getComplexProtobufMessage() {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(getSampleNestedSchema().toDescriptor("Message1"));
builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5);
DynamicMessage.Builder nestedBuilder = DynamicMessage.newBuilder(getSampleNestedSchema().toDescriptor("Message2"));
nestedBuilder.setField(nestedBuilder.getDescriptorForType().findFieldByName("my_nested_field"), 10);
builder.setField(builder.getDescriptorForType().findFieldByName("my_nested_message"), nestedBuilder.build());
return builder.build().toByteArray();
}
@Test
void deserializeNestedMessage() {
var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
.deserialize(null, getComplexProtobufMessage());
assertThat(deserialized.getResult()).isEqualTo("1: 5\n2: {\n 1: 10\n}\n");
}
}

@@ -42,8 +42,9 @@ public class SchemaRegistryPaginationTest {
new SchemaRegistryService.SubjectWithCompatibilityLevel(
new SchemaSubject().subject(a.getArgument(1)), Compatibility.FULL)));
this.controller = new SchemasController(schemaRegistryService, new AccessControlServiceMock().getMock(),
mock(AuditService.class));
this.controller = new SchemasController(schemaRegistryService);
this.controller.setAccessControlService(new AccessControlServiceMock().getMock());
this.controller.setAuditService(mock(AuditService.class));
this.controller.setClustersStorage(clustersStorage);
}

@@ -45,8 +45,8 @@ class TopicsServicePaginationTest {
private final ClusterMapper clusterMapper = new ClusterMapperImpl();
private final AccessControlService accessControlService = new AccessControlServiceMock().getMock();
private final TopicsController topicsController = new TopicsController(
topicsService, mock(TopicAnalysisService.class), clusterMapper, accessControlService, mock(AuditService.class));
private final TopicsController topicsController =
new TopicsController(topicsService, mock(TopicAnalysisService.class), clusterMapper);
private void init(Map<String, InternalTopic> topicsInCache) {

@@ -59,6 +59,8 @@ class TopicsServicePaginationTest {
List<String> lst = a.getArgument(1);
return Mono.just(lst.stream().map(topicsInCache::get).collect(Collectors.toList()));
});
topicsController.setAccessControlService(accessControlService);
topicsController.setAuditService(mock(AuditService.class));
topicsController.setClustersStorage(clustersStorage);
}