This commit is contained in:
Ilya Kuramshin 2023-09-15 11:04:29 -04:00 committed by GitHub
commit fef1b09a5f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 578 additions and 677 deletions

View file

@ -31,7 +31,7 @@ public class AccessController implements AuthorizationApi {
private final AccessControlService accessControlService;
public Mono<ResponseEntity<AuthenticationInfoDTO>> getUserAuthInfo(ServerWebExchange exchange) {
Mono<List<UserPermissionDTO>> permissions = accessControlService.getUser()
Mono<List<UserPermissionDTO>> permissions = AccessControlService.getUser()
.map(user -> accessControlService.getRoles()
.stream()
.filter(role -> user.groups().contains(role.getName()))
@ -64,9 +64,9 @@ public class AccessController implements AuthorizationApi {
dto.setClusters(clusters);
dto.setResource(ResourceTypeDTO.fromValue(permission.getResource().toString().toUpperCase()));
dto.setValue(permission.getValue());
dto.setActions(permission.getActions()
dto.setActions(permission.getParsedActions()
.stream()
.map(String::toUpperCase)
.map(p -> p.name().toUpperCase())
.map(this::mapAction)
.filter(Objects::nonNull)
.toList());

View file

@ -49,8 +49,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.consumerGroup(id)
.consumerGroupActions(DELETE)
.consumerGroupActions(id, DELETE)
.operationName("deleteConsumerGroup")
.build();
@ -66,8 +65,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.consumerGroup(consumerGroupId)
.consumerGroupActions(VIEW)
.consumerGroupActions(consumerGroupId, VIEW)
.operationName("getConsumerGroup")
.build();
@ -84,8 +82,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(TopicAction.VIEW)
.topicActions(topicName, TopicAction.VIEW)
.operationName("getTopicConsumerGroups")
.build();
@ -142,9 +139,8 @@ public class ConsumerGroupsController extends AbstractController implements Cons
return resetDto.flatMap(reset -> {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(reset.getTopic())
.topicActions(TopicAction.VIEW)
.consumerGroupActions(RESET_OFFSETS)
.topicActions(reset.getTopic(), TopicAction.VIEW)
.consumerGroupActions(group, RESET_OFFSETS)
.operationName("resetConsumerGroupOffsets")
.build();

View file

@ -56,8 +56,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connectActions(connectName, ConnectAction.VIEW)
.operationName("getConnectors")
.build();
@ -73,8 +72,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.CREATE)
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.CREATE)
.operationName("createConnector")
.build();
@ -91,9 +89,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connector(connectorName)
.connectActions(connectName, ConnectAction.VIEW)
.operationName("getConnector")
.build();
@ -110,8 +106,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.EDIT)
.operationName("deleteConnector")
.operationParams(Map.of(CONNECTOR_NAME, connectName))
.build();
@ -133,7 +128,6 @@ public class KafkaConnectController extends AbstractController implements KafkaC
) {
var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.operationName("getAllConnectors")
.build();
@ -143,7 +137,6 @@ public class KafkaConnectController extends AbstractController implements KafkaC
Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search)
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
.filterWhen(dto -> accessControlService.isConnectorAccessible(dto.getConnect(), dto.getName(), clusterName))
.sort(comparator);
return Mono.just(ResponseEntity.ok(job))
@ -158,8 +151,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connectActions(connectName, ConnectAction.VIEW)
.operationName("getConnectorConfig")
.build();
@ -178,8 +170,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.EDIT)
.operationName("setConnectorConfig")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
@ -205,8 +196,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(connectActions)
.connectActions(connectName, connectActions)
.operationName("updateConnectorState")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
@ -225,8 +215,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connectActions(connectName, ConnectAction.VIEW)
.operationName("getConnectorTasks")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
@ -245,8 +234,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.RESTART)
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.RESTART)
.operationName("restartConnectorTask")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
@ -264,8 +252,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connectActions(connectName, ConnectAction.VIEW)
.operationName("getConnectorPlugins")
.build();

View file

@ -55,8 +55,7 @@ public class MessagesController extends AbstractController implements MessagesAp
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_DELETE)
.topicActions(topicName, MESSAGES_DELETE)
.build();
return validateAccess(context).<ResponseEntity<Void>>then(
@ -90,8 +89,7 @@ public class MessagesController extends AbstractController implements MessagesAp
ServerWebExchange exchange) {
var contextBuilder = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.topicActions(topicName, MESSAGES_READ)
.operationName("getTopicMessages");
if (auditService.isAuditTopic(getCluster(clusterName), topicName)) {
@ -128,8 +126,7 @@ public class MessagesController extends AbstractController implements MessagesAp
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_PRODUCE)
.topicActions(topicName, MESSAGES_PRODUCE)
.operationName("sendTopicMessages")
.build();
@ -175,8 +172,7 @@ public class MessagesController extends AbstractController implements MessagesAp
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(TopicAction.VIEW)
.topicActions(topicName, TopicAction.VIEW)
.operationName("getSerdes")
.build();

View file

@ -51,8 +51,7 @@ public class SchemasController extends AbstractController implements SchemasApi
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.VIEW)
.schemaActions(subject, SchemaAction.VIEW)
.operationName("checkSchemaCompatibility")
.build();
@ -72,22 +71,23 @@ public class SchemasController extends AbstractController implements SchemasApi
public Mono<ResponseEntity<SchemaSubjectDTO>> createNewSchema(
String clusterName, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(SchemaAction.CREATE)
.operationName("createNewSchema")
.build();
return validateAccess(context).then(
newSchemaSubjectMono.flatMap(newSubject ->
schemaRegistryService.registerNewSchema(
getCluster(clusterName),
newSubject.getSubject(),
kafkaSrMapper.fromDto(newSubject)
)
).map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
).doOnEach(sig -> audit(context, sig));
return newSchemaSubjectMono.flatMap(newSubject -> {
var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(newSubject.getSubject(), SchemaAction.CREATE)
.operationName("createNewSchema")
.build();
return validateAccess(context).then(
schemaRegistryService.registerNewSchema(
getCluster(clusterName),
newSubject.getSubject(),
kafkaSrMapper.fromDto(newSubject)
))
.map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
.doOnEach(sig -> audit(context, sig));
}
);
}
@Override
@ -95,8 +95,7 @@ public class SchemasController extends AbstractController implements SchemasApi
String clusterName, String subject, ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.DELETE)
.schemaActions(subject, SchemaAction.DELETE)
.operationName("deleteLatestSchema")
.build();
@ -112,8 +111,7 @@ public class SchemasController extends AbstractController implements SchemasApi
String clusterName, String subject, ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.DELETE)
.schemaActions(subject, SchemaAction.DELETE)
.operationName("deleteSchema")
.build();
@ -129,8 +127,7 @@ public class SchemasController extends AbstractController implements SchemasApi
String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subjectName)
.schemaActions(SchemaAction.DELETE)
.schemaActions(subjectName, SchemaAction.DELETE)
.operationName("deleteSchemaByVersion")
.build();
@ -146,8 +143,7 @@ public class SchemasController extends AbstractController implements SchemasApi
String clusterName, String subjectName, ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subjectName)
.schemaActions(SchemaAction.VIEW)
.schemaActions(subjectName, SchemaAction.VIEW)
.operationName("getAllVersionsBySubject")
.build();
@ -175,8 +171,7 @@ public class SchemasController extends AbstractController implements SchemasApi
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.VIEW)
.schemaActions(subject, SchemaAction.VIEW)
.operationName("getLatestSchema")
.build();
@ -192,8 +187,7 @@ public class SchemasController extends AbstractController implements SchemasApi
String clusterName, String subject, Integer version, ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.VIEW)
.schemaActions(subject, SchemaAction.VIEW)
.operationName("getSchemaByVersion")
.operationParams(Map.of("subject", subject, "version", version))
.build();
@ -248,7 +242,7 @@ public class SchemasController extends AbstractController implements SchemasApi
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(SchemaAction.MODIFY_GLOBAL_COMPATIBILITY)
.schemaGlobalCompatChange()
.operationName("updateGlobalSchemaCompatibilityLevel")
.build();
@ -268,16 +262,16 @@ public class SchemasController extends AbstractController implements SchemasApi
public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(
String clusterName, String subject, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(SchemaAction.EDIT)
.schemaActions(subject, SchemaAction.EDIT)
.operationName("updateSchemaCompatibilityLevel")
.operationParams(Map.of("subject", subject))
.build();
return validateAccess(context).then(
compatibilityLevelMono
.flatMap(compatibilityLevelDTO ->
return compatibilityLevelMono.flatMap(compatibilityLevelDTO ->
validateAccess(context).then(
schemaRegistryService.updateSchemaCompatibility(
getCluster(clusterName),
subject,

View file

@ -59,7 +59,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
return topicCreationMono.flatMap(topicCreation -> {
var context = AccessContext.builder()
.cluster(clusterName)
.topicActions(CREATE)
.topicActions(topicCreation.getName(), CREATE)
.operationName("createTopic")
.operationParams(topicCreation)
.build();
@ -78,8 +78,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
String topicName, ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW, CREATE, DELETE)
.topicActions(topicName, VIEW, CREATE, DELETE)
.operationName("recreateTopic")
.build();
@ -96,8 +95,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW, CREATE)
.topicActions(topicName, VIEW, CREATE)
.operationName("cloneTopic")
.operationParams(Map.of("newTopicName", newTopicName))
.build();
@ -115,8 +113,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(DELETE)
.topicActions(topicName, DELETE)
.operationName("deleteTopic")
.build();
@ -134,8 +131,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW)
.topicActions(topicName, VIEW)
.operationName("getTopicConfigs")
.build();
@ -156,8 +152,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW)
.topicActions(topicName, VIEW)
.operationName("getTopicDetails")
.build();
@ -222,8 +217,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW, EDIT)
.topicActions(topicName, VIEW, EDIT)
.operationName("updateTopic")
.build();
@ -243,8 +237,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW, EDIT)
.topicActions(topicName, VIEW, EDIT)
.build();
return validateAccess(context).then(
@ -262,8 +255,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW, EDIT)
.topicActions(topicName, VIEW, EDIT)
.operationName("changeReplicationFactor")
.build();
@ -280,8 +272,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.topicActions(topicName, MESSAGES_READ)
.operationName("analyzeTopic")
.build();
@ -297,8 +288,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.topicActions(topicName, MESSAGES_READ)
.operationName("cancelTopicAnalysis")
.build();
@ -316,8 +306,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.topicActions(topicName, MESSAGES_READ)
.operationName("getTopicAnalysis")
.build();

View file

@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac;
import com.google.common.base.Preconditions;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
@ -7,156 +8,144 @@ import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
import com.provectus.kafka.ui.model.rbac.permission.PermissibleAction;
import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import lombok.Value;
import org.springframework.util.Assert;
import java.util.stream.Collectors;
import org.springframework.security.access.AccessDeniedException;
@Value
public class AccessContext {
public record AccessContext(String cluster,
List<ResourceAccess> accesses,
String operationName,
@Nullable Object operationParams) {
Collection<ApplicationConfigAction> applicationConfigActions;
public interface ResourceAccess {
// will be used for audit, should be serializable via json object mapper
@Nullable
Object resourceId();
String cluster;
Collection<ClusterConfigAction> clusterConfigActions;
Resource resourceType();
String topic;
Collection<TopicAction> topicActions;
Collection<PermissibleAction> requestedActions();
String consumerGroup;
Collection<ConsumerGroupAction> consumerGroupActions;
boolean isAccessible(List<Permission> userPermissions);
}
String connect;
Collection<ConnectAction> connectActions;
record SingleResourceAccess(@Nullable String name,
Resource resourceType,
Collection<PermissibleAction> requestedActions) implements ResourceAccess {
String connector;
SingleResourceAccess(Resource type, List<PermissibleAction> requestedActions) {
this(null, type, requestedActions);
}
String schema;
Collection<SchemaAction> schemaActions;
@Override
public Object resourceId() {
return name;
}
Collection<KsqlAction> ksqlActions;
@Override
public boolean isAccessible(List<Permission> userPermissions) throws AccessDeniedException {
var allowedActions = userPermissions.stream()
.filter(permission -> permission.getResource() == resourceType)
.filter(permission -> {
if (name == null && permission.getCompiledValuePattern() == null) {
return true;
}
Preconditions.checkState(permission.getCompiledValuePattern() != null && name != null);
return permission.getCompiledValuePattern().matcher(name).matches();
})
.flatMap(p -> p.getParsedActions().stream())
.collect(Collectors.toSet());
Collection<AclAction> aclActions;
Collection<AuditAction> auditAction;
String operationName;
Object operationParams;
return allowedActions.containsAll(requestedActions);
}
}
public static AccessContextBuilder builder() {
return new AccessContextBuilder();
}
public boolean isAccessible(List<Permission> userPermissions) {
return accesses().stream()
.allMatch(resourceAccess -> resourceAccess.isAccessible(userPermissions));
}
public static final class AccessContextBuilder {
private static final String ACTIONS_NOT_PRESENT = "actions not present";
private Collection<ApplicationConfigAction> applicationConfigActions = Collections.emptySet();
private String cluster;
private Collection<ClusterConfigAction> clusterConfigActions = Collections.emptySet();
private String topic;
private Collection<TopicAction> topicActions = Collections.emptySet();
private String consumerGroup;
private Collection<ConsumerGroupAction> consumerGroupActions = Collections.emptySet();
private String connect;
private Collection<ConnectAction> connectActions = Collections.emptySet();
private String connector;
private String schema;
private Collection<SchemaAction> schemaActions = Collections.emptySet();
private Collection<KsqlAction> ksqlActions = Collections.emptySet();
private Collection<AclAction> aclActions = Collections.emptySet();
private Collection<AuditAction> auditActions = Collections.emptySet();
private String operationName;
private Object operationParams;
private final List<ResourceAccess> accesses = new ArrayList<>();
private AccessContextBuilder() {
}
public AccessContextBuilder applicationConfigActions(ApplicationConfigAction... actions) {
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
this.applicationConfigActions = List.of(actions);
return this;
}
public AccessContextBuilder cluster(String cluster) {
this.cluster = cluster;
return this;
}
public AccessContextBuilder applicationConfigActions(ApplicationConfigAction... actions) {
Preconditions.checkArgument(actions.length > 0, "actions not present");
accesses.add(new SingleResourceAccess(Resource.APPLICATIONCONFIG, List.of(actions)));
return this;
}
public AccessContextBuilder clusterConfigActions(ClusterConfigAction... actions) {
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
this.clusterConfigActions = List.of(actions);
Preconditions.checkArgument(actions.length > 0, "actions not present");
accesses.add(new SingleResourceAccess(Resource.CLUSTERCONFIG, List.of(actions)));
return this;
}
public AccessContextBuilder topic(String topic) {
this.topic = topic;
public AccessContextBuilder topicActions(String topic, TopicAction... actions) {
Preconditions.checkArgument(actions.length > 0, "actions not present");
accesses.add(new SingleResourceAccess(topic, Resource.TOPIC, List.of(actions)));
return this;
}
public AccessContextBuilder topicActions(TopicAction... actions) {
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
this.topicActions = List.of(actions);
public AccessContextBuilder consumerGroupActions(String consumerGroup, ConsumerGroupAction... actions) {
Preconditions.checkArgument(actions.length > 0, "actions not present");
accesses.add(new SingleResourceAccess(consumerGroup, Resource.CONSUMER, List.of(actions)));
return this;
}
public AccessContextBuilder consumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
public AccessContextBuilder connectActions(String connect, ConnectAction... actions) {
Preconditions.checkArgument(actions.length > 0, "actions not present");
accesses.add(new SingleResourceAccess(connect, Resource.CONNECT, List.of(actions)));
return this;
}
public AccessContextBuilder consumerGroupActions(ConsumerGroupAction... actions) {
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
this.consumerGroupActions = List.of(actions);
public AccessContextBuilder schemaActions(String schema, SchemaAction... actions) {
Preconditions.checkArgument(actions.length > 0, "actions not present");
accesses.add(new SingleResourceAccess(schema, Resource.SCHEMA, List.of(actions)));
return this;
}
public AccessContextBuilder connect(String connect) {
this.connect = connect;
return this;
}
public AccessContextBuilder connectActions(ConnectAction... actions) {
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
this.connectActions = List.of(actions);
return this;
}
public AccessContextBuilder connector(String connector) {
this.connector = connector;
return this;
}
public AccessContextBuilder schema(String schema) {
this.schema = schema;
return this;
}
public AccessContextBuilder schemaActions(SchemaAction... actions) {
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
this.schemaActions = List.of(actions);
public AccessContextBuilder schemaGlobalCompatChange() {
accesses.add(new SingleResourceAccess(Resource.SCHEMA, List.of(SchemaAction.MODIFY_GLOBAL_COMPATIBILITY)));
return this;
}
public AccessContextBuilder ksqlActions(KsqlAction... actions) {
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
this.ksqlActions = List.of(actions);
Preconditions.checkArgument(actions.length > 0, "actions not present");
accesses.add(new SingleResourceAccess(Resource.KSQL, List.of(actions)));
return this;
}
public AccessContextBuilder aclActions(AclAction... actions) {
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
this.aclActions = List.of(actions);
Preconditions.checkArgument(actions.length > 0, "actions not present");
accesses.add(new SingleResourceAccess(Resource.ACL, List.of(actions)));
return this;
}
public AccessContextBuilder auditActions(AuditAction... actions) {
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
this.auditActions = List.of(actions);
Preconditions.checkArgument(actions.length > 0, "actions not present");
accesses.add(new SingleResourceAccess(Resource.AUDIT, List.of(actions)));
return this;
}
@ -170,22 +159,8 @@ public class AccessContext {
return this;
}
public AccessContextBuilder operationParams(Map<String, Object> paramsMap) {
this.operationParams = paramsMap;
return this;
}
public AccessContext build() {
return new AccessContext(
applicationConfigActions,
cluster, clusterConfigActions,
topic, topicActions,
consumerGroup, consumerGroupActions,
connect, connectActions,
connector,
schema, schemaActions,
ksqlActions, aclActions, auditActions,
operationName, operationParams);
return new AccessContext(cluster, accesses, operationName, operationParams);
}
}
}

View file

@ -1,41 +1,25 @@
package com.provectus.kafka.ui.model.rbac;
import static com.provectus.kafka.ui.model.rbac.Resource.ACL;
import static com.provectus.kafka.ui.model.rbac.Resource.APPLICATIONCONFIG;
import static com.provectus.kafka.ui.model.rbac.Resource.AUDIT;
import static com.provectus.kafka.ui.model.rbac.Resource.CLUSTERCONFIG;
import static com.provectus.kafka.ui.model.rbac.Resource.KSQL;
import static org.apache.commons.collections.CollectionUtils.isNotEmpty;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import java.util.Arrays;
import java.util.Collections;
import com.google.common.base.Preconditions;
import com.provectus.kafka.ui.model.rbac.permission.PermissibleAction;
import java.util.List;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.util.Assert;
@Getter
@ToString
@EqualsAndHashCode
public class Permission {
private static final List<Resource> RBAC_ACTION_EXEMPT_LIST =
List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG, ACL, AUDIT);
Resource resource;
List<String> actions;
transient List<PermissibleAction> parsedActions; //includes all dependant actions
@Nullable
String value;
@ -58,37 +42,19 @@ public class Permission {
}
public void validate() {
Assert.notNull(resource, "resource cannot be null");
if (!RBAC_ACTION_EXEMPT_LIST.contains(this.resource)) {
Assert.notNull(value, "permission value can't be empty for resource " + resource);
}
Preconditions.checkNotNull(resource, "resource cannot be null");
Preconditions.checkArgument(isNotEmpty(actions), "Actions list for %s can't be null or empty", resource);
}
public void transform() {
if (value != null) {
this.compiledValuePattern = Pattern.compile(value);
}
if (CollectionUtils.isNotEmpty(actions) && actions.stream().anyMatch("ALL"::equalsIgnoreCase)) {
this.actions = getAllActionValues();
if (actions.stream().anyMatch("ALL"::equalsIgnoreCase)) {
this.parsedActions = resource.allActions();
} else {
this.parsedActions = resource.parseActionsWithDependantsUnnest(actions);
}
}
private List<String> getAllActionValues() {
if (resource == null) {
return Collections.emptyList();
}
return switch (this.resource) {
case APPLICATIONCONFIG -> Arrays.stream(ApplicationConfigAction.values()).map(Enum::toString).toList();
case CLUSTERCONFIG -> Arrays.stream(ClusterConfigAction.values()).map(Enum::toString).toList();
case TOPIC -> Arrays.stream(TopicAction.values()).map(Enum::toString).toList();
case CONSUMER -> Arrays.stream(ConsumerGroupAction.values()).map(Enum::toString).toList();
case SCHEMA -> Arrays.stream(SchemaAction.values()).map(Enum::toString).toList();
case CONNECT -> Arrays.stream(ConnectAction.values()).map(Enum::toString).toList();
case KSQL -> Arrays.stream(KsqlAction.values()).map(Enum::toString).toList();
case ACL -> Arrays.stream(AclAction.values()).map(Enum::toString).toList();
case AUDIT -> Arrays.stream(AuditAction.values()).map(Enum::toString).toList();
};
}
}

View file

@ -1,24 +1,65 @@
package com.provectus.kafka.ui.model.rbac;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
import com.provectus.kafka.ui.model.rbac.permission.PermissibleAction;
import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import jakarta.annotation.Nullable;
import java.util.List;
import java.util.stream.Stream;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;
public enum Resource {
APPLICATIONCONFIG,
CLUSTERCONFIG,
TOPIC,
CONSUMER,
SCHEMA,
CONNECT,
KSQL,
ACL,
AUDIT;
APPLICATIONCONFIG(ApplicationConfigAction.values()),
CLUSTERCONFIG(ClusterConfigAction.values()),
TOPIC(TopicAction.values()),
CONSUMER(ConsumerGroupAction.values()),
SCHEMA(SchemaAction.values()),
CONNECT(ConnectAction.values()),
KSQL(KsqlAction.values()),
ACL(AclAction.values()),
AUDIT(AclAction.values());
private final List<PermissibleAction> actions;
Resource(PermissibleAction[] actions) {
this.actions = List.of(actions);
}
public List<PermissibleAction> allActions() {
return actions;
}
@Nullable
public static Resource fromString(String name) {
return EnumUtils.getEnum(Resource.class, name);
}
public List<PermissibleAction> parseActionsWithDependantsUnnest(List<String> actionsToParse) {
return actionsToParse.stream()
.map(toParse -> actions.stream()
.filter(a -> toParse.equalsIgnoreCase(a.name()))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException(
"'%s' actions not applicable for resource %s".formatted(toParse, name())))
)
// unnesting all dependant actions
.flatMap(a -> Stream.concat(Stream.of(a), a.unnestAllDependants()))
.toList();
}
}

View file

@ -7,12 +7,18 @@ import org.jetbrains.annotations.Nullable;
public enum AclAction implements PermissibleAction {
VIEW,
EDIT
EDIT(VIEW)
;
public static final Set<AclAction> ALTER_ACTIONS = Set.of(EDIT);
private final PermissibleAction[] dependantActions;
AclAction(AclAction... dependantActions) {
this.dependantActions = dependantActions;
}
@Nullable
public static AclAction fromString(String name) {
return EnumUtils.getEnum(AclAction.class, name);
@ -22,4 +28,9 @@ public enum AclAction implements PermissibleAction {
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
@Override
public PermissibleAction[] dependantActions() {
return dependantActions;
}
}

View file

@ -7,12 +7,18 @@ import org.jetbrains.annotations.Nullable;
public enum ApplicationConfigAction implements PermissibleAction {
VIEW,
EDIT
EDIT(VIEW)
;
public static final Set<ApplicationConfigAction> ALTER_ACTIONS = Set.of(EDIT);
private final PermissibleAction[] dependantActions;
ApplicationConfigAction(ApplicationConfigAction... dependantActions) {
this.dependantActions = dependantActions;
}
@Nullable
public static ApplicationConfigAction fromString(String name) {
return EnumUtils.getEnum(ApplicationConfigAction.class, name);
@ -22,4 +28,9 @@ public enum ApplicationConfigAction implements PermissibleAction {
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
@Override
public PermissibleAction[] dependantActions() {
return dependantActions;
}
}

View file

@ -12,6 +12,12 @@ public enum AuditAction implements PermissibleAction {
private static final Set<AuditAction> ALTER_ACTIONS = Set.of();
private final AclAction[] dependantActions;
AuditAction(AclAction... dependantActions) {
this.dependantActions = dependantActions;
}
@Nullable
public static AuditAction fromString(String name) {
return EnumUtils.getEnum(AuditAction.class, name);
@ -21,4 +27,9 @@ public enum AuditAction implements PermissibleAction {
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
@Override
public PermissibleAction[] dependantActions() {
return dependantActions;
}
}

View file

@ -7,12 +7,18 @@ import org.jetbrains.annotations.Nullable;
public enum ClusterConfigAction implements PermissibleAction {
VIEW,
EDIT
EDIT(VIEW)
;
public static final Set<ClusterConfigAction> ALTER_ACTIONS = Set.of(EDIT);
private final ClusterConfigAction[] dependantActions;
ClusterConfigAction(ClusterConfigAction... dependantActions) {
this.dependantActions = dependantActions;
}
@Nullable
public static ClusterConfigAction fromString(String name) {
return EnumUtils.getEnum(ClusterConfigAction.class, name);
@ -22,4 +28,9 @@ public enum ClusterConfigAction implements PermissibleAction {
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
@Override
public PermissibleAction[] dependantActions() {
return dependantActions;
}
}

View file

@ -7,12 +7,18 @@ import org.jetbrains.annotations.Nullable;
public enum ConnectAction implements PermissibleAction {
VIEW,
EDIT,
CREATE,
RESTART
EDIT(VIEW),
CREATE(VIEW),
RESTART(VIEW)
;
private final ConnectAction[] dependantActions;
ConnectAction(ConnectAction... dependantActions) {
this.dependantActions = dependantActions;
}
public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, RESTART);
@Nullable
@ -24,4 +30,9 @@ public enum ConnectAction implements PermissibleAction {
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
@Override
public PermissibleAction[] dependantActions() {
return dependantActions;
}
}

View file

@ -7,13 +7,19 @@ import org.jetbrains.annotations.Nullable;
public enum ConsumerGroupAction implements PermissibleAction {
VIEW,
DELETE,
RESET_OFFSETS
DELETE(VIEW),
RESET_OFFSETS(VIEW)
;
public static final Set<ConsumerGroupAction> ALTER_ACTIONS = Set.of(DELETE, RESET_OFFSETS);
private final ConsumerGroupAction[] dependantActions;
ConsumerGroupAction(ConsumerGroupAction... dependantActions) {
this.dependantActions = dependantActions;
}
@Nullable
public static ConsumerGroupAction fromString(String name) {
return EnumUtils.getEnum(ConsumerGroupAction.class, name);
@ -23,4 +29,9 @@ public enum ConsumerGroupAction implements PermissibleAction {
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
@Override
public PermissibleAction[] dependantActions() {
return dependantActions;
}
}

View file

@ -12,6 +12,12 @@ public enum KsqlAction implements PermissibleAction {
public static final Set<KsqlAction> ALTER_ACTIONS = Set.of(EXECUTE);
private final KsqlAction[] dependantActions;
KsqlAction(KsqlAction... dependantActions) {
this.dependantActions = dependantActions;
}
@Nullable
public static KsqlAction fromString(String name) {
return EnumUtils.getEnum(KsqlAction.class, name);
@ -21,4 +27,9 @@ public enum KsqlAction implements PermissibleAction {
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
@Override
public PermissibleAction[] dependantActions() {
return dependantActions;
}
}

View file

@ -1,5 +1,7 @@
package com.provectus.kafka.ui.model.rbac.permission;
import java.util.stream.Stream;
public sealed interface PermissibleAction permits
AclAction, ApplicationConfigAction,
ConsumerGroupAction, SchemaAction,
@ -10,4 +12,16 @@ public sealed interface PermissibleAction permits
boolean isAlter();
/**
* Actions that are direct parts (childs) of this action. If current action is allowed for user, then
* all dependant actions supposed to be allowed. Dependants can also have their dependants, that can be recursively
* unnested with `unnestAllDependants` method.
*/
PermissibleAction[] dependantActions();
// recursively unnest all action's dependants
default Stream<PermissibleAction> unnestAllDependants() {
return Stream.of(dependantActions()).flatMap(dep -> Stream.concat(Stream.of(dep), dep.unnestAllDependants()));
}
}

View file

@ -7,15 +7,21 @@ import org.jetbrains.annotations.Nullable;
public enum SchemaAction implements PermissibleAction {
VIEW,
CREATE,
DELETE,
EDIT,
CREATE(VIEW),
DELETE(VIEW),
EDIT(VIEW),
MODIFY_GLOBAL_COMPATIBILITY
;
public static final Set<SchemaAction> ALTER_ACTIONS = Set.of(CREATE, DELETE, EDIT, MODIFY_GLOBAL_COMPATIBILITY);
private final SchemaAction[] dependantActions;
SchemaAction(SchemaAction... dependantActions) {
this.dependantActions = dependantActions;
}
@Nullable
public static SchemaAction fromString(String name) {
return EnumUtils.getEnum(SchemaAction.class, name);
@ -25,4 +31,9 @@ public enum SchemaAction implements PermissibleAction {
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
@Override
public PermissibleAction[] dependantActions() {
return dependantActions;
}
}

View file

@ -7,17 +7,23 @@ import org.jetbrains.annotations.Nullable;
public enum TopicAction implements PermissibleAction {
VIEW,
CREATE,
EDIT,
DELETE,
MESSAGES_READ,
MESSAGES_PRODUCE,
MESSAGES_DELETE,
CREATE(VIEW),
EDIT(VIEW),
DELETE(VIEW),
MESSAGES_READ(VIEW),
MESSAGES_PRODUCE(VIEW),
MESSAGES_DELETE(VIEW, EDIT),
;
public static final Set<TopicAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, MESSAGES_PRODUCE, MESSAGES_DELETE);
private final TopicAction[] dependantActions;
TopicAction(TopicAction... dependantActions) {
this.dependantActions = dependantActions;
}
@Nullable
public static TopicAction fromString(String name) {
return EnumUtils.getEnum(TopicAction.class, name);
@ -27,4 +33,9 @@ public enum TopicAction implements PermissibleAction {
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
@Override
public PermissibleAction[] dependantActions() {
return dependantActions;
}
}

View file

@ -7,10 +7,8 @@ import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.Resource;
import com.provectus.kafka.ui.model.rbac.permission.PermissibleAction;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.SneakyThrows;
import org.springframework.security.access.AccessDeniedException;
@ -34,43 +32,17 @@ record AuditRecord(String timestamp,
return MAPPER.writeValueAsString(this);
}
record AuditResource(String accessType, boolean alter, Resource type, @Nullable Object id) {
record AuditResource(Resource type, @Nullable Object id, boolean alter, List<String> accessType) {
private static AuditResource create(PermissibleAction action, Resource type, @Nullable Object id) {
return new AuditResource(action.name(), action.isAlter(), type, id);
private static AuditResource create(Collection<PermissibleAction> actions, Resource type, @Nullable Object id) {
boolean isAlter = actions.stream().anyMatch(PermissibleAction::isAlter);
return new AuditResource(type, id, isAlter, actions.stream().map(PermissibleAction::name).toList());
}
static List<AuditResource> getAccessedResources(AccessContext ctx) {
List<AuditResource> resources = new ArrayList<>();
ctx.getClusterConfigActions()
.forEach(a -> resources.add(create(a, Resource.CLUSTERCONFIG, null)));
ctx.getTopicActions()
.forEach(a -> resources.add(create(a, Resource.TOPIC, nameId(ctx.getTopic()))));
ctx.getConsumerGroupActions()
.forEach(a -> resources.add(create(a, Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
ctx.getConnectActions()
.forEach(a -> {
Map<String, String> resourceId = new LinkedHashMap<>();
resourceId.put("connect", ctx.getConnect());
if (ctx.getConnector() != null) {
resourceId.put("connector", ctx.getConnector());
}
resources.add(create(a, Resource.CONNECT, resourceId));
});
ctx.getSchemaActions()
.forEach(a -> resources.add(create(a, Resource.SCHEMA, nameId(ctx.getSchema()))));
ctx.getKsqlActions()
.forEach(a -> resources.add(create(a, Resource.KSQL, null)));
ctx.getAclActions()
.forEach(a -> resources.add(create(a, Resource.ACL, null)));
ctx.getAuditAction()
.forEach(a -> resources.add(create(a, Resource.AUDIT, null)));
return resources;
}
@Nullable
private static Map<String, Object> nameId(@Nullable String name) {
return name != null ? Map.of("name", name) : null;
return ctx.accesses().stream()
.map(r -> create(r.requestedActions(), r.resourceType(), r.resourceId()))
.toList();
}
}

View file

@ -11,6 +11,7 @@ import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.AdminClientService;
import com.provectus.kafka.ui.service.ClustersStorage;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
@ -20,7 +21,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
@ -28,9 +28,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
@ -181,31 +179,18 @@ public class AuditService implements Closeable {
public void audit(AccessContext acxt, Signal<?> sig) {
if (sig.isOnComplete()) {
extractUser(sig)
extractUser()
.doOnNext(u -> sendAuditRecord(acxt, u))
.subscribe();
} else if (sig.isOnError()) {
extractUser(sig)
extractUser()
.doOnNext(u -> sendAuditRecord(acxt, u, sig.getThrowable()))
.subscribe();
}
}
private Mono<AuthenticatedUser> extractUser(Signal<?> sig) {
//see ReactiveSecurityContextHolder for impl details
Object key = SecurityContext.class;
if (sig.getContextView().hasKey(key)) {
return sig.getContextView().<Mono<SecurityContext>>get(key)
.map(context -> context.getAuthentication().getPrincipal())
.cast(UserDetails.class)
.map(user -> {
var roles = user.getAuthorities().stream().map(GrantedAuthority::getAuthority).collect(Collectors.toSet());
return new AuthenticatedUser(user.getUsername(), roles);
})
.switchIfEmpty(NO_AUTH_USER);
} else {
return NO_AUTH_USER;
}
private Mono<AuthenticatedUser> extractUser() {
return AccessControlService.getUser().switchIfEmpty(NO_AUTH_USER);
}
private void sendAuditRecord(AccessContext ctx, AuthenticatedUser user) {
@ -214,8 +199,8 @@ public class AuditService implements Closeable {
private void sendAuditRecord(AccessContext ctx, AuthenticatedUser user, @Nullable Throwable th) {
try {
if (ctx.getCluster() != null) {
var writer = auditWriters.get(ctx.getCluster());
if (ctx.cluster() != null) {
var writer = auditWriters.get(ctx.cluster());
if (writer != null) {
writer.write(ctx, user, th);
}

View file

@ -65,10 +65,10 @@ record AuditWriter(String clusterName,
return new AuditRecord(
DateTimeFormatter.ISO_INSTANT.format(Instant.now()),
user.principal(),
ctx.getCluster(), //can be null, if it is application-level action
ctx.cluster(), //can be null, if it is application-level action
AuditResource.getAccessedResources(ctx),
ctx.getOperationName(),
ctx.getOperationParams(),
ctx.operationName(),
ctx.operationParams(),
th == null ? OperationResult.successful() : OperationResult.error(th)
);
}

View file

@ -1,7 +1,5 @@
package com.provectus.kafka.ui.service.rbac;
import static com.provectus.kafka.ui.model.rbac.Resource.APPLICATIONCONFIG;
import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
import com.provectus.kafka.ui.config.auth.RbacUser;
import com.provectus.kafka.ui.config.auth.RoleBasedAccessControlProperties;
@ -10,7 +8,6 @@ import com.provectus.kafka.ui.model.ConnectDTO;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.Permission;
import com.provectus.kafka.ui.model.rbac.Resource;
import com.provectus.kafka.ui.model.rbac.Role;
import com.provectus.kafka.ui.model.rbac.Subject;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
@ -28,7 +25,6 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
@ -52,7 +48,6 @@ import reactor.core.publisher.Mono;
public class AccessControlService {
private static final String ACCESS_DENIED = "Access denied";
private static final String ACTIONS_ARE_EMPTY = "actions are empty";
@Nullable
private final InMemoryReactiveClientRegistrationRepository clientRegistrationRepository;
@ -96,44 +91,33 @@ public class AccessControlService {
}
public Mono<Void> validateAccess(AccessContext context) {
if (!rbacEnabled) {
return Mono.empty();
}
if (CollectionUtils.isNotEmpty(context.getApplicationConfigActions())) {
return getUser()
.doOnNext(user -> {
boolean accessGranted = isApplicationConfigAccessible(context, user);
if (!accessGranted) {
throw new AccessDeniedException(ACCESS_DENIED);
}
}).then();
}
return getUser()
.doOnNext(user -> {
boolean accessGranted =
isApplicationConfigAccessible(context, user)
&& isClusterAccessible(context, user)
&& isClusterConfigAccessible(context, user)
&& isTopicAccessible(context, user)
&& isConsumerGroupAccessible(context, user)
&& isConnectAccessible(context, user)
&& isConnectorAccessible(context, user) // TODO connector selectors
&& isSchemaAccessible(context, user)
&& isKsqlAccessible(context, user)
&& isAclAccessible(context, user)
&& isAuditAccessible(context, user);
if (!accessGranted) {
throw new AccessDeniedException(ACCESS_DENIED);
}
})
return isAccessible(context)
.flatMap(allowed -> allowed ? Mono.empty() : Mono.error(new AccessDeniedException(ACCESS_DENIED)))
.then();
}
public Mono<AuthenticatedUser> getUser() {
private Mono<Boolean> isAccessible(AccessContext context) {
if (!rbacEnabled) {
return Mono.just(true);
}
return getUser().map(user -> isAccessible(user, context));
}
private boolean isAccessible(AuthenticatedUser user, AccessContext context) {
if (context.cluster() != null && !isClusterAccessible(context.cluster(), user)) {
return false;
}
return context.isAccessible(getUserPermissions(user));
}
private List<Permission> getUserPermissions(AuthenticatedUser user) {
return properties.getRoles().stream()
.filter(filterRole(user))
.flatMap(role -> role.getPermissions().stream())
.toList();
}
public static Mono<AuthenticatedUser> getUser() {
return ReactiveSecurityContextHolder.getContext()
.map(SecurityContext::getAuthentication)
.filter(authentication -> authentication.getPrincipal() instanceof RbacUser)
@ -141,281 +125,67 @@ public class AccessControlService {
.map(user -> new AuthenticatedUser(user.name(), user.groups()));
}
public boolean isApplicationConfigAccessible(AccessContext context, AuthenticatedUser user) {
if (!rbacEnabled) {
return true;
}
if (CollectionUtils.isEmpty(context.getApplicationConfigActions())) {
return true;
}
Set<String> requiredActions = context.getApplicationConfigActions()
.stream()
.map(a -> a.toString().toUpperCase())
.collect(Collectors.toSet());
return isAccessible(APPLICATIONCONFIG, null, user, context, requiredActions);
}
private boolean isClusterAccessible(AccessContext context, AuthenticatedUser user) {
if (!rbacEnabled) {
return true;
}
Assert.isTrue(StringUtils.isNotEmpty(context.getCluster()), "cluster value is empty");
private boolean isClusterAccessible(String clusterName, AuthenticatedUser user) {
Assert.isTrue(StringUtils.isNotEmpty(clusterName), "cluster value is empty");
return properties.getRoles()
.stream()
.filter(filterRole(user))
.anyMatch(filterCluster(context.getCluster()));
.anyMatch(role -> role.getClusters().stream().anyMatch(clusterName::equalsIgnoreCase));
}
public Mono<Boolean> isClusterAccessible(ClusterDTO cluster) {
if (!rbacEnabled) {
return Mono.just(true);
}
AccessContext accessContext = AccessContext
.builder()
.cluster(cluster.getName())
.build();
return getUser().map(u -> isClusterAccessible(accessContext, u));
}
public boolean isClusterConfigAccessible(AccessContext context, AuthenticatedUser user) {
if (!rbacEnabled) {
return true;
}
if (CollectionUtils.isEmpty(context.getClusterConfigActions())) {
return true;
}
Assert.isTrue(StringUtils.isNotEmpty(context.getCluster()), "cluster value is empty");
Set<String> requiredActions = context.getClusterConfigActions()
.stream()
.map(a -> a.toString().toUpperCase())
.collect(Collectors.toSet());
return isAccessible(Resource.CLUSTERCONFIG, context.getCluster(), user, context, requiredActions);
}
public boolean isTopicAccessible(AccessContext context, AuthenticatedUser user) {
if (!rbacEnabled) {
return true;
}
if (context.getTopic() == null && context.getTopicActions().isEmpty()) {
return true;
}
Assert.isTrue(!context.getTopicActions().isEmpty(), ACTIONS_ARE_EMPTY);
Set<String> requiredActions = context.getTopicActions()
.stream()
.map(a -> a.toString().toUpperCase())
.collect(Collectors.toSet());
return isAccessible(Resource.TOPIC, context.getTopic(), user, context, requiredActions);
return getUser().map(u -> isClusterAccessible(cluster.getName(), u));
}
public Mono<List<InternalTopic>> filterViewableTopics(List<InternalTopic> topics, String clusterName) {
if (!rbacEnabled) {
return Mono.just(topics);
}
return getUser()
.map(user -> topics.stream()
.filter(topic -> {
var accessContext = AccessContext
.builder()
.cluster(clusterName)
.topic(topic.getName())
.topicActions(TopicAction.VIEW)
.build();
return isTopicAccessible(accessContext, user);
}
.filter(topic ->
isAccessible(
user,
AccessContext.builder()
.cluster(clusterName)
.topicActions(topic.getName(), TopicAction.VIEW)
.build()
)
).toList());
}
private boolean isConsumerGroupAccessible(AccessContext context, AuthenticatedUser user) {
if (!rbacEnabled) {
return true;
}
if (context.getConsumerGroup() == null && context.getConsumerGroupActions().isEmpty()) {
return true;
}
Assert.isTrue(!context.getConsumerGroupActions().isEmpty(), ACTIONS_ARE_EMPTY);
Set<String> requiredActions = context.getConsumerGroupActions()
.stream()
.map(a -> a.toString().toUpperCase())
.collect(Collectors.toSet());
return isAccessible(Resource.CONSUMER, context.getConsumerGroup(), user, context, requiredActions);
}
public Mono<Boolean> isConsumerGroupAccessible(String groupId, String clusterName) {
if (!rbacEnabled) {
return Mono.just(true);
}
AccessContext accessContext = AccessContext
.builder()
.cluster(clusterName)
.consumerGroup(groupId)
.consumerGroupActions(ConsumerGroupAction.VIEW)
.build();
return getUser().map(u -> isConsumerGroupAccessible(accessContext, u));
}
public boolean isSchemaAccessible(AccessContext context, AuthenticatedUser user) {
if (!rbacEnabled) {
return true;
}
if (context.getSchema() == null && context.getSchemaActions().isEmpty()) {
return true;
}
Assert.isTrue(!context.getSchemaActions().isEmpty(), ACTIONS_ARE_EMPTY);
Set<String> requiredActions = context.getSchemaActions()
.stream()
.map(a -> a.toString().toUpperCase())
.collect(Collectors.toSet());
return isAccessible(Resource.SCHEMA, context.getSchema(), user, context, requiredActions);
return isAccessible(
AccessContext.builder()
.cluster(clusterName)
.consumerGroupActions(groupId, ConsumerGroupAction.VIEW)
.build()
);
}
public Mono<Boolean> isSchemaAccessible(String schema, String clusterName) {
if (!rbacEnabled) {
return Mono.just(true);
}
AccessContext accessContext = AccessContext
.builder()
.cluster(clusterName)
.schema(schema)
.schemaActions(SchemaAction.VIEW)
.build();
return getUser().map(u -> isSchemaAccessible(accessContext, u));
}
public boolean isConnectAccessible(AccessContext context, AuthenticatedUser user) {
if (!rbacEnabled) {
return true;
}
if (context.getConnect() == null && context.getConnectActions().isEmpty()) {
return true;
}
Assert.isTrue(!context.getConnectActions().isEmpty(), ACTIONS_ARE_EMPTY);
Set<String> requiredActions = context.getConnectActions()
.stream()
.map(a -> a.toString().toUpperCase())
.collect(Collectors.toSet());
return isAccessible(Resource.CONNECT, context.getConnect(), user, context, requiredActions);
return isAccessible(
AccessContext.builder()
.cluster(clusterName)
.schemaActions(schema, SchemaAction.VIEW)
.build()
);
}
public Mono<Boolean> isConnectAccessible(ConnectDTO dto, String clusterName) {
if (!rbacEnabled) {
return Mono.just(true);
}
return isConnectAccessible(dto.getName(), clusterName);
}
public Mono<Boolean> isConnectAccessible(String connectName, String clusterName) {
if (!rbacEnabled) {
return Mono.just(true);
}
AccessContext accessContext = AccessContext
.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.build();
return getUser().map(u -> isConnectAccessible(accessContext, u));
}
public boolean isConnectorAccessible(AccessContext context, AuthenticatedUser user) {
if (!rbacEnabled) {
return true;
}
return isConnectAccessible(context, user);
}
public Mono<Boolean> isConnectorAccessible(String connectName, String connectorName, String clusterName) {
if (!rbacEnabled) {
return Mono.just(true);
}
AccessContext accessContext = AccessContext
.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connector(connectorName)
.build();
return getUser().map(u -> isConnectorAccessible(accessContext, u));
}
private boolean isKsqlAccessible(AccessContext context, AuthenticatedUser user) {
if (!rbacEnabled) {
return true;
}
if (context.getKsqlActions().isEmpty()) {
return true;
}
Set<String> requiredActions = context.getKsqlActions()
.stream()
.map(a -> a.toString().toUpperCase())
.collect(Collectors.toSet());
return isAccessible(Resource.KSQL, null, user, context, requiredActions);
}
private boolean isAclAccessible(AccessContext context, AuthenticatedUser user) {
if (!rbacEnabled) {
return true;
}
if (context.getAclActions().isEmpty()) {
return true;
}
Set<String> requiredActions = context.getAclActions()
.stream()
.map(a -> a.toString().toUpperCase())
.collect(Collectors.toSet());
return isAccessible(Resource.ACL, null, user, context, requiredActions);
}
private boolean isAuditAccessible(AccessContext context, AuthenticatedUser user) {
if (!rbacEnabled) {
return true;
}
if (context.getAuditAction().isEmpty()) {
return true;
}
Set<String> requiredActions = context.getAuditAction()
.stream()
.map(a -> a.toString().toUpperCase())
.collect(Collectors.toSet());
return isAccessible(Resource.AUDIT, null, user, context, requiredActions);
return isAccessible(
AccessContext.builder()
.cluster(clusterName)
.connectActions(connectName, ConnectAction.VIEW)
.build()
);
}
public Set<ProviderAuthorityExtractor> getOauthExtractors() {
@ -429,57 +199,10 @@ public class AccessControlService {
return Collections.unmodifiableList(properties.getRoles());
}
private boolean isAccessible(Resource resource, @Nullable String resourceValue,
AuthenticatedUser user, AccessContext context, Set<String> requiredActions) {
Set<String> grantedActions = properties.getRoles()
.stream()
.filter(filterRole(user))
.filter(filterCluster(resource, context.getCluster()))
.flatMap(grantedRole -> grantedRole.getPermissions().stream())
.filter(filterResource(resource))
.filter(filterResourceValue(resourceValue))
.flatMap(grantedPermission -> grantedPermission.getActions().stream())
.map(String::toUpperCase)
.collect(Collectors.toSet());
return grantedActions.containsAll(requiredActions);
}
private Predicate<Role> filterRole(AuthenticatedUser user) {
return role -> user.groups().contains(role.getName());
}
private Predicate<Role> filterCluster(String cluster) {
return grantedRole -> grantedRole.getClusters()
.stream()
.anyMatch(cluster::equalsIgnoreCase);
}
private Predicate<Role> filterCluster(Resource resource, String cluster) {
if (resource == APPLICATIONCONFIG) {
return role -> true;
}
return filterCluster(cluster);
}
private Predicate<Permission> filterResource(Resource resource) {
return grantedPermission -> resource == grantedPermission.getResource();
}
private Predicate<Permission> filterResourceValue(@Nullable String resourceValue) {
if (resourceValue == null) {
return grantedPermission -> true;
}
return grantedPermission -> {
Pattern valuePattern = grantedPermission.getCompiledValuePattern();
if (valuePattern == null) {
return true;
}
return valuePattern.matcher(resourceValue).matches();
};
}
public boolean isRbacEnabled() {
return rbacEnabled;
}

View file

@ -0,0 +1,112 @@
package com.provectus.kafka.ui.model.rbac;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.provectus.kafka.ui.model.rbac.AccessContext.ResourceAccess;
import com.provectus.kafka.ui.model.rbac.AccessContext.SingleResourceAccess;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.PermissibleAction;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import jakarta.annotation.Nullable;
import java.util.List;
import java.util.stream.Stream;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
class AccessContextTest {
@Test
void validateReturnsTrueIfAllResourcesAreAccessible() {
ResourceAccess okResourceAccess1 = mock(ResourceAccess.class);
when(okResourceAccess1.isAccessible(any())).thenReturn(true);
ResourceAccess okResourceAccess2 = mock(ResourceAccess.class);
when(okResourceAccess2.isAccessible(any())).thenReturn(true);
var cxt = new AccessContext("cluster", List.of(okResourceAccess1, okResourceAccess2), "op", "params");
assertThat(cxt.isAccessible(List.of())).isTrue();
}
@Test
void validateReturnsFalseIfAnyResourcesCantBeAccessible() {
ResourceAccess okResourceAccess = mock(ResourceAccess.class);
when(okResourceAccess.isAccessible(any())).thenReturn(true);
ResourceAccess failureResourceAccess = mock(ResourceAccess.class);
when(failureResourceAccess.isAccessible(any())).thenReturn(false);
var cxt = new AccessContext("cluster", List.of(okResourceAccess, failureResourceAccess), "op", "params");
assertThat(cxt.isAccessible(List.of())).isFalse();
}
@Nested
class SingleResourceAccessTest {
@Test
void allowsAccessForResourceWithNameIfUserHasAllNeededPermissions() {
SingleResourceAccess sra =
new SingleResourceAccess("test_topic123", Resource.TOPIC, List.of(TopicAction.VIEW, TopicAction.EDIT));
var allowed = sra.isAccessible(
List.of(
permission(Resource.TOPIC, "test_topic.*", TopicAction.EDIT),
permission(Resource.TOPIC, "test.*", TopicAction.VIEW)));
assertThat(allowed).isTrue();
}
@Test
void deniesAccessForResourceWithNameIfUserHasSomePermissionsMissing() {
SingleResourceAccess sra =
new SingleResourceAccess("test_topic123", Resource.TOPIC,
List.of(TopicAction.VIEW, TopicAction.MESSAGES_DELETE));
var allowed = sra.isAccessible(
List.of(
permission(Resource.TOPIC, "test_topic.*", TopicAction.EDIT),
permission(Resource.TOPIC, "test.*", TopicAction.VIEW)));
assertThat(allowed).isFalse();
}
@Test
void allowsAccessForResourceWithoutNameIfUserHasAllNeededPermissions() {
SingleResourceAccess sra =
new SingleResourceAccess(Resource.CLUSTERCONFIG, List.of(ClusterConfigAction.VIEW));
var allowed = sra.isAccessible(
List.of(
permission(Resource.CLUSTERCONFIG, null, ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)));
assertThat(allowed).isTrue();
}
@Test
void deniesAccessForResourceWithoutNameIfUserHasAllNeededPermissions() {
SingleResourceAccess sra =
new SingleResourceAccess(Resource.CLUSTERCONFIG, List.of(ClusterConfigAction.EDIT));
var allowed = sra.isAccessible(
List.of(
permission(Resource.CLUSTERCONFIG, null, ClusterConfigAction.VIEW)));
assertThat(allowed).isFalse();
}
private Permission permission(Resource res, @Nullable String namePattern, PermissibleAction... actions) {
Permission p = new Permission();
p.setResource(res.name());
p.setActions(Stream.of(actions).map(PermissibleAction::name).toList());
p.setValue(namePattern);
p.validate();
p.transform();
return p;
}
}
}

View file

@ -0,0 +1,52 @@
package com.provectus.kafka.ui.model.rbac;
import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import java.util.List;
import org.junit.jupiter.api.Test;
class PermissionTest {
@Test
void transformSetsParseableFields() {
var p = new Permission();
p.setResource("toPic");
p.setActions(List.of("vIEW", "EdiT"));
p.setValue("patt|ern");
p.transform();
assertThat(p.getParsedActions())
.isEqualTo(List.of(TopicAction.VIEW, TopicAction.EDIT));
assertThat(p.getCompiledValuePattern())
.isNotNull()
.matches(pattern -> pattern.pattern().equals("patt|ern"));
}
@Test
void transformSetsFullActionsListIfAllActionPassed() {
var p = new Permission();
p.setResource("toPic");
p.setActions(List.of("All"));
p.transform();
assertThat(p.getParsedActions())
.isEqualTo(List.of(TopicAction.values()));
}
@Test
void transformUnnestsDependantActions() {
var p = new Permission();
p.setResource("toPic");
p.setActions(List.of("EDIT"));
p.transform();
assertThat(p.getParsedActions())
.containsExactlyInAnyOrder(TopicAction.VIEW, TopicAction.EDIT);
}
}

View file

@ -44,17 +44,17 @@ class AuditWriterTest {
static Stream<AccessContext> onlyLogsWhenAlterOperationIsPresentForOneOfResources() {
Stream<UnaryOperator<AccessContextBuilder>> topicEditActions =
TopicAction.ALTER_ACTIONS.stream().map(a -> c -> c.topic("test").topicActions(a));
TopicAction.ALTER_ACTIONS.stream().map(a -> c -> c.topicActions("test", a));
Stream<UnaryOperator<AccessContextBuilder>> clusterConfigEditActions =
ClusterConfigAction.ALTER_ACTIONS.stream().map(a -> c -> c.clusterConfigActions(a));
Stream<UnaryOperator<AccessContextBuilder>> aclEditActions =
AclAction.ALTER_ACTIONS.stream().map(a -> c -> c.aclActions(a));
Stream<UnaryOperator<AccessContextBuilder>> cgEditActions =
ConsumerGroupAction.ALTER_ACTIONS.stream().map(a -> c -> c.consumerGroup("cg").consumerGroupActions(a));
ConsumerGroupAction.ALTER_ACTIONS.stream().map(a -> c -> c.consumerGroupActions("cg", a));
Stream<UnaryOperator<AccessContextBuilder>> schemaEditActions =
SchemaAction.ALTER_ACTIONS.stream().map(a -> c -> c.schema("sc").schemaActions(a));
SchemaAction.ALTER_ACTIONS.stream().map(a -> c -> c.schemaActions("sc", a));
Stream<UnaryOperator<AccessContextBuilder>> connEditActions =
ConnectAction.ALTER_ACTIONS.stream().map(a -> c -> c.connect("conn").connectActions(a));
ConnectAction.ALTER_ACTIONS.stream().map(a -> c -> c.connectActions("conn", a));
return Stream.of(
topicEditActions, clusterConfigEditActions, aclEditActions,
cgEditActions, connEditActions, schemaEditActions
@ -73,12 +73,12 @@ class AuditWriterTest {
static Stream<AccessContext> doesNothingIfNoResourceHasAlterAction() {
return Stream.<UnaryOperator<AccessContextBuilder>>of(
c -> c.topic("test").topicActions(TopicAction.VIEW),
c -> c.topicActions("test", TopicAction.VIEW),
c -> c.clusterConfigActions(ClusterConfigAction.VIEW),
c -> c.aclActions(AclAction.VIEW),
c -> c.consumerGroup("cg").consumerGroupActions(ConsumerGroupAction.VIEW),
c -> c.schema("sc").schemaActions(SchemaAction.VIEW),
c -> c.connect("conn").connectActions(ConnectAction.VIEW)
c -> c.consumerGroupActions("cg", ConsumerGroupAction.VIEW),
c -> c.schemaActions("sc", SchemaAction.VIEW),
c -> c.connectActions("conn", ConnectAction.VIEW)
).map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build());
}
}