Audit backend (#3831)

AuditService added to log all API operations
This commit is contained in:
Ilya Kuramshin 2023-06-21 12:02:27 +04:00 committed by GitHub
parent 7f7242eb8b
commit c743067ffa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
33 changed files with 1200 additions and 296 deletions

View file

@@ -1,2 +1,2 @@
rules:
- pattern: ".*"
- pattern: ".*"

View file

@@ -20,6 +20,8 @@ services:
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
DYNAMIC_CONFIG_ENABLED: 'true' # not necessary, added for tests
KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true'
KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'
kafka0:
image: confluentinc/cp-kafka:7.2.1.arm64

View file

@@ -51,6 +51,7 @@ public class ClustersProperties {
List<Masking> masking;
Long pollingThrottleRate;
TruststoreConfig ssl;
AuditProperties audit;
}
@Data
@ -143,6 +144,17 @@ public class ClustersProperties {
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
// Per-cluster audit configuration, bound from "kafka.clusters[*].audit.*" properties
// (see the KAFKA_CLUSTERS_0_AUDIT_* environment variables in the docker-compose change above).
// NOTE(review): field semantics below are inferred from names and the compose keys; the
// consuming AuditService is not visible in this hunk — confirm against its implementation.
public static class AuditProperties {
// Name of the Kafka topic audit records are written to — presumably defaulted elsewhere when null; verify.
String topic;
// Partition count used when the audit topic is created — TODO confirm against topic-creation code.
Integer auditTopicsPartitions;
// Enables writing audit events to the audit topic (AUDIT_TOPICAUDITENABLED in compose).
Boolean topicAuditEnabled;
// Enables writing audit events to the application log/console (AUDIT_CONSOLEAUDITENABLED in compose).
Boolean consoleAuditEnabled;
// Extra topic-level configs applied when the audit topic is created — TODO confirm usage.
Map<String, String> auditTopicProperties;
}
@PostConstruct
public void validateAndSetDefaults() {
if (clusters != null) {

View file

@ -8,6 +8,7 @@ import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.service.acl.AclsService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
@ -26,6 +27,7 @@ public class AclsController extends AbstractController implements AclsApi {
private final AclsService aclsService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
@ -33,12 +35,14 @@ public class AclsController extends AbstractController implements AclsApi {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.operationName("createAcl")
.build();
return accessControlService.validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@ -48,12 +52,14 @@ public class AclsController extends AbstractController implements AclsApi {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.operationName("deleteAcl")
.build();
return accessControlService.validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@ -66,6 +72,7 @@ public class AclsController extends AbstractController implements AclsApi {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.VIEW)
.operationName("listAcls")
.build();
var resourceType = Optional.ofNullable(resourceTypeDto)
@ -83,7 +90,7 @@ public class AclsController extends AbstractController implements AclsApi {
ResponseEntity.ok(
aclsService.listAcls(getCluster(clusterName), filter)
.map(ClusterMapper::toKafkaAclDto)))
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -91,12 +98,14 @@ public class AclsController extends AbstractController implements AclsApi {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.VIEW)
.operationName("getAclAsCsv")
.build();
return accessControlService.validateAccess(context).then(
aclsService.getAclAsCsvString(getCluster(clusterName))
.map(ResponseEntity::ok)
.flatMap(Mono::just)
.doOnEach(sig -> auditService.audit(context, sig))
);
}
@ -105,11 +114,13 @@ public class AclsController extends AbstractController implements AclsApi {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.operationName("syncAclsCsv")
.build();
return accessControlService.validateAccess(context)
.then(csvMono)
.flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
}

View file

@ -15,6 +15,7 @@ import com.provectus.kafka.ui.model.UploadedFileInfoDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ApplicationInfoService;
import com.provectus.kafka.ui.service.KafkaClusterFactory;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.util.ApplicationRestarter;
import com.provectus.kafka.ui.util.DynamicConfigOperations;
@ -55,6 +56,7 @@ public class ApplicationConfigController implements ApplicationConfigApi {
private final ApplicationRestarter restarter;
private final KafkaClusterFactory kafkaClusterFactory;
private final ApplicationInfoService applicationInfoService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExchange exchange) {
@ -63,62 +65,68 @@ public class ApplicationConfigController implements ApplicationConfigApi {
@Override
public Mono<ResponseEntity<ApplicationConfigDTO>> getCurrentConfig(ServerWebExchange exchange) {
return accessControlService
.validateAccess(
AccessContext.builder()
.applicationConfigActions(VIEW)
.build()
)
var context = AccessContext.builder()
.applicationConfigActions(VIEW)
.operationName("getCurrentConfig")
.build();
return accessControlService.validateAccess(context)
.then(Mono.fromSupplier(() -> ResponseEntity.ok(
new ApplicationConfigDTO()
.properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
)));
)))
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<Void>> restartWithConfig(Mono<RestartRequestDTO> restartRequestDto,
ServerWebExchange exchange) {
return accessControlService
.validateAccess(
AccessContext.builder()
.applicationConfigActions(EDIT)
.build()
)
var context = AccessContext.builder()
.applicationConfigActions(EDIT)
.operationName("restartWithConfig")
.build();
return accessControlService.validateAccess(context)
.then(restartRequestDto)
.map(dto -> {
.<ResponseEntity<Void>>map(dto -> {
dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
restarter.requestRestart();
return ResponseEntity.ok().build();
});
})
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(Flux<Part> fileFlux,
ServerWebExchange exchange) {
return accessControlService
.validateAccess(
AccessContext.builder()
.applicationConfigActions(EDIT)
.build()
)
var context = AccessContext.builder()
.applicationConfigActions(EDIT)
.operationName("uploadConfigRelatedFile")
.build();
return accessControlService.validateAccess(context)
.then(fileFlux.single())
.flatMap(file ->
dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
.map(path -> new UploadedFileInfoDTO().location(path.toString()))
.map(ResponseEntity::ok));
.map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<ApplicationConfigValidationDTO>> validateConfig(Mono<ApplicationConfigDTO> configDto,
ServerWebExchange exchange) {
return configDto
var context = AccessContext.builder()
.applicationConfigActions(EDIT)
.operationName("validateConfig")
.build();
return accessControlService.validateAccess(context)
.then(configDto)
.flatMap(config -> {
PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());
ClustersProperties clustersProperties = propertiesStructure.getKafka();
return validateClustersConfig(clustersProperties)
.map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
})
.map(ResponseEntity::ok);
.map(ResponseEntity::ok)
.doOnEach(sig -> auditService.audit(context, sig));
}
private Mono<Map<String, ClusterConfigValidationDTO>> validateClustersConfig(

View file

@ -11,8 +11,11 @@ import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.service.BrokerService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
@ -27,61 +30,78 @@ import reactor.core.publisher.Mono;
public class BrokersController extends AbstractController implements BrokersApi {
private final BrokerService brokerService;
private final ClusterMapper clusterMapper;
private final AuditService auditService;
private final AccessControlService accessControlService;
@Override
public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.build());
.operationName("getBrokers")
.build();
var job = brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto);
return validateAccess.thenReturn(ResponseEntity.ok(job));
return accessControlService.validateAccess(context)
.thenReturn(ResponseEntity.ok(job))
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterName, Integer id,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.build());
.operationName("getBrokersMetrics")
.operationParams(Map.of("id", id))
.build();
return validateAccess.then(
brokerService.getBrokerMetrics(getCluster(clusterName), id)
.map(clusterMapper::toBrokerMetrics)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
);
return accessControlService.validateAccess(context)
.then(
brokerService.getBrokerMetrics(getCluster(clusterName), id)
.map(clusterMapper::toBrokerMetrics)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
)
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<Flux<BrokersLogdirsDTO>>> getAllBrokersLogdirs(String clusterName,
List<Integer> brokers,
@Nullable List<Integer> brokers,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.build());
return validateAccess.thenReturn(ResponseEntity.ok(
brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokers)));
List<Integer> brokerIds = brokers == null ? List.of() : brokers;
var context = AccessContext.builder()
.cluster(clusterName)
.operationName("getAllBrokersLogdirs")
.operationParams(Map.of("brokerIds", brokerIds))
.build();
return accessControlService.validateAccess(context)
.thenReturn(ResponseEntity.ok(
brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokerIds)))
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<Flux<BrokerConfigDTO>>> getBrokerConfig(String clusterName,
Integer id,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.clusterConfigActions(ClusterConfigAction.VIEW)
.build());
.operationName("getBrokerConfig")
.operationParams(Map.of("brokerId", id))
.build();
return validateAccess.thenReturn(
return accessControlService.validateAccess(context).thenReturn(
ResponseEntity.ok(
brokerService.getBrokerConfig(getCluster(clusterName), id)
.map(clusterMapper::toBrokerConfig))
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -89,16 +109,18 @@ public class BrokersController extends AbstractController implements BrokersApi
Integer id,
Mono<BrokerLogdirUpdateDTO> brokerLogdir,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
.build());
.operationName("updateBrokerTopicPartitionLogDir")
.operationParams(Map.of("brokerId", id))
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
brokerLogdir
.flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld))
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -107,16 +129,18 @@ public class BrokersController extends AbstractController implements BrokersApi
String name,
Mono<BrokerConfigItemDTO> brokerConfig,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
.build());
.operationName("updateBrokerConfigByName")
.operationParams(Map.of("brokerId", id))
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
brokerConfig
.flatMap(bci -> brokerService.updateBrokerConfigByName(
getCluster(clusterName), id, name, bci.getValue()))
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
}

View file

@ -6,6 +6,7 @@ import com.provectus.kafka.ui.model.ClusterMetricsDTO;
import com.provectus.kafka.ui.model.ClusterStatsDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ClusterService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -21,6 +22,7 @@ import reactor.core.publisher.Mono;
public class ClustersController extends AbstractController implements ClustersApi {
private final ClusterService clusterService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<Flux<ClusterDTO>>> getClusters(ServerWebExchange exchange) {
@ -35,6 +37,7 @@ public class ClustersController extends AbstractController implements ClustersAp
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.operationName("getClusterMetrics")
.build();
return accessControlService.validateAccess(context)
@ -42,7 +45,8 @@ public class ClustersController extends AbstractController implements ClustersAp
clusterService.getClusterMetrics(getCluster(clusterName))
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
);
)
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -50,6 +54,7 @@ public class ClustersController extends AbstractController implements ClustersAp
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.operationName("getClusterStats")
.build();
return accessControlService.validateAccess(context)
@ -57,7 +62,8 @@ public class ClustersController extends AbstractController implements ClustersAp
clusterService.getClusterStats(getCluster(clusterName))
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
);
)
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -66,11 +72,11 @@ public class ClustersController extends AbstractController implements ClustersAp
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.operationName("updateClusterInfo")
.build();
return accessControlService.validateAccess(context)
.then(
clusterService.updateCluster(getCluster(clusterName)).map(ResponseEntity::ok)
);
.then(clusterService.updateCluster(getCluster(clusterName)).map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
}
}

View file

@ -19,6 +19,7 @@ import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import com.provectus.kafka.ui.service.ConsumerGroupService;
import com.provectus.kafka.ui.service.OffsetsResetService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Map;
import java.util.Optional;
@ -42,6 +43,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
private final ConsumerGroupService consumerGroupService;
private final OffsetsResetService offsetsResetService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Value("${consumer.groups.page.size:25}")
private int defaultConsumerGroupsPageSize;
@ -50,44 +52,47 @@ public class ConsumerGroupsController extends AbstractController implements Cons
public Mono<ResponseEntity<Void>> deleteConsumerGroup(String clusterName,
String id,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.consumerGroup(id)
.consumerGroupActions(DELETE)
.build());
.operationName("deleteConsumerGroup")
.build();
return validateAccess.then(
consumerGroupService.deleteConsumerGroupById(getCluster(clusterName), id)
.thenReturn(ResponseEntity.ok().build())
);
return accessControlService.validateAccess(context)
.then(consumerGroupService.deleteConsumerGroupById(getCluster(clusterName), id))
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@Override
public Mono<ResponseEntity<ConsumerGroupDetailsDTO>> getConsumerGroup(String clusterName,
String consumerGroupId,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.consumerGroup(consumerGroupId)
.consumerGroupActions(VIEW)
.build());
.operationName("getConsumerGroup")
.build();
return validateAccess.then(
consumerGroupService.getConsumerGroupDetail(getCluster(clusterName), consumerGroupId)
return accessControlService.validateAccess(context)
.then(consumerGroupService.getConsumerGroupDetail(getCluster(clusterName), consumerGroupId)
.map(ConsumerGroupMapper::toDetailsDto)
.map(ResponseEntity::ok)
);
.map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getTopicConsumerGroups(String clusterName,
String topicName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(TopicAction.VIEW)
.build());
.operationName("getTopicConsumerGroups")
.build();
Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> job =
consumerGroupService.getConsumerGroupsForTopic(getCluster(clusterName), topicName)
@ -99,7 +104,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
.map(ResponseEntity::ok)
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
return validateAccess.then(job);
return accessControlService.validateAccess(context)
.then(job)
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -112,12 +119,13 @@ public class ConsumerGroupsController extends AbstractController implements Cons
SortOrderDTO sortOrderDto,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
// consumer group access validation is within the service
.build());
.operationName("getConsumerGroupsPage")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
consumerGroupService.getConsumerGroupsPage(
getCluster(clusterName),
Optional.ofNullable(page).filter(i -> i > 0).orElse(1),
@ -128,7 +136,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
)
.map(this::convertPage)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -137,12 +145,13 @@ public class ConsumerGroupsController extends AbstractController implements Cons
Mono<ConsumerGroupOffsetsResetDTO> resetDto,
ServerWebExchange exchange) {
return resetDto.flatMap(reset -> {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(reset.getTopic())
.topicActions(TopicAction.VIEW)
.consumerGroupActions(RESET_OFFSETS)
.build());
.operationName("resetConsumerGroupOffsets")
.build();
Supplier<Mono<Void>> mono = () -> {
var cluster = getCluster(clusterName);
@ -182,7 +191,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
}
};
return validateAccess.then(mono.get());
return accessControlService.validateAccess(context)
.then(mono.get())
.doOnEach(sig -> auditService.audit(context, sig));
}).thenReturn(ResponseEntity.ok().build());
}

View file

@ -18,6 +18,7 @@ import com.provectus.kafka.ui.model.TaskDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.service.KafkaConnectService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Comparator;
import java.util.Map;
@ -37,8 +38,10 @@ import reactor.core.publisher.Mono;
public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
private static final Set<ConnectorActionDTO> RESTART_ACTIONS
= Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
private final KafkaConnectService kafkaConnectService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
@ -54,15 +57,16 @@ public class KafkaConnectController extends AbstractController implements KafkaC
public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, String connectName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.build());
.operationName("getConnectors")
.build();
return validateAccess.thenReturn(
ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName))
);
return accessControlService.validateAccess(context)
.thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -70,16 +74,17 @@ public class KafkaConnectController extends AbstractController implements KafkaC
@Valid Mono<NewConnectorDTO> connector,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.CREATE)
.build());
.operationName("createConnector")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -87,17 +92,18 @@ public class KafkaConnectController extends AbstractController implements KafkaC
String connectorName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connector(connectorName)
.build());
.operationName("getConnector")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -105,16 +111,18 @@ public class KafkaConnectController extends AbstractController implements KafkaC
String connectorName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.build());
.operationName("deleteConnector")
.operationParams(Map.of("connectorName", connectName))
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@ -126,14 +134,23 @@ public class KafkaConnectController extends AbstractController implements KafkaC
SortOrderDTO sortOrder,
ServerWebExchange exchange
) {
var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.operationName("getAllConnectors")
.build();
var comparator = sortOrder == null || sortOrder.equals(SortOrderDTO.ASC)
? getConnectorsComparator(orderBy)
: getConnectorsComparator(orderBy).reversed();
Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search)
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
.filterWhen(dto -> accessControlService.isConnectorAccessible(dto.getConnect(), dto.getName(), clusterName));
.filterWhen(dto -> accessControlService.isConnectorAccessible(dto.getConnect(), dto.getName(), clusterName))
.sort(comparator);
return Mono.just(ResponseEntity.ok(job.sort(comparator)));
return Mono.just(ResponseEntity.ok(job))
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -142,17 +159,18 @@ public class KafkaConnectController extends AbstractController implements KafkaC
String connectorName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.build());
.operationName("getConnectorConfig")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
kafkaConnectService
.getConnectorConfig(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -161,16 +179,19 @@ public class KafkaConnectController extends AbstractController implements KafkaC
Mono<Map<String, Object>> requestBody,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.build());
.operationName("setConnectorConfig")
.operationParams(Map.of("connectorName", connectorName))
.build();
return validateAccess.then(
kafkaConnectService
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
.map(ResponseEntity::ok));
return accessControlService.validateAccess(context).then(
kafkaConnectService
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
.map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -178,7 +199,6 @@ public class KafkaConnectController extends AbstractController implements KafkaC
String connectorName,
ConnectorActionDTO action,
ServerWebExchange exchange) {
ConnectAction[] connectActions;
if (RESTART_ACTIONS.contains(action)) {
connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.RESTART};
@ -186,17 +206,19 @@ public class KafkaConnectController extends AbstractController implements KafkaC
connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.EDIT};
}
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(connectActions)
.build());
.operationName("updateConnectorState")
.operationParams(Map.of("connectorName", connectorName))
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
kafkaConnectService
.updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -204,17 +226,19 @@ public class KafkaConnectController extends AbstractController implements KafkaC
String connectName,
String connectorName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.build());
.operationName("getConnectorTasks")
.operationParams(Map.of("connectorName", connectorName))
.build();
return validateAccess.thenReturn(
return accessControlService.validateAccess(context).thenReturn(
ResponseEntity
.ok(kafkaConnectService
.getConnectorTasks(getCluster(clusterName), connectName, connectorName))
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -222,34 +246,37 @@ public class KafkaConnectController extends AbstractController implements KafkaC
String connectorName, Integer taskId,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.RESTART)
.build());
.operationName("restartConnectorTask")
.operationParams(Map.of("connectorName", connectorName))
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
kafkaConnectService
.restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<Flux<ConnectorPluginDTO>>> getConnectorPlugins(
String clusterName, String connectName, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.build());
.operationName("getConnectorPlugins")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
Mono.just(
ResponseEntity.ok(
kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName)))
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override

View file

@ -9,6 +9,7 @@ import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.ksql.KsqlServiceV2;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
@ -29,38 +30,43 @@ public class KsqlController extends AbstractController implements KsqlApi {
private final KsqlServiceV2 ksqlServiceV2;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String clusterName,
Mono<KsqlCommandV2DTO>
ksqlCommand2Dto,
Mono<KsqlCommandV2DTO> ksqlCmdDo,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.ksqlActions(KsqlAction.EXECUTE)
.build());
return validateAccess.then(
ksqlCommand2Dto.map(dto -> {
var id = ksqlServiceV2.registerCommand(
getCluster(clusterName),
dto.getKsql(),
Optional.ofNullable(dto.getStreamsProperties()).orElse(Map.of()));
return new KsqlCommandV2ResponseDTO().pipeId(id);
}).map(ResponseEntity::ok)
);
return ksqlCmdDo.flatMap(
command -> {
var context = AccessContext.builder()
.cluster(clusterName)
.ksqlActions(KsqlAction.EXECUTE)
.operationName("executeKsql")
.operationParams(command)
.build();
return accessControlService.validateAccess(context).thenReturn(
new KsqlCommandV2ResponseDTO().pipeId(
ksqlServiceV2.registerCommand(
getCluster(clusterName),
command.getKsql(),
Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of()))))
.doOnEach(sig -> auditService.audit(context, sig));
}
)
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<Flux<KsqlResponseDTO>>> openKsqlResponsePipe(String clusterName,
String pipeId,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.ksqlActions(KsqlAction.EXECUTE)
.build());
.operationName("openKsqlResponsePipe")
.build();
return validateAccess.thenReturn(
return accessControlService.validateAccess(context).thenReturn(
ResponseEntity.ok(ksqlServiceV2.execute(pipeId)
.map(table -> new KsqlResponseDTO()
.table(
@ -74,22 +80,28 @@ public class KsqlController extends AbstractController implements KsqlApi {
@Override
public Mono<ResponseEntity<Flux<KsqlStreamDescriptionDTO>>> listStreams(String clusterName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.ksqlActions(KsqlAction.EXECUTE)
.build());
.operationName("listStreams")
.build();
return validateAccess.thenReturn(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName))));
return accessControlService.validateAccess(context)
.thenReturn(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName))))
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<Flux<KsqlTableDescriptionDTO>>> listTables(String clusterName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.ksqlActions(KsqlAction.EXECUTE)
.build());
.operationName("listTables")
.build();
return validateAccess.thenReturn(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName))));
return accessControlService.validateAccess(context)
.thenReturn(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName))))
.doOnEach(sig -> auditService.audit(context, sig));
}
}

View file

@ -18,9 +18,11 @@ import com.provectus.kafka.ui.model.SerdeUsageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.model.TopicSerdeSuggestionDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import com.provectus.kafka.ui.service.DeserializationService;
import com.provectus.kafka.ui.service.MessagesService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
@ -46,25 +48,26 @@ public class MessagesController extends AbstractController implements MessagesAp
private final MessagesService messagesService;
private final DeserializationService deserializationService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<Void>> deleteTopicMessages(
String clusterName, String topicName, @Valid List<Integer> partitions,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_DELETE)
.build());
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).<ResponseEntity<Void>>then(
messagesService.deleteTopicMessages(
getCluster(clusterName),
topicName,
Optional.ofNullable(partitions).orElse(List.of())
).thenReturn(ResponseEntity.ok().build())
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -79,11 +82,15 @@ public class MessagesController extends AbstractController implements MessagesAp
String keySerde,
String valueSerde,
ServerWebExchange exchange) {
final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var contextBuilder = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.build());
.operationName("getTopicMessages");
if (auditService.isAuditTopic(getCluster(clusterName), topicName)) {
contextBuilder.auditActions(AuditAction.VIEW);
}
seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
@ -102,7 +109,10 @@ public class MessagesController extends AbstractController implements MessagesAp
)
);
return validateAccess.then(job);
var context = contextBuilder.build();
return accessControlService.validateAccess(context)
.then(job)
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -110,17 +120,18 @@ public class MessagesController extends AbstractController implements MessagesAp
String clusterName, String topicName, @Valid Mono<CreateTopicMessageDTO> createTopicMessage,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_PRODUCE)
.build());
.operationName("sendTopicMessages")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
createTopicMessage.flatMap(msg ->
messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
).map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
/**
@ -156,12 +167,12 @@ public class MessagesController extends AbstractController implements MessagesAp
String topicName,
SerdeUsageDTO use,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(TopicAction.VIEW)
.build());
.operationName("getSerdes")
.build();
TopicSerdeSuggestionDTO dto = new TopicSerdeSuggestionDTO()
.key(use == SerdeUsageDTO.SERIALIZE
@ -171,7 +182,7 @@ public class MessagesController extends AbstractController implements MessagesAp
? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
: deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE));
return validateAccess.then(
return accessControlService.validateAccess(context).then(
Mono.just(dto)
.subscribeOn(Schedulers.boundedElastic())
.map(ResponseEntity::ok)

View file

@ -13,8 +13,10 @@ import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
import com.provectus.kafka.ui.service.SchemaRegistryService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
@ -37,6 +39,7 @@ public class SchemasController extends AbstractController implements SchemasApi
private final SchemaRegistryService schemaRegistryService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
protected KafkaCluster getCluster(String clusterName) {
@ -51,13 +54,14 @@ public class SchemasController extends AbstractController implements SchemasApi
public Mono<ResponseEntity<CompatibilityCheckResponseDTO>> checkSchemaCompatibility(
String clusterName, String subject, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.VIEW)
.build());
.operationName("checkSchemaCompatibility")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
newSchemaSubjectMono.flatMap(subjectDTO ->
schemaRegistryService.checksSchemaCompatibility(
getCluster(clusterName),
@ -66,19 +70,20 @@ public class SchemasController extends AbstractController implements SchemasApi
))
.map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<SchemaSubjectDTO>> createNewSchema(
String clusterName, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(SchemaAction.CREATE)
.build());
.operationName("createNewSchema")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
newSchemaSubjectMono.flatMap(newSubject ->
schemaRegistryService.registerNewSchema(
getCluster(clusterName),
@ -87,20 +92,22 @@ public class SchemasController extends AbstractController implements SchemasApi
)
).map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<Void>> deleteLatestSchema(
String clusterName, String subject, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.DELETE)
.build());
.operationName("deleteLatestSchema")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
schemaRegistryService.deleteLatestSchemaSubject(getCluster(clusterName), subject)
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}
@ -108,14 +115,16 @@ public class SchemasController extends AbstractController implements SchemasApi
@Override
public Mono<ResponseEntity<Void>> deleteSchema(
String clusterName, String subject, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.DELETE)
.build());
.operationName("deleteSchema")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
schemaRegistryService.deleteSchemaSubjectEntirely(getCluster(clusterName), subject)
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}
@ -123,14 +132,16 @@ public class SchemasController extends AbstractController implements SchemasApi
@Override
public Mono<ResponseEntity<Void>> deleteSchemaByVersion(
String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subjectName)
.schemaActions(SchemaAction.DELETE)
.build());
.operationName("deleteSchemaByVersion")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
schemaRegistryService.deleteSchemaSubjectByVersion(getCluster(clusterName), subjectName, version)
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}
@ -138,16 +149,20 @@ public class SchemasController extends AbstractController implements SchemasApi
@Override
public Mono<ResponseEntity<Flux<SchemaSubjectDTO>>> getAllVersionsBySubject(
String clusterName, String subjectName, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subjectName)
.schemaActions(SchemaAction.VIEW)
.build());
.operationName("getAllVersionsBySubject")
.build();
Flux<SchemaSubjectDTO> schemas =
schemaRegistryService.getAllVersionsBySubject(getCluster(clusterName), subjectName)
.map(kafkaSrMapper::toDto);
return validateAccess.thenReturn(ResponseEntity.ok(schemas));
return accessControlService.validateAccess(context)
.thenReturn(ResponseEntity.ok(schemas))
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -163,34 +178,37 @@ public class SchemasController extends AbstractController implements SchemasApi
public Mono<ResponseEntity<SchemaSubjectDTO>> getLatestSchema(String clusterName,
String subject,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.VIEW)
.build());
.operationName("getLatestSchema")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
schemaRegistryService.getLatestSchemaVersionBySubject(getCluster(clusterName), subject)
.map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<SchemaSubjectDTO>> getSchemaByVersion(
String clusterName, String subject, Integer version, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.VIEW)
.build());
.operationName("getSchemaByVersion")
.operationParams(Map.of("subject", subject, "version", version))
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
schemaRegistryService.getSchemaSubjectByVersion(
getCluster(clusterName), subject, version)
.map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -199,6 +217,11 @@ public class SchemasController extends AbstractController implements SchemasApi
@Valid Integer perPage,
@Valid String search,
ServerWebExchange serverWebExchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.operationName("getSchemas")
.build();
return schemaRegistryService
.getAllSubjectNames(getCluster(clusterName))
.flatMapIterable(l -> l)
@ -220,25 +243,28 @@ public class SchemasController extends AbstractController implements SchemasApi
return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender)
.map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
.map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
}).map(ResponseEntity::ok);
}).map(ResponseEntity::ok)
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
String clusterName, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(SchemaAction.MODIFY_GLOBAL_COMPATIBILITY)
.build());
.operationName("updateGlobalSchemaCompatibilityLevel")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
compatibilityLevelMono
.flatMap(compatibilityLevelDTO ->
schemaRegistryService.updateGlobalSchemaCompatibility(
getCluster(clusterName),
kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
))
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}
@ -247,12 +273,14 @@ public class SchemasController extends AbstractController implements SchemasApi
public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(
String clusterName, String subject, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(SchemaAction.EDIT)
.build());
.operationName("updateSchemaCompatibilityLevel")
.operationParams(Map.of("subject", subject))
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
compatibilityLevelMono
.flatMap(compatibilityLevelDTO ->
schemaRegistryService.updateSchemaCompatibility(
@ -260,6 +288,7 @@ public class SchemasController extends AbstractController implements SchemasApi
subject,
kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
))
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}

View file

@ -27,9 +27,11 @@ import com.provectus.kafka.ui.model.TopicsResponseDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.TopicsService;
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -52,69 +54,78 @@ public class TopicsController extends AbstractController implements TopicsApi {
private final TopicAnalysisService topicAnalysisService;
private final ClusterMapper clusterMapper;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<TopicDTO>> createTopic(
String clusterName, @Valid Mono<TopicCreationDTO> topicCreation, ServerWebExchange exchange) {
String clusterName, @Valid Mono<TopicCreationDTO> topicCreationMono, ServerWebExchange exchange) {
return topicCreationMono.flatMap(topicCreation -> {
var context = AccessContext.builder()
.cluster(clusterName)
.topicActions(CREATE)
.operationName("createTopic")
.operationParams(topicCreation)
.build();
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.topicActions(CREATE)
.build());
return validateAccess.then(
topicsService.createTopic(getCluster(clusterName), topicCreation)
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.OK))
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()))
);
return accessControlService.validateAccess(context)
.then(topicsService.createTopic(getCluster(clusterName), topicCreation))
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.OK))
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()))
.doOnEach(sig -> auditService.audit(context, sig));
});
}
@Override
public Mono<ResponseEntity<TopicDTO>> recreateTopic(String clusterName,
String topicName, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW, CREATE, DELETE)
.build());
.operationName("recreateTopic")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
topicsService.recreateTopic(getCluster(clusterName), topicName)
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<TopicDTO>> cloneTopic(
String clusterName, String topicName, String newTopicName, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW, CREATE)
.build());
.operationName("cloneTopic")
.operationParams(Map.of("newTopicName", newTopicName))
.build();
return validateAccess.then(topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
);
return accessControlService.validateAccess(context)
.then(topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<Void>> deleteTopic(
String clusterName, String topicName, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(DELETE)
.build());
.operationName("deleteTopic")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
topicsService.deleteTopic(getCluster(clusterName), topicName).map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@ -122,13 +133,14 @@ public class TopicsController extends AbstractController implements TopicsApi {
public Mono<ResponseEntity<Flux<TopicConfigDTO>>> getTopicConfigs(
String clusterName, String topicName, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW)
.build());
.operationName("getTopicConfigs")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
topicsService.getTopicConfigs(getCluster(clusterName), topicName)
.map(lst -> lst.stream()
.map(InternalTopicConfig::from)
@ -136,24 +148,25 @@ public class TopicsController extends AbstractController implements TopicsApi {
.collect(toList()))
.map(Flux::fromIterable)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<TopicDetailsDTO>> getTopicDetails(
String clusterName, String topicName, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW)
.build());
.operationName("getTopicDetails")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
topicsService.getTopicDetails(getCluster(clusterName), topicName)
.map(clusterMapper::toTopicDetails)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -166,6 +179,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
@Valid SortOrderDTO sortOrder,
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.operationName("getTopics")
.build();
return topicsService.getTopicsForPagination(getCluster(clusterName))
.flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
.flatMap(topics -> {
@ -189,14 +207,13 @@ public class TopicsController extends AbstractController implements TopicsApi {
.collect(toList());
return topicsService.loadTopics(getCluster(clusterName), topicsPage)
.flatMapMany(Flux::fromIterable)
.collectList()
.map(topicsToRender ->
new TopicsResponseDTO()
.topics(topicsToRender.stream().map(clusterMapper::toTopic).collect(toList()))
.pageCount(totalPages));
})
.map(ResponseEntity::ok);
.map(ResponseEntity::ok)
.doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -204,18 +221,19 @@ public class TopicsController extends AbstractController implements TopicsApi {
String clusterName, String topicName, @Valid Mono<TopicUpdateDTO> topicUpdate,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW, EDIT)
.build());
.operationName("updateTopic")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
topicsService
.updateTopic(getCluster(clusterName), topicName, topicUpdate)
.map(clusterMapper::toTopic)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -224,17 +242,17 @@ public class TopicsController extends AbstractController implements TopicsApi {
Mono<PartitionsIncreaseDTO> partitionsIncrease,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW, EDIT)
.build());
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
partitionsIncrease.flatMap(partitions ->
topicsService.increaseTopicPartitions(getCluster(clusterName), topicName, partitions)
).map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
@ -243,31 +261,34 @@ public class TopicsController extends AbstractController implements TopicsApi {
Mono<ReplicationFactorChangeDTO> replicationFactorChange,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW, EDIT)
.build());
.operationName("changeReplicationFactor")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
replicationFactorChange
.flatMap(rfc ->
topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc))
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
@Override
public Mono<ResponseEntity<Void>> analyzeTopic(String clusterName, String topicName, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.build());
.operationName("analyzeTopic")
.build();
return validateAccess.then(
return accessControlService.validateAccess(context).then(
topicAnalysisService.analyze(getCluster(clusterName), topicName)
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}
@ -275,15 +296,17 @@ public class TopicsController extends AbstractController implements TopicsApi {
@Override
public Mono<ResponseEntity<Void>> cancelTopicAnalysis(String clusterName, String topicName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.build());
.operationName("cancelTopicAnalysis")
.build();
topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName);
return validateAccess.thenReturn(ResponseEntity.ok().build());
return accessControlService.validateAccess(context)
.then(Mono.fromRunnable(() -> topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName)))
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@ -292,15 +315,18 @@ public class TopicsController extends AbstractController implements TopicsApi {
String topicName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.build());
.operationName("getTopicAnalysis")
.build();
return validateAccess.thenReturn(topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
.map(ResponseEntity::ok)
.orElseGet(() -> ResponseEntity.notFound().build()));
return accessControlService.validateAccess(context)
.thenReturn(topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
.map(ResponseEntity::ok)
.orElseGet(() -> ResponseEntity.notFound().build()))
.doOnEach(sig -> auditService.audit(context, sig));
}
private Comparator<InternalTopic> getComparatorForTopic(

View file

@ -2,6 +2,7 @@ package com.provectus.kafka.ui.model.rbac;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
@ -11,6 +12,7 @@ import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import lombok.Value;
import org.springframework.util.Assert;
@ -40,6 +42,11 @@ public class AccessContext {
Collection<AclAction> aclActions;
Collection<AuditAction> auditAction;
String operationName;
Object operationParams;
public static AccessContextBuilder builder() {
return new AccessContextBuilder();
}
@ -59,6 +66,10 @@ public class AccessContext {
private Collection<SchemaAction> schemaActions = Collections.emptySet();
private Collection<KsqlAction> ksqlActions = Collections.emptySet();
private Collection<AclAction> aclActions = Collections.emptySet();
private Collection<AuditAction> auditActions = Collections.emptySet();
private String operationName;
private Object operationParams;
private AccessContextBuilder() {
}
@ -141,6 +152,27 @@ public class AccessContext {
return this;
}
/**
 * Declares the audit-log actions this request requires; at least one must be given.
 *
 * @param actions required audit permissions (non-empty)
 * @return this builder for chaining
 * @throws IllegalArgumentException if {@code actions} is empty
 */
public AccessContextBuilder auditActions(AuditAction... actions) {
Assert.isTrue(actions.length > 0, "actions not present");
this.auditActions = List.of(actions);
return this;
}
// Human-readable name of the API operation (e.g. "createTopic"); carried on the
// context so the audit layer can record which endpoint ran.
public AccessContextBuilder operationName(String operationName) {
this.operationName = operationName;
return this;
}
// Arbitrary object describing the operation's parameters (e.g. a request DTO).
// Stored by reference as-is; presumably serialized later by the audit writer —
// TODO confirm against the audit service.
public AccessContextBuilder operationParams(Object operationParams) {
this.operationParams = operationParams;
return this;
}
// Map overload for ad-hoc key/value operation parameters. NOTE(review): callers
// passing Map.of(...) with mixed value types may have the call resolve to the
// Object overload instead; both overloads just store the reference, so behavior
// is identical either way — confirm this overload is worth keeping.
public AccessContextBuilder operationParams(Map<String, Object> paramsMap) {
this.operationParams = paramsMap;
return this;
}
public AccessContext build() {
return new AccessContext(
applicationConfigActions,
@ -150,7 +182,8 @@ public class AccessContext {
connect, connectActions,
connector,
schema, schemaActions,
ksqlActions, aclActions);
ksqlActions, aclActions, auditActions,
operationName, operationParams);
}
}
}

View file

@ -2,11 +2,13 @@ package com.provectus.kafka.ui.model.rbac;
import static com.provectus.kafka.ui.model.rbac.Resource.ACL;
import static com.provectus.kafka.ui.model.rbac.Resource.APPLICATIONCONFIG;
import static com.provectus.kafka.ui.model.rbac.Resource.AUDIT;
import static com.provectus.kafka.ui.model.rbac.Resource.CLUSTERCONFIG;
import static com.provectus.kafka.ui.model.rbac.Resource.KSQL;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
@ -28,7 +30,8 @@ import org.springframework.util.Assert;
@EqualsAndHashCode
public class Permission {
private static final List<Resource> RBAC_ACTION_EXEMPT_LIST = List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG, ACL);
private static final List<Resource> RBAC_ACTION_EXEMPT_LIST =
List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG, ACL, AUDIT);
Resource resource;
List<String> actions;
@ -79,6 +82,7 @@ public class Permission {
case CONNECT -> Arrays.stream(ConnectAction.values()).map(Enum::toString).toList();
case KSQL -> Arrays.stream(KsqlAction.values()).map(Enum::toString).toList();
case ACL -> Arrays.stream(AclAction.values()).map(Enum::toString).toList();
case AUDIT -> Arrays.stream(AuditAction.values()).map(Enum::toString).toList();
};
}

View file

@ -12,7 +12,8 @@ public enum Resource {
SCHEMA,
CONNECT,
KSQL,
ACL;
ACL,
AUDIT;
@Nullable
public static Resource fromString(String name) {

View file

@ -0,0 +1,14 @@
package com.provectus.kafka.ui.model.rbac.permission;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;
/**
 * RBAC action applicable to the {@code AUDIT} resource.
 * Currently the only supported action is viewing audit data.
 */
public enum AuditAction implements PermissibleAction {
VIEW;
/**
 * Resolves an action from its exact enum name.
 *
 * @param name candidate action name (case-sensitive, per EnumUtils.getEnum)
 * @return matching action, or {@code null} if the name is unknown
 */
@Nullable
public static AuditAction fromString(String name) {
return EnumUtils.getEnum(AuditAction.class, name);
}
}

View file

@ -1,4 +1,8 @@
package com.provectus.kafka.ui.model.rbac.permission;
public interface PermissibleAction {
/**
 * Marker interface for all RBAC action enums.
 * Sealed so the set of permission types is a closed, compiler-checked list;
 * adding a new resource's action enum requires extending this permits clause.
 */
public sealed interface PermissibleAction permits
AclAction, ApplicationConfigAction,
ConsumerGroupAction, SchemaAction,
ConnectAction, ClusterConfigAction,
KsqlAction, TopicAction, AuditAction {
}

View file

@ -17,6 +17,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.ConfigEntry;
@ -117,7 +118,7 @@ public class BrokerService {
.stream()
.map(Node::id)
.collect(Collectors.toList());
if (reqBrokers != null && !reqBrokers.isEmpty()) {
if (!reqBrokers.isEmpty()) {
brokers.retainAll(reqBrokers);
}
return admin.describeLogDirs(brokers);

View file

@ -127,13 +127,7 @@ public class MessagesService {
msg.getValueSerde().get()
);
Properties properties = new Properties();
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
properties.putAll(cluster.getProperties());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
try (KafkaProducer<byte[], byte[]> producer = createProducer(cluster, Map.of())) {
ProducerRecord<byte[], byte[]> producerRecord = producerRecordCreator.create(
topicDescription.name(),
msg.getPartition(),
@ -155,6 +149,18 @@ public class MessagesService {
}
}
/**
 * Builds a byte-array key/value producer for the given cluster.
 * Base config is assembled from the cluster's SSL settings, its custom properties and
 * bootstrap servers; {@code additionalProps} is applied last so callers can override
 * any of the defaults (e.g. compression for the audit producer).
 *
 * @param cluster cluster whose connection settings are used
 * @param additionalProps extra producer configs, applied after (and overriding) defaults
 * @return a new producer; the caller owns it and must close it
 */
public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
Map<String, Object> additionalProps) {
Properties properties = new Properties();
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
properties.putAll(cluster.getProperties());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
// applied last on purpose: lets callers override any default set above
properties.putAll(additionalProps);
return new KafkaProducer<>(properties);
}
public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic,
ConsumerPosition consumerPosition,
@Nullable String query,

View file

@ -168,21 +168,18 @@ public class TopicsService {
.map(m -> m.values().stream().findFirst().orElse(List.of())));
}
private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient adminClient,
Mono<TopicCreationDTO> topicCreation) {
return topicCreation.flatMap(topicData ->
adminClient.createTopic(
topicData.getName(),
topicData.getPartitions(),
topicData.getReplicationFactor(),
topicData.getConfigs()
).thenReturn(topicData)
)
private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient adminClient, TopicCreationDTO topicData) {
return adminClient.createTopic(
topicData.getName(),
topicData.getPartitions(),
topicData.getReplicationFactor(),
topicData.getConfigs())
.thenReturn(topicData)
.onErrorMap(t -> new TopicMetadataException(t.getMessage(), t))
.flatMap(topicData -> loadTopicAfterCreation(c, topicData.getName()));
.then(loadTopicAfterCreation(c, topicData.getName()));
}
public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicCreationDTO> topicCreation) {
public Mono<InternalTopic> createTopic(KafkaCluster cluster, TopicCreationDTO topicCreation) {
return adminClientService.get(cluster)
.flatMap(ac -> createTopic(cluster, ac, topicCreation));
}

View file

@ -0,0 +1,97 @@
package com.provectus.kafka.ui.service.audit;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.provectus.kafka.ui.exception.CustomBaseException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.Resource;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.SneakyThrows;
import org.springframework.security.access.AccessDeniedException;
/**
 * Immutable representation of a single audit log entry.
 * Serialized to JSON (null fields omitted) and written to the audit topic
 * and/or the console audit logger by {@code AuditWriter}.
 */
record AuditRecord(String timestamp,
String username,
String clusterName,
List<AuditResource> resources,
String operation,
Object operationParams,
OperationResult result) {
// Shared, pre-configured mapper; JsonMapper is thread-safe after configuration.
static final JsonMapper MAPPER = new JsonMapper();
static {
// Drop null fields (e.g. clusterName is null for application-level operations).
MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
// Serialization failures are not expected for this simple record; rethrown unchecked.
@SneakyThrows
String toJson() {
return MAPPER.writeValueAsString(this);
}
/**
 * One accessed resource within an operation: the action name, the resource type,
 * and an optional identifier object (e.g. {"name": topicName}).
 */
record AuditResource(String accessType, Resource type, @Nullable Object id) {
// Flattens every action collection of the AccessContext into a resource list.
static List<AuditResource> getAccessedResources(AccessContext ctx) {
List<AuditResource> resources = new ArrayList<>();
ctx.getClusterConfigActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.CLUSTERCONFIG, null)));
ctx.getTopicActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.TOPIC, nameId(ctx.getTopic()))));
ctx.getConsumerGroupActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
ctx.getConnectActions()
.forEach(a -> {
// connect id is a composite: always the connect name, plus connector when present
Map<String, String> resourceId = new LinkedHashMap<>();
resourceId.put("connect", ctx.getConnect());
if (ctx.getConnector() != null) {
resourceId.put("connector", ctx.getConnector());
}
resources.add(new AuditResource(a.name(), Resource.CONNECT, resourceId));
});
ctx.getSchemaActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.SCHEMA, nameId(ctx.getSchema()))));
ctx.getKsqlActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.KSQL, null)));
ctx.getAclActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.ACL, null)));
// NOTE(review): getter is singular (getAuditAction) while the builder field is
// auditActions — confirm the accessor naming in AccessContext is intentional.
ctx.getAuditAction()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.AUDIT, null)));
return resources;
}
// Wraps a name into {"name": ...}, or null when the resource has no name.
@Nullable
private static Map<String, Object> nameId(@Nullable String name) {
return name != null ? Map.of("name", name) : null;
}
}
/**
 * Operation outcome: success flag plus a coarse error category (null on success).
 */
record OperationResult(boolean success, OperationError error) {
static OperationResult successful() {
return new OperationResult(true, null);
}
// Maps a throwable to the closest error category; order matters since
// ValidationException may also be a CustomBaseException subtype.
static OperationResult error(Throwable th) {
OperationError err = OperationError.UNRECOGNIZED_ERROR;
if (th instanceof AccessDeniedException) {
err = OperationError.ACCESS_DENIED;
} else if (th instanceof ValidationException) {
err = OperationError.VALIDATION_ERROR;
} else if (th instanceof CustomBaseException) {
err = OperationError.EXECUTION_ERROR;
}
return new OperationResult(false, err);
}
enum OperationError {
ACCESS_DENIED,
VALIDATION_ERROR,
EXECUTION_ERROR,
UNRECOGNIZED_ERROR
}
}
}

View file

@ -0,0 +1,209 @@
package com.provectus.kafka.ui.service.audit;
import static com.provectus.kafka.ui.service.MessagesService.createProducer;
import com.google.common.annotations.VisibleForTesting;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
import com.provectus.kafka.ui.config.auth.RbacUser;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.AdminClientService;
import com.provectus.kafka.ui.service.ClustersStorage;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
/**
 * Writes audit records for API operations. Per cluster, audit output can go to a
 * dedicated Kafka topic, to a console logger ("audit"), or both; clusters without
 * audit configuration get no writer. Initialization failures disable audit for the
 * affected cluster but never prevent application startup.
 */
@Slf4j
@Service
public class AuditService implements Closeable {
// Fallback identity used when no SecurityContext is present in the reactive context.
private static final Mono<AuthenticatedUser> NO_AUTH_USER = Mono.just(new AuthenticatedUser("Unknown", Set.of()));
private static final String DEFAULT_AUDIT_TOPIC_NAME = "__kui-audit-log";
private static final int DEFAULT_AUDIT_TOPIC_PARTITIONS = 1;
// Defaults used when the audit topic has to be created; user-supplied
// auditTopicProperties override these entries.
private static final Map<String, String> DEFAULT_AUDIT_TOPIC_CONFIG = Map.of(
"retention.ms", String.valueOf(TimeUnit.DAYS.toMillis(7)),
"cleanup.policy", "delete"
);
private static final Map<String, Object> AUDIT_PRODUCER_CONFIG = Map.of(
ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"
);
// Dedicated named logger so console audit output can be routed via logging config.
private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit");
// cluster name -> writer; clusters with audit disabled/failed are simply absent
private final Map<String, AuditWriter> auditWriters;
@Autowired
public AuditService(AdminClientService adminClientService, ClustersStorage clustersStorage) {
Map<String, AuditWriter> auditWriters = new HashMap<>();
for (var cluster : clustersStorage.getKafkaClusters()) {
ReactiveAdminClient adminClient;
try {
// blocking call is acceptable here: runs once at application startup
adminClient = adminClientService.get(cluster).block();
} catch (Exception e) {
printAuditInitError(cluster, "Error connect to cluster", e);
continue;
}
createAuditWriter(cluster, adminClient, () -> createProducer(cluster, AUDIT_PRODUCER_CONFIG))
.ifPresent(writer -> auditWriters.put(cluster.getName(), writer));
}
this.auditWriters = auditWriters;
}
@VisibleForTesting
AuditService(Map<String, AuditWriter> auditWriters) {
this.auditWriters = auditWriters;
}
/**
 * Builds a writer for the cluster based on its audit properties.
 * Returns empty when audit is not configured or both sinks are disabled.
 * The producer is created lazily via {@code producerFactory} and only when
 * topic audit is enabled and the topic exists or was created successfully.
 */
@VisibleForTesting
static Optional<AuditWriter> createAuditWriter(KafkaCluster cluster,
ReactiveAdminClient ac,
Supplier<KafkaProducer<byte[], byte[]>> producerFactory) {
var auditProps = cluster.getOriginalProperties().getAudit();
if (auditProps == null) {
return Optional.empty();
}
boolean topicAudit = Optional.ofNullable(auditProps.getTopicAuditEnabled()).orElse(false);
boolean consoleAudit = Optional.ofNullable(auditProps.getConsoleAuditEnabled()).orElse(false);
if (!topicAudit && !consoleAudit) {
return Optional.empty();
}
String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
@Nullable KafkaProducer<byte[], byte[]> producer = null;
if (topicAudit && createTopicIfNeeded(cluster, ac, auditTopicName, auditProps)) {
producer = producerFactory.get();
}
log.info("Audit service initialized for cluster '{}'", cluster.getName());
return Optional.of(
new AuditWriter(
cluster.getName(),
auditTopicName,
producer,
consoleAudit ? AUDIT_LOGGER : null
)
);
}
/**
 * return true if topic created/existing and producing can be enabled.
 */
private static boolean createTopicIfNeeded(KafkaCluster cluster,
ReactiveAdminClient ac,
String auditTopicName,
ClustersProperties.AuditProperties auditProps) {
boolean topicExists;
try {
// blocking startup-time check; any failure disables topic audit for the cluster
topicExists = ac.listTopics(true).block().contains(auditTopicName);
} catch (Exception e) {
printAuditInitError(cluster, "Error checking audit topic existence", e);
return false;
}
if (topicExists) {
return true;
}
try {
int topicPartitions =
Optional.ofNullable(auditProps.getAuditTopicsPartitions())
.orElse(DEFAULT_AUDIT_TOPIC_PARTITIONS);
Map<String, String> topicConfig = new HashMap<>(DEFAULT_AUDIT_TOPIC_CONFIG);
Optional.ofNullable(auditProps.getAuditTopicProperties())
.ifPresent(topicConfig::putAll);
log.info("Creating audit topic '{}' for cluster '{}'", auditTopicName, cluster.getName());
// null replication factor -> broker default is used
ac.createTopic(auditTopicName, topicPartitions, null, topicConfig).block();
log.info("Audit topic created for cluster '{}'", cluster.getName());
return true;
} catch (Exception e) {
printAuditInitError(cluster, "Error creating topic '%s'".formatted(auditTopicName), e);
return false;
}
}
// Loud, framed log block so a disabled audit setup is hard to miss at startup.
private static void printAuditInitError(KafkaCluster cluster, String errorMsg, Exception cause) {
log.error("-----------------------------------------------------------------");
log.error(
"Error initializing Audit Service for cluster '{}'. Audit will be disabled. See error below: ",
cluster.getName()
);
log.error("{}", errorMsg, cause);
log.error("-----------------------------------------------------------------");
}
/**
 * True if the given topic is the active audit topic of the cluster,
 * i.e. a topic the UI itself is producing audit records to.
 */
public boolean isAuditTopic(KafkaCluster cluster, String topic) {
var writer = auditWriters.get(cluster.getName());
return writer != null
&& topic.equals(writer.targetTopic())
&& writer.isTopicWritingEnabled();
}
/**
 * Records the outcome of an operation based on its terminal reactive signal:
 * onComplete -> success record, onError -> error record. Other signals are ignored.
 */
public void audit(AccessContext acxt, Signal<?> sig) {
if (sig.isOnComplete()) {
extractUser(sig)
.doOnNext(u -> sendAuditRecord(acxt, u))
.subscribe();
} else if (sig.isOnError()) {
extractUser(sig)
.doOnNext(u -> sendAuditRecord(acxt, u, sig.getThrowable()))
.subscribe();
}
}
// Resolves the authenticated user from the signal's reactor context, falling back
// to NO_AUTH_USER when security is absent.
private Mono<AuthenticatedUser> extractUser(Signal<?> sig) {
//see ReactiveSecurityContextHolder for impl details
Object key = SecurityContext.class;
if (sig.getContextView().hasKey(key)) {
// NOTE(review): assumes the principal is always an RbacUser when a
// SecurityContext exists — the cast would error otherwise; confirm this holds
// for all configured auth mechanisms.
return sig.getContextView().<Mono<SecurityContext>>get(key)
.map(context -> context.getAuthentication().getPrincipal())
.cast(RbacUser.class)
.map(user -> new AuthenticatedUser(user.name(), user.groups()))
.switchIfEmpty(NO_AUTH_USER);
} else {
return NO_AUTH_USER;
}
}
private void sendAuditRecord(AccessContext ctx, AuthenticatedUser user) {
sendAuditRecord(ctx, user, null);
}
// Routes the record to the cluster's writer, or to the app-level console logger
// for cluster-independent operations. Audit failures are logged, never propagated.
private void sendAuditRecord(AccessContext ctx, AuthenticatedUser user, @Nullable Throwable th) {
try {
if (ctx.getCluster() != null) {
var writer = auditWriters.get(ctx.getCluster());
if (writer != null) {
writer.write(ctx, user, th);
}
} else {
// cluster-independent operation
AuditWriter.writeAppOperation(AUDIT_LOGGER, ctx, user, th);
}
} catch (Exception e) {
log.warn("Error sending audit record", e);
}
}
@Override
public void close() throws IOException {
auditWriters.values().forEach(AuditWriter::close);
}
}

View file

@ -0,0 +1,78 @@
package com.provectus.kafka.ui.service.audit;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.audit.AuditRecord.AuditResource;
import com.provectus.kafka.ui.service.audit.AuditRecord.OperationResult;
import java.io.Closeable;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
/**
 * Per-cluster audit sink. Writes serialized {@link AuditRecord}s to a Kafka topic
 * (when {@code producer} is set) and/or a console logger (when {@code consoleLogger}
 * is set). Either sink may be null, but not both for a useful writer.
 */
@Slf4j
record AuditWriter(String clusterName,
String targetTopic,
@Nullable KafkaProducer<byte[], byte[]> producer,
@Nullable Logger consoleLogger) implements Closeable {
// Topic audit is considered enabled iff a producer was created at init time.
boolean isTopicWritingEnabled() {
return producer != null;
}
// application-level (cluster-independent) operation
static void writeAppOperation(Logger consoleLogger,
AccessContext ctx,
AuthenticatedUser user,
@Nullable Throwable th) {
consoleLogger.info(createRecord(ctx, user, th).toJson());
}
void write(AccessContext ctx, AuthenticatedUser user, @Nullable Throwable th) {
write(createRecord(ctx, user, th));
}
private void write(AuditRecord rec) {
String json = rec.toJson();
if (consoleLogger != null) {
consoleLogger.info(json);
}
if (producer != null) {
// async fire-and-forget send; delivery failures are only logged
producer.send(
new ProducerRecord<>(targetTopic, null, json.getBytes(UTF_8)),
(metadata, ex) -> {
if (ex != null) {
log.warn("Error sending Audit record to kafka for cluster {}", clusterName, ex);
}
});
}
}
// Assembles the record: ISO-8601 timestamp, user, accessed resources and outcome.
private static AuditRecord createRecord(AccessContext ctx,
AuthenticatedUser user,
@Nullable Throwable th) {
return new AuditRecord(
DateTimeFormatter.ISO_INSTANT.format(Instant.now()),
user.principal(),
ctx.getCluster(), //can be null, if it is application-level action
AuditResource.getAccessedResources(ctx),
ctx.getOperationName(),
ctx.getOperationParams(),
th == null ? OperationResult.successful() : OperationResult.error(th)
);
}
@Override
public void close() {
Optional.ofNullable(producer).ifPresent(KafkaProducer::close);
}
}

View file

@ -109,7 +109,8 @@ public class AccessControlService {
&& isConnectorAccessible(context, user) // TODO connector selectors
&& isSchemaAccessible(context, user)
&& isKsqlAccessible(context, user)
&& isAclAccessible(context, user);
&& isAclAccessible(context, user)
&& isAuditAccessible(context, user);
if (!accessGranted) {
throw new AccessDeniedException("Access denied");
@ -386,6 +387,23 @@ public class AccessControlService {
return isAccessible(Resource.ACL, null, user, context, requiredActions);
}
/**
 * Checks whether the user holds all AUDIT-resource actions required by the context.
 * Always true when RBAC is disabled or the context requires no audit actions.
 */
private boolean isAuditAccessible(AccessContext context, AuthenticatedUser user) {
if (!rbacEnabled) {
return true;
}
if (context.getAuditAction().isEmpty()) {
return true;
}
// enum names are already uppercase; toUpperCase kept for parity with sibling checks
Set<String> requiredActions = context.getAuditAction()
.stream()
.map(a -> a.toString().toUpperCase())
.collect(Collectors.toSet());
return isAccessible(Resource.AUDIT, null, user, context, requiredActions);
}
public Set<ProviderAuthorityExtractor> getOauthExtractors() {
return oauthExtractors;
}

View file

@ -144,3 +144,6 @@ rbac:
- resource: acl
actions: all
- resource: audit
actions: all

View file

@ -74,6 +74,8 @@ public abstract class AbstractIntegrationTest {
System.setProperty("kafka.clusters.0.masking.0.type", "REPLACE");
System.setProperty("kafka.clusters.0.masking.0.replacement", "***");
System.setProperty("kafka.clusters.0.masking.0.topicValuesPattern", "masking-test-.*");
System.setProperty("kafka.clusters.0.audit.topicAuditEnabled", "true");
System.setProperty("kafka.clusters.0.audit.consoleAuditEnabled", "true");
System.setProperty("kafka.clusters.1.name", SECOND_LOCAL);
System.setProperty("kafka.clusters.1.readOnly", "true");

View file

@ -9,6 +9,7 @@ import static org.mockito.Mockito.when;
import com.provectus.kafka.ui.controller.SchemasController;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SchemaSubjectDTO;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.sr.model.Compatibility;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import com.provectus.kafka.ui.util.AccessControlServiceMock;
@ -41,7 +42,8 @@ public class SchemaRegistryPaginationTest {
new SchemaRegistryService.SubjectWithCompatibilityLevel(
new SchemaSubject().subject(a.getArgument(1)), Compatibility.FULL)));
this.controller = new SchemasController(schemaRegistryService, new AccessControlServiceMock().getMock());
this.controller = new SchemasController(schemaRegistryService, new AccessControlServiceMock().getMock(),
mock(AuditService.class));
this.controller.setClustersStorage(clustersStorage);
}

View file

@ -18,6 +18,7 @@ import com.provectus.kafka.ui.model.SortOrderDTO;
import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.util.AccessControlServiceMock;
import java.util.ArrayList;
@ -33,7 +34,6 @@ import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;
import org.junit.jupiter.api.Test;
import org.springframework.test.util.ReflectionTestUtils;
import reactor.core.publisher.Mono;
class TopicsServicePaginationTest {
@ -46,7 +46,7 @@ class TopicsServicePaginationTest {
private final AccessControlService accessControlService = new AccessControlServiceMock().getMock();
private final TopicsController topicsController = new TopicsController(
topicsService, mock(TopicAnalysisService.class), clusterMapper, accessControlService);
topicsService, mock(TopicAnalysisService.class), clusterMapper, accessControlService, mock(AuditService.class));
private void init(Map<String, InternalTopic> topicsInCache) {
@ -59,7 +59,7 @@ class TopicsServicePaginationTest {
List<String> lst = a.getArgument(1);
return Mono.just(lst.stream().map(topicsInCache::get).collect(Collectors.toList()));
});
ReflectionTestUtils.setField(topicsController, "clustersStorage", clustersStorage);
topicsController.setClustersStorage(clustersStorage);
}
@Test

View file

@ -0,0 +1,87 @@
package com.provectus.kafka.ui.service.audit;
import static org.assertj.core.api.Assertions.assertThat;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.provectus.kafka.ui.AbstractIntegrationTest;
import com.provectus.kafka.ui.model.TopicCreationDTO;
import com.provectus.kafka.ui.model.rbac.Resource;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.testcontainers.shaded.org.awaitility.Awaitility;
/**
 * End-to-end check that creating a topic through the API produces an audit record
 * in the default audit topic (audit is enabled for cluster LOCAL via system
 * properties in AbstractIntegrationTest).
 */
public class AuditIntegrationTest extends AbstractIntegrationTest {
@Autowired
private WebTestClient webTestClient;
@Test
void auditRecordWrittenIntoKafkaWhenNewTopicCreated() {
// unique name so the assertion can match this test run's record specifically
String newTopicName = "test_audit_" + UUID.randomUUID();
webTestClient.post()
.uri("/api/clusters/{clusterName}/topics", LOCAL)
.bodyValue(
new TopicCreationDTO()
.replicationFactor(1)
.partitions(1)
.name(newTopicName)
)
.exchange()
.expectStatus()
.isOk();
try (var consumer = createConsumer()) {
var jsonMapper = new JsonMapper();
consumer.subscribe(List.of("__kui-audit-log"));
// audit write is async relative to the HTTP response, hence the await loop
Awaitility.await()
.pollInSameThread()
.atMost(Duration.ofSeconds(15))
.untilAsserted(() -> {
var polled = consumer.poll(Duration.ofSeconds(1));
assertThat(polled).anySatisfy(kafkaRecord -> {
try {
AuditRecord record = jsonMapper.readValue(kafkaRecord.value(), AuditRecord.class);
assertThat(record.operation()).isEqualTo("createTopic");
assertThat(record.resources()).map(AuditRecord.AuditResource::type).contains(Resource.TOPIC);
assertThat(record.result().success()).isTrue();
assertThat(record.timestamp()).isNotBlank();
assertThat(record.clusterName()).isEqualTo(LOCAL);
assertThat(record.operationParams())
.isEqualTo(Map.of(
"name", newTopicName,
"partitions", 1,
"replicationFactor", 1,
"configs", Map.of()
));
} catch (JsonProcessingException e) {
Assertions.fail();
}
});
});
}
}
// Plain string-value consumer reading the audit topic from the beginning.
private KafkaConsumer<?, String> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, AuditIntegrationTest.class.getName());
return new KafkaConsumer<>(props);
}
}

View file

@ -0,0 +1,154 @@
package com.provectus.kafka.ui.service.audit;
import static com.provectus.kafka.ui.service.audit.AuditService.createAuditWriter;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
/**
 * Unit tests for {@link AuditService} and {@link AuditService#createAuditWriter}.
 */
class AuditServiceTest {

  @Test
  void isAuditTopicChecksIfAuditIsEnabledForCluster() {
    Map<String, AuditWriter> writers = Map.of(
        // FIX: the first writer's cluster name previously contained a Cyrillic 'с'
        // (U+0441) instead of ASCII 'c' — a latent typo, corrected here.
        "c1", new AuditWriter("c1", "c1topic", null, null),
        "c2", new AuditWriter("c2", "c2topic", mock(KafkaProducer.class), null)
    );
    var auditService = new AuditService(writers);

    // unknown cluster -> not an audit topic
    assertThat(auditService.isAuditTopic(KafkaCluster.builder().name("notExist").build(), "some"))
        .isFalse();
    // c1 has no producer, so topic writing is disabled -> false even for its topic
    assertThat(auditService.isAuditTopic(KafkaCluster.builder().name("c1").build(), "c1topic"))
        .isFalse();
    // c2 has a producer and matching topic -> true
    assertThat(auditService.isAuditTopic(KafkaCluster.builder().name("c2").build(), "c2topic"))
        .isTrue();
  }

  @Test
  void auditCallsWriterMethodDependingOnSignal() {
    var auditWriter = mock(AuditWriter.class);
    var auditService = new AuditService(Map.of("test", auditWriter));

    var cxt = AccessContext.builder().cluster("test").build();

    // onComplete -> success record (null throwable)
    auditService.audit(cxt, Signal.complete());
    verify(auditWriter).write(any(), any(), eq(null));

    // onError -> record carrying the signal's throwable
    var th = new Exception("testError");
    auditService.audit(cxt, Signal.error(th));
    verify(auditWriter).write(any(), any(), eq(th));
  }

  @Nested
  class CreateAuditWriter {

    private final ReactiveAdminClient adminClientMock = mock(ReactiveAdminClient.class);
    private final Supplier<KafkaProducer<byte[], byte[]>> producerSupplierMock = mock(Supplier.class);

    private final ClustersProperties.Cluster clustersProperties = new ClustersProperties.Cluster();

    private final KafkaCluster cluster = KafkaCluster
        .builder()
        .name("test")
        .originalProperties(clustersProperties)
        .build();

    @BeforeEach
    void init() {
      when(producerSupplierMock.get())
          .thenReturn(mock(KafkaProducer.class));
    }

    @Test
    void noWriterIfNoAuditPropsSet() {
      var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
      assertThat(maybeWriter).isEmpty();
    }

    @Test
    void setsLoggerIfConsoleLoggingEnabled() {
      var auditProps = new ClustersProperties.AuditProperties();
      auditProps.setConsoleAuditEnabled(true);
      clustersProperties.setAudit(auditProps);

      var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
      assertThat(maybeWriter).isPresent();

      var writer = maybeWriter.get();
      assertThat(writer.consoleLogger()).isNotNull();
    }

    @Nested
    class WhenTopicAuditEnabled {

      @BeforeEach
      void setTopicWriteProperties() {
        var auditProps = new ClustersProperties.AuditProperties();
        auditProps.setTopicAuditEnabled(true);
        auditProps.setTopic("test_audit_topic");
        auditProps.setAuditTopicsPartitions(3);
        auditProps.setAuditTopicProperties(Map.of("p1", "v1"));
        clustersProperties.setAudit(auditProps);
      }

      @Test
      void createsProducerIfTopicExists() {
        when(adminClientMock.listTopics(true))
            .thenReturn(Mono.just(Set.of("test_audit_topic")));

        var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
        assertThat(maybeWriter).isPresent();

        //checking there was no topic creation request
        verify(adminClientMock, times(0))
            .createTopic(any(), anyInt(), anyInt(), anyMap());

        var writer = maybeWriter.get();
        assertThat(writer.producer()).isNotNull();
        assertThat(writer.targetTopic()).isEqualTo("test_audit_topic");
      }

      @Test
      void createsProducerAndTopicIfItIsNotExist() {
        when(adminClientMock.listTopics(true))
            .thenReturn(Mono.just(Set.of()));

        when(adminClientMock.createTopic(eq("test_audit_topic"), eq(3), eq(null), anyMap()))
            .thenReturn(Mono.empty());

        var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
        assertThat(maybeWriter).isPresent();

        //verifying topic created
        verify(adminClientMock).createTopic(eq("test_audit_topic"), eq(3), eq(null), anyMap());

        var writer = maybeWriter.get();
        assertThat(writer.producer()).isNotNull();
        assertThat(writer.targetTopic()).isEqualTo("test_audit_topic");
      }
    }
  }
}

View file

@ -5,7 +5,6 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Collections;
import org.mockito.Mockito;
import reactor.core.publisher.Mono;

View file

@ -3465,6 +3465,7 @@ components:
- CONNECT
- KSQL
- ACL
- AUDIT
KafkaAcl:
type: object
@ -3825,3 +3826,18 @@ components:
pollingThrottleRate:
type: integer
format: int64
audit:
type: object
properties:
topic:
type: string
auditTopicsPartitions:
type: integer
topicAuditEnabled:
type: boolean
consoleAuditEnabled:
type: boolean
auditTopicProperties:
type: object
additionalProperties:
type: string