filter params added to acls endpoint
This commit is contained in:
parent
a58c2055f5
commit
807ca5ae96
6 changed files with 91 additions and 34 deletions
|
@ -3,11 +3,17 @@ package com.provectus.kafka.ui.controller;
|
|||
import com.provectus.kafka.ui.api.AclsApi;
|
||||
import com.provectus.kafka.ui.mapper.ClusterMapper;
|
||||
import com.provectus.kafka.ui.model.KafkaAclDTO;
|
||||
import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
|
||||
import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
|
||||
import com.provectus.kafka.ui.model.rbac.AccessContext;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
|
||||
import com.provectus.kafka.ui.service.acl.AclsService;
|
||||
import com.provectus.kafka.ui.service.rbac.AccessControlService;
|
||||
import java.util.Optional;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.kafka.common.resource.PatternType;
|
||||
import org.apache.kafka.common.resource.ResourcePatternFilter;
|
||||
import org.apache.kafka.common.resource.ResourceType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
|
@ -52,16 +58,31 @@ public class AclsController extends AbstractController implements AclsApi {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName, ServerWebExchange exchange) {
|
||||
public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
|
||||
KafkaAclResourceTypeDTO resourceTypeDto,
|
||||
String resourceName,
|
||||
KafkaAclNamePatternTypeDTO namePatternTypeDto,
|
||||
ServerWebExchange exchange) {
|
||||
AccessContext context = AccessContext.builder()
|
||||
.cluster(clusterName)
|
||||
.aclActions(AclAction.VIEW)
|
||||
.build();
|
||||
|
||||
var resourceType = Optional.ofNullable(resourceTypeDto)
|
||||
.map(ClusterMapper::mapAclResourceTypeDto)
|
||||
.orElse(ResourceType.ANY);
|
||||
|
||||
var namePatternType = Optional.ofNullable(namePatternTypeDto)
|
||||
.map(ClusterMapper::mapPatternTypeDto)
|
||||
.orElse(PatternType.ANY);
|
||||
|
||||
var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType);
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
Mono.just(
|
||||
ResponseEntity.ok(
|
||||
aclsService.listAcls(getCluster(clusterName)).map(ClusterMapper::toKafkaAclDto)))
|
||||
aclsService.listAcls(getCluster(clusterName), filter)
|
||||
.map(ClusterMapper::toKafkaAclDto)))
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -21,6 +21,8 @@ import com.provectus.kafka.ui.model.InternalReplica;
|
|||
import com.provectus.kafka.ui.model.InternalTopic;
|
||||
import com.provectus.kafka.ui.model.InternalTopicConfig;
|
||||
import com.provectus.kafka.ui.model.KafkaAclDTO;
|
||||
import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
|
||||
import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
|
||||
import com.provectus.kafka.ui.model.MetricDTO;
|
||||
import com.provectus.kafka.ui.model.Metrics;
|
||||
import com.provectus.kafka.ui.model.PartitionDTO;
|
||||
|
@ -136,25 +138,33 @@ public interface ClusterMapper {
|
|||
};
|
||||
}
|
||||
|
||||
static KafkaAclDTO.ResourceTypeEnum mapAclResourceType(ResourceType resourceType) {
|
||||
static KafkaAclResourceTypeDTO mapAclResourceType(ResourceType resourceType) {
|
||||
return switch (resourceType) {
|
||||
case CLUSTER -> KafkaAclDTO.ResourceTypeEnum.CLUSTER;
|
||||
case TOPIC -> KafkaAclDTO.ResourceTypeEnum.TOPIC;
|
||||
case GROUP -> KafkaAclDTO.ResourceTypeEnum.GROUP;
|
||||
case DELEGATION_TOKEN -> KafkaAclDTO.ResourceTypeEnum.DELEGATION_TOKEN;
|
||||
case TRANSACTIONAL_ID -> KafkaAclDTO.ResourceTypeEnum.TRANSACTIONAL_ID;
|
||||
case USER -> KafkaAclDTO.ResourceTypeEnum.USER;
|
||||
case CLUSTER -> KafkaAclResourceTypeDTO.CLUSTER;
|
||||
case TOPIC -> KafkaAclResourceTypeDTO.TOPIC;
|
||||
case GROUP -> KafkaAclResourceTypeDTO.GROUP;
|
||||
case DELEGATION_TOKEN -> KafkaAclResourceTypeDTO.DELEGATION_TOKEN;
|
||||
case TRANSACTIONAL_ID -> KafkaAclResourceTypeDTO.TRANSACTIONAL_ID;
|
||||
case USER -> KafkaAclResourceTypeDTO.USER;
|
||||
case ANY -> throw new IllegalArgumentException("ANY type can be only part of filter");
|
||||
case UNKNOWN -> KafkaAclDTO.ResourceTypeEnum.UNKNOWN;
|
||||
case UNKNOWN -> KafkaAclResourceTypeDTO.UNKNOWN;
|
||||
};
|
||||
}
|
||||
|
||||
static ResourceType mapAclResourceTypeDto(KafkaAclResourceTypeDTO dto) {
|
||||
return ResourceType.valueOf(dto.name());
|
||||
}
|
||||
|
||||
static PatternType mapPatternTypeDto(KafkaAclNamePatternTypeDTO dto) {
|
||||
return PatternType.valueOf(dto.name());
|
||||
}
|
||||
|
||||
static AclBinding toAclBinding(KafkaAclDTO dto) {
|
||||
return new AclBinding(
|
||||
new ResourcePattern(
|
||||
ResourceType.valueOf(dto.getResourceType().name()),
|
||||
mapAclResourceTypeDto(dto.getResourceType()),
|
||||
dto.getResourceName(),
|
||||
PatternType.valueOf(dto.getNamePatternType().name())
|
||||
mapPatternTypeDto(dto.getNamePatternType())
|
||||
),
|
||||
new AccessControlEntry(
|
||||
dto.getPrincipal(),
|
||||
|
@ -171,7 +181,7 @@ public interface ClusterMapper {
|
|||
return new KafkaAclDTO()
|
||||
.resourceType(mapAclResourceType(pattern.resourceType()))
|
||||
.resourceName(pattern.name())
|
||||
.namePatternType(KafkaAclDTO.NamePatternTypeEnum.fromValue(pattern.patternType().name()))
|
||||
.namePatternType(KafkaAclNamePatternTypeDTO.fromValue(pattern.patternType().name()))
|
||||
.principal(filter.principal())
|
||||
.host(filter.host())
|
||||
.operation(mapAclOperation(filter.operation()))
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.kafka.common.Node;
|
|||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.TopicPartitionInfo;
|
||||
import org.apache.kafka.common.TopicPartitionReplica;
|
||||
import org.apache.kafka.common.acl.AccessControlEntryFilter;
|
||||
import org.apache.kafka.common.acl.AclBinding;
|
||||
import org.apache.kafka.common.acl.AclBindingFilter;
|
||||
import org.apache.kafka.common.acl.AclOperation;
|
||||
|
@ -74,6 +75,8 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
|
|||
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
|
||||
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
||||
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
|
||||
import org.apache.kafka.common.resource.ResourcePattern;
|
||||
import org.apache.kafka.common.resource.ResourcePatternFilter;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
@ -592,9 +595,9 @@ public class ReactiveAdminClient implements Closeable {
|
|||
);
|
||||
}
|
||||
|
||||
public Mono<Collection<AclBinding>> listAcls() {
|
||||
public Mono<Collection<AclBinding>> listAcls(ResourcePatternFilter filter) {
|
||||
Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
|
||||
return toMono(client.describeAcls(AclBindingFilter.ANY).values());
|
||||
return toMono(client.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values());
|
||||
}
|
||||
|
||||
public Mono<Void> createAcls(Collection<AclBinding> aclBindings) {
|
||||
|
|
|
@ -3,12 +3,12 @@ package com.provectus.kafka.ui.service.acl;
|
|||
import com.google.common.collect.Sets;
|
||||
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||
import com.provectus.kafka.ui.service.AdminClientService;
|
||||
import com.provectus.kafka.ui.service.ReactiveAdminClient;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.common.acl.AclBinding;
|
||||
import org.apache.kafka.common.resource.ResourcePatternFilter;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
@ -36,21 +36,21 @@ public class AclsService {
|
|||
.doOnSuccess(v -> log.info("ACL DELETED: [{}]", aclString));
|
||||
}
|
||||
|
||||
public Flux<AclBinding> listAcls(KafkaCluster cluster) {
|
||||
public Flux<AclBinding> listAcls(KafkaCluster cluster, ResourcePatternFilter filter) {
|
||||
return adminClientService.get(cluster)
|
||||
.flatMap(ReactiveAdminClient::listAcls)
|
||||
.flatMap(c -> c.listAcls(filter))
|
||||
.flatMapIterable(acls -> acls);
|
||||
}
|
||||
|
||||
public Mono<String> getAclAsCsvString(KafkaCluster cluster) {
|
||||
return adminClientService.get(cluster)
|
||||
.flatMap(ReactiveAdminClient::listAcls)
|
||||
.flatMap(c -> c.listAcls(ResourcePatternFilter.ANY))
|
||||
.map(AclCsv::transformToCsvString);
|
||||
}
|
||||
|
||||
public Mono<Void> syncAclWithAclCsv(KafkaCluster cluster, String csv) {
|
||||
return adminClientService.get(cluster)
|
||||
.flatMap(ac -> ac.listAcls().flatMap(existingAclList -> {
|
||||
.flatMap(ac -> ac.listAcls(ResourcePatternFilter.ANY).flatMap(existingAclList -> {
|
||||
var existingSet = Set.copyOf(existingAclList);
|
||||
var newAcls = Set.copyOf(AclCsv.parseCsv(csv));
|
||||
var toDelete = Sets.difference(existingSet, newAcls);
|
||||
|
|
|
@ -15,6 +15,7 @@ import org.apache.kafka.common.acl.AclOperation;
|
|||
import org.apache.kafka.common.acl.AclPermissionType;
|
||||
import org.apache.kafka.common.resource.PatternType;
|
||||
import org.apache.kafka.common.resource.ResourcePattern;
|
||||
import org.apache.kafka.common.resource.ResourcePatternFilter;
|
||||
import org.apache.kafka.common.resource.ResourceType;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -49,7 +50,7 @@ class AclsServiceTest {
|
|||
new ResourcePattern(ResourceType.GROUP, "groupNew", PatternType.PREFIXED),
|
||||
new AccessControlEntry("User:test3", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY));
|
||||
|
||||
when(adminClientMock.listAcls())
|
||||
when(adminClientMock.listAcls(ResourcePatternFilter.ANY))
|
||||
.thenReturn(Mono.just(List.of(existingBinding1, existingBinding2)));
|
||||
|
||||
ArgumentCaptor<?> createdCaptor = ArgumentCaptor.forClass(Collection.class);
|
||||
|
|
|
@ -1742,6 +1742,21 @@ paths:
|
|||
required: true
|
||||
schema:
|
||||
type: string
|
||||
- name: resourceType
|
||||
in: query
|
||||
required: false
|
||||
schema:
|
||||
$ref: '#/components/schemas/KafkaAclResourceType'
|
||||
- name: resourceName
|
||||
in: query
|
||||
required: false
|
||||
schema:
|
||||
type: string
|
||||
- name: namePatternType
|
||||
in: query
|
||||
required: false
|
||||
schema:
|
||||
$ref: '#/components/schemas/KafkaAclNamePatternType'
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
|
@ -3444,22 +3459,11 @@ components:
|
|||
required: [resourceType, resourceName, namePatternType, principal, host, operation, permission]
|
||||
properties:
|
||||
resourceType:
|
||||
type: string
|
||||
enum:
|
||||
- UNKNOWN # Unknown operation, need to update mapping code on BE
|
||||
- TOPIC
|
||||
- GROUP
|
||||
- CLUSTER
|
||||
- TRANSACTIONAL_ID
|
||||
- DELEGATION_TOKEN
|
||||
- USER
|
||||
$ref: '#/components/schemas/KafkaAclResourceType'
|
||||
resourceName:
|
||||
type: string # "*" if acl can be applied to any resource of given type
|
||||
namePatternType:
|
||||
type: string
|
||||
enum:
|
||||
- LITERAL
|
||||
- PREFIXED
|
||||
$ref: '#/components/schemas/KafkaAclNamePatternType'
|
||||
principal:
|
||||
type: string
|
||||
host:
|
||||
|
@ -3487,6 +3491,24 @@ components:
|
|||
- ALLOW
|
||||
- DENY
|
||||
|
||||
KafkaAclResourceType:
|
||||
type: string
|
||||
enum:
|
||||
- UNKNOWN # Unknown operation, need to update mapping code on BE
|
||||
- TOPIC
|
||||
- GROUP
|
||||
- CLUSTER
|
||||
- TRANSACTIONAL_ID
|
||||
- DELEGATION_TOKEN
|
||||
- USER
|
||||
|
||||
KafkaAclNamePatternType:
|
||||
type: string
|
||||
enum:
|
||||
- MATCH
|
||||
- LITERAL
|
||||
- PREFIXED
|
||||
|
||||
RestartRequest:
|
||||
type: object
|
||||
properties:
|
||||
|
|
Loading…
Add table
Reference in a new issue