Endpoint for ACL creation for consumer, producer, stream app added
This commit is contained in:
parent
727f38401b
commit
1b3c131255
4 changed files with 564 additions and 5 deletions
|
@ -2,6 +2,9 @@ package com.provectus.kafka.ui.controller;
|
||||||
|
|
||||||
import com.provectus.kafka.ui.api.AclsApi;
|
import com.provectus.kafka.ui.api.AclsApi;
|
||||||
import com.provectus.kafka.ui.mapper.ClusterMapper;
|
import com.provectus.kafka.ui.mapper.ClusterMapper;
|
||||||
|
import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
|
||||||
|
import com.provectus.kafka.ui.model.CreateProducerAclDTO;
|
||||||
|
import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
|
||||||
import com.provectus.kafka.ui.model.KafkaAclDTO;
|
import com.provectus.kafka.ui.model.KafkaAclDTO;
|
||||||
import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
|
import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
|
||||||
import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
|
import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
|
||||||
|
@ -112,4 +115,49 @@ public class AclsController extends AbstractController implements AclsApi {
|
||||||
.flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
|
.flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
|
||||||
.thenReturn(ResponseEntity.ok().build());
|
.thenReturn(ResponseEntity.ok().build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<ResponseEntity<Void>> createConsumerAcl(String clusterName,
|
||||||
|
Mono<CreateConsumerAclDTO> createConsumerAclDto,
|
||||||
|
ServerWebExchange exchange) {
|
||||||
|
AccessContext context = AccessContext.builder()
|
||||||
|
.cluster(clusterName)
|
||||||
|
.aclActions(AclAction.EDIT)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
return accessControlService.validateAccess(context)
|
||||||
|
.then(createConsumerAclDto)
|
||||||
|
.flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req))
|
||||||
|
.thenReturn(ResponseEntity.ok().build());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<ResponseEntity<Void>> createProducerAcl(String clusterName,
|
||||||
|
Mono<CreateProducerAclDTO> createProducerAclDto,
|
||||||
|
ServerWebExchange exchange) {
|
||||||
|
AccessContext context = AccessContext.builder()
|
||||||
|
.cluster(clusterName)
|
||||||
|
.aclActions(AclAction.EDIT)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
return accessControlService.validateAccess(context)
|
||||||
|
.then(createProducerAclDto)
|
||||||
|
.flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req))
|
||||||
|
.thenReturn(ResponseEntity.ok().build());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<ResponseEntity<Void>> createStreamAppAcl(String clusterName,
|
||||||
|
Mono<CreateStreamAppAclDTO> createStreamAppAclDto,
|
||||||
|
ServerWebExchange exchange) {
|
||||||
|
AccessContext context = AccessContext.builder()
|
||||||
|
.cluster(clusterName)
|
||||||
|
.aclActions(AclAction.EDIT)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
return accessControlService.validateAccess(context)
|
||||||
|
.then(createStreamAppAclDto)
|
||||||
|
.flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req))
|
||||||
|
.thenReturn(ResponseEntity.ok().build());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,15 +1,43 @@
|
||||||
package com.provectus.kafka.ui.service.acl;
|
package com.provectus.kafka.ui.service.acl;
|
||||||
|
|
||||||
|
import static org.apache.kafka.common.acl.AclOperation.ALL;
|
||||||
|
import static org.apache.kafka.common.acl.AclOperation.CREATE;
|
||||||
|
import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
|
||||||
|
import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
|
||||||
|
import static org.apache.kafka.common.acl.AclOperation.READ;
|
||||||
|
import static org.apache.kafka.common.acl.AclOperation.WRITE;
|
||||||
|
import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
|
||||||
|
import static org.apache.kafka.common.resource.PatternType.LITERAL;
|
||||||
|
import static org.apache.kafka.common.resource.PatternType.PREFIXED;
|
||||||
|
import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
|
||||||
|
import static org.apache.kafka.common.resource.ResourceType.GROUP;
|
||||||
|
import static org.apache.kafka.common.resource.ResourceType.TOPIC;
|
||||||
|
import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID;
|
||||||
|
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
|
||||||
|
import com.provectus.kafka.ui.model.CreateProducerAclDTO;
|
||||||
|
import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
|
||||||
import com.provectus.kafka.ui.model.KafkaCluster;
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||||
import com.provectus.kafka.ui.service.AdminClientService;
|
import com.provectus.kafka.ui.service.AdminClientService;
|
||||||
|
import com.provectus.kafka.ui.service.ReactiveAdminClient;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.kafka.common.acl.AccessControlEntry;
|
||||||
import org.apache.kafka.common.acl.AclBinding;
|
import org.apache.kafka.common.acl.AclBinding;
|
||||||
|
import org.apache.kafka.common.acl.AclOperation;
|
||||||
|
import org.apache.kafka.common.resource.Resource;
|
||||||
|
import org.apache.kafka.common.resource.ResourcePattern;
|
||||||
import org.apache.kafka.common.resource.ResourcePatternFilter;
|
import org.apache.kafka.common.resource.ResourcePatternFilter;
|
||||||
|
import org.apache.kafka.common.resource.ResourceType;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.util.CollectionUtils;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
|
@ -21,11 +49,14 @@ public class AclsService {
|
||||||
private final AdminClientService adminClientService;
|
private final AdminClientService adminClientService;
|
||||||
|
|
||||||
public Mono<Void> createAcl(KafkaCluster cluster, AclBinding aclBinding) {
|
public Mono<Void> createAcl(KafkaCluster cluster, AclBinding aclBinding) {
|
||||||
var aclString = AclCsv.createAclString(aclBinding);
|
|
||||||
log.info("CREATING ACL: [{}]", aclString);
|
|
||||||
return adminClientService.get(cluster)
|
return adminClientService.get(cluster)
|
||||||
.flatMap(ac -> ac.createAcls(List.of(aclBinding)))
|
.flatMap(ac -> createAclsWithLogging(ac, List.of(aclBinding)));
|
||||||
.doOnSuccess(v -> log.info("ACL CREATED: [{}]", aclString));
|
}
|
||||||
|
|
||||||
|
private Mono<Void> createAclsWithLogging(ReactiveAdminClient ac, Collection<AclBinding> bindings) {
|
||||||
|
bindings.forEach(b -> log.info("CREATING ACL: [{}]", AclCsv.createAclString(b)));
|
||||||
|
return ac.createAcls(bindings)
|
||||||
|
.doOnSuccess(v -> bindings.forEach(b -> log.info("ACL CREATED: [{}]", AclCsv.createAclString(b))));
|
||||||
}
|
}
|
||||||
|
|
||||||
public Mono<Void> deleteAcl(KafkaCluster cluster, AclBinding aclBinding) {
|
public Mono<Void> deleteAcl(KafkaCluster cluster, AclBinding aclBinding) {
|
||||||
|
@ -90,4 +121,150 @@ public class AclsService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// creates allow binding for resources by prefix or specific names list
|
||||||
|
private List<AclBinding> createAllowBindings(ResourceType resourceType,
|
||||||
|
List<AclOperation> opsToAllow,
|
||||||
|
String principal,
|
||||||
|
String host,
|
||||||
|
@Nullable String resourcePrefix,
|
||||||
|
@Nullable Collection<String> resourceNames) {
|
||||||
|
List<AclBinding> bindings = new ArrayList<>();
|
||||||
|
if (resourcePrefix != null) {
|
||||||
|
for (var op : opsToAllow) {
|
||||||
|
bindings.add(
|
||||||
|
new AclBinding(
|
||||||
|
new ResourcePattern(resourceType, resourcePrefix, PREFIXED),
|
||||||
|
new AccessControlEntry(principal, host, op, ALLOW)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!CollectionUtils.isEmpty(resourceNames)) {
|
||||||
|
resourceNames.stream()
|
||||||
|
.distinct()
|
||||||
|
.forEach(resource ->
|
||||||
|
opsToAllow.forEach(op ->
|
||||||
|
bindings.add(
|
||||||
|
new AclBinding(
|
||||||
|
new ResourcePattern(resourceType, resource, LITERAL),
|
||||||
|
new AccessControlEntry(principal, host, op, ALLOW)))));
|
||||||
|
}
|
||||||
|
return bindings;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Mono<Void> createConsumerAcl(KafkaCluster cluster, CreateConsumerAclDTO request) {
|
||||||
|
return adminClientService.get(cluster)
|
||||||
|
.flatMap(ac -> createAclsWithLogging(ac, createConsumerBindings(request)))
|
||||||
|
.then();
|
||||||
|
}
|
||||||
|
|
||||||
|
//Read, Describe on topics, Read on consumerGroups
|
||||||
|
private List<AclBinding> createConsumerBindings(CreateConsumerAclDTO request) {
|
||||||
|
List<AclBinding> bindings = new ArrayList<>();
|
||||||
|
bindings.addAll(
|
||||||
|
createAllowBindings(TOPIC,
|
||||||
|
List.of(READ, DESCRIBE),
|
||||||
|
request.getPrincipal(),
|
||||||
|
request.getHost(),
|
||||||
|
request.getTopicsPrefix(),
|
||||||
|
request.getTopics()));
|
||||||
|
|
||||||
|
bindings.addAll(
|
||||||
|
createAllowBindings(
|
||||||
|
GROUP,
|
||||||
|
List.of(READ),
|
||||||
|
request.getPrincipal(),
|
||||||
|
request.getHost(),
|
||||||
|
request.getConsumerGroupsPrefix(),
|
||||||
|
request.getConsumerGroups()));
|
||||||
|
return bindings;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Mono<Void> createProducerAcl(KafkaCluster cluster, CreateProducerAclDTO request) {
|
||||||
|
return adminClientService.get(cluster)
|
||||||
|
.flatMap(ac -> createAclsWithLogging(ac, createProducerBindings(request)))
|
||||||
|
.then();
|
||||||
|
}
|
||||||
|
|
||||||
|
//Write, Describe, Create permission on topics, Write, Describe on transactionalIds
|
||||||
|
//IDEMPOTENT_WRITE of cluster if idempotent is enabled
|
||||||
|
private List<AclBinding> createProducerBindings(CreateProducerAclDTO request) {
|
||||||
|
List<AclBinding> bindings = new ArrayList<>();
|
||||||
|
bindings.addAll(
|
||||||
|
createAllowBindings(
|
||||||
|
TOPIC,
|
||||||
|
List.of(WRITE, DESCRIBE, CREATE),
|
||||||
|
request.getPrincipal(),
|
||||||
|
request.getHost(),
|
||||||
|
request.getTopicsPrefix(),
|
||||||
|
request.getTopics()));
|
||||||
|
|
||||||
|
bindings.addAll(
|
||||||
|
createAllowBindings(
|
||||||
|
TRANSACTIONAL_ID,
|
||||||
|
List.of(WRITE, DESCRIBE),
|
||||||
|
request.getPrincipal(),
|
||||||
|
request.getHost(),
|
||||||
|
request.getTransactionsIdPrefix(),
|
||||||
|
Optional.ofNullable(request.getTransactionalId()).map(List::of).orElse(null)));
|
||||||
|
|
||||||
|
if (Boolean.TRUE.equals(request.getIdempotent())) {
|
||||||
|
bindings.addAll(
|
||||||
|
createAllowBindings(
|
||||||
|
CLUSTER,
|
||||||
|
List.of(IDEMPOTENT_WRITE),
|
||||||
|
request.getPrincipal(),
|
||||||
|
request.getHost(),
|
||||||
|
null,
|
||||||
|
List.of(Resource.CLUSTER_NAME))); // cluster name is a const string in ACL api
|
||||||
|
}
|
||||||
|
return bindings;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Mono<Void> createStreamAppAcl(KafkaCluster cluster, CreateStreamAppAclDTO request) {
|
||||||
|
return adminClientService.get(cluster)
|
||||||
|
.flatMap(ac -> createAclsWithLogging(ac, createStreamAppBindings(request)))
|
||||||
|
.then();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read on input topics, Write on output topics
|
||||||
|
// ALL on applicationId-prefixed Groups and Topics
|
||||||
|
private List<AclBinding> createStreamAppBindings(CreateStreamAppAclDTO request) {
|
||||||
|
List<AclBinding> bindings = new ArrayList<>();
|
||||||
|
bindings.addAll(
|
||||||
|
createAllowBindings(
|
||||||
|
TOPIC,
|
||||||
|
List.of(READ),
|
||||||
|
request.getPrincipal(),
|
||||||
|
request.getHost(),
|
||||||
|
null,
|
||||||
|
request.getInputTopics()));
|
||||||
|
|
||||||
|
bindings.addAll(
|
||||||
|
createAllowBindings(
|
||||||
|
TOPIC,
|
||||||
|
List.of(WRITE),
|
||||||
|
request.getPrincipal(),
|
||||||
|
request.getHost(),
|
||||||
|
null,
|
||||||
|
request.getOutputTopics()));
|
||||||
|
|
||||||
|
bindings.addAll(
|
||||||
|
createAllowBindings(
|
||||||
|
GROUP,
|
||||||
|
List.of(ALL),
|
||||||
|
request.getPrincipal(),
|
||||||
|
request.getHost(),
|
||||||
|
request.getApplicationId(),
|
||||||
|
null));
|
||||||
|
|
||||||
|
bindings.addAll(
|
||||||
|
createAllowBindings(
|
||||||
|
TOPIC,
|
||||||
|
List.of(ALL),
|
||||||
|
request.getPrincipal(),
|
||||||
|
request.getHost(),
|
||||||
|
request.getApplicationId(),
|
||||||
|
null));
|
||||||
|
return bindings;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,16 +4,21 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
|
||||||
|
import com.provectus.kafka.ui.model.CreateProducerAclDTO;
|
||||||
|
import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
|
||||||
import com.provectus.kafka.ui.model.KafkaCluster;
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||||
import com.provectus.kafka.ui.service.AdminClientService;
|
import com.provectus.kafka.ui.service.AdminClientService;
|
||||||
import com.provectus.kafka.ui.service.ReactiveAdminClient;
|
import com.provectus.kafka.ui.service.ReactiveAdminClient;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
import org.apache.kafka.common.acl.AccessControlEntry;
|
import org.apache.kafka.common.acl.AccessControlEntry;
|
||||||
import org.apache.kafka.common.acl.AclBinding;
|
import org.apache.kafka.common.acl.AclBinding;
|
||||||
import org.apache.kafka.common.acl.AclOperation;
|
import org.apache.kafka.common.acl.AclOperation;
|
||||||
import org.apache.kafka.common.acl.AclPermissionType;
|
import org.apache.kafka.common.acl.AclPermissionType;
|
||||||
import org.apache.kafka.common.resource.PatternType;
|
import org.apache.kafka.common.resource.PatternType;
|
||||||
|
import org.apache.kafka.common.resource.Resource;
|
||||||
import org.apache.kafka.common.resource.ResourcePattern;
|
import org.apache.kafka.common.resource.ResourcePattern;
|
||||||
import org.apache.kafka.common.resource.ResourcePatternFilter;
|
import org.apache.kafka.common.resource.ResourcePatternFilter;
|
||||||
import org.apache.kafka.common.resource.ResourceType;
|
import org.apache.kafka.common.resource.ResourceType;
|
||||||
|
@ -79,4 +84,207 @@ class AclsServiceTest {
|
||||||
.contains(existingBinding2);
|
.contains(existingBinding2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void createsConsumerDependantAcls() {
|
||||||
|
ArgumentCaptor<?> createdCaptor = ArgumentCaptor.forClass(Collection.class);
|
||||||
|
when(adminClientMock.createAcls((Collection<AclBinding>) createdCaptor.capture()))
|
||||||
|
.thenReturn(Mono.empty());
|
||||||
|
|
||||||
|
var principal = UUID.randomUUID().toString();
|
||||||
|
var host = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
aclsService.createConsumerAcl(
|
||||||
|
CLUSTER,
|
||||||
|
new CreateConsumerAclDTO()
|
||||||
|
.principal(principal)
|
||||||
|
.host(host)
|
||||||
|
.consumerGroups(List.of("cg1", "cg2"))
|
||||||
|
.topics(List.of("t1", "t2"))
|
||||||
|
).block();
|
||||||
|
|
||||||
|
//Read, Describe on topics, Read on consumerGroups
|
||||||
|
Collection<AclBinding> createdBindings = (Collection<AclBinding>) createdCaptor.getValue();
|
||||||
|
assertThat(createdBindings)
|
||||||
|
.hasSize(6)
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.GROUP, "cg1", PatternType.LITERAL),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.GROUP, "cg2", PatternType.LITERAL),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void createsConsumerDependantAclsWhenTopicsAndGroupsSpecifiedByPrefix() {
|
||||||
|
ArgumentCaptor<?> createdCaptor = ArgumentCaptor.forClass(Collection.class);
|
||||||
|
when(adminClientMock.createAcls((Collection<AclBinding>) createdCaptor.capture()))
|
||||||
|
.thenReturn(Mono.empty());
|
||||||
|
|
||||||
|
var principal = UUID.randomUUID().toString();
|
||||||
|
var host = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
aclsService.createConsumerAcl(
|
||||||
|
CLUSTER,
|
||||||
|
new CreateConsumerAclDTO()
|
||||||
|
.principal(principal)
|
||||||
|
.host(host)
|
||||||
|
.consumerGroupsPrefix("cgPref")
|
||||||
|
.topicsPrefix("topicPref")
|
||||||
|
).block();
|
||||||
|
|
||||||
|
//Read, Describe on topics, Read on consumerGroups
|
||||||
|
Collection<AclBinding> createdBindings = (Collection<AclBinding>) createdCaptor.getValue();
|
||||||
|
assertThat(createdBindings)
|
||||||
|
.hasSize(3)
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.GROUP, "cgPref", PatternType.PREFIXED),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void createsProducerDependantAcls() {
|
||||||
|
ArgumentCaptor<?> createdCaptor = ArgumentCaptor.forClass(Collection.class);
|
||||||
|
when(adminClientMock.createAcls((Collection<AclBinding>) createdCaptor.capture()))
|
||||||
|
.thenReturn(Mono.empty());
|
||||||
|
|
||||||
|
var principal = UUID.randomUUID().toString();
|
||||||
|
var host = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
aclsService.createProducerAcl(
|
||||||
|
CLUSTER,
|
||||||
|
new CreateProducerAclDTO()
|
||||||
|
.principal(principal)
|
||||||
|
.host(host)
|
||||||
|
.topics(List.of("t1"))
|
||||||
|
.idempotent(true)
|
||||||
|
.transactionalId("txId1")
|
||||||
|
).block();
|
||||||
|
|
||||||
|
//Write, Describe, Create permission on topics, Write, Describe on transactionalIds
|
||||||
|
//IDEMPOTENT_WRITE of cluster if idempotent is enabled (true)
|
||||||
|
Collection<AclBinding> createdBindings = (Collection<AclBinding>) createdCaptor.getValue();
|
||||||
|
assertThat(createdBindings)
|
||||||
|
.hasSize(6)
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.CREATE, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txId1", PatternType.LITERAL),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txId1", PatternType.LITERAL),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.IDEMPOTENT_WRITE, AclPermissionType.ALLOW)));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void createsProducerDependantAclsWhenTopicsAndTxIdSpecifiedByPrefix() {
|
||||||
|
ArgumentCaptor<?> createdCaptor = ArgumentCaptor.forClass(Collection.class);
|
||||||
|
when(adminClientMock.createAcls((Collection<AclBinding>) createdCaptor.capture()))
|
||||||
|
.thenReturn(Mono.empty());
|
||||||
|
|
||||||
|
var principal = UUID.randomUUID().toString();
|
||||||
|
var host = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
aclsService.createProducerAcl(
|
||||||
|
CLUSTER,
|
||||||
|
new CreateProducerAclDTO()
|
||||||
|
.principal(principal)
|
||||||
|
.host(host)
|
||||||
|
.topicsPrefix("topicPref")
|
||||||
|
.transactionsIdPrefix("txIdPref")
|
||||||
|
.idempotent(false)
|
||||||
|
).block();
|
||||||
|
|
||||||
|
//Write, Describe, Create permission on topics, Write, Describe on transactionalIds
|
||||||
|
//IDEMPOTENT_WRITE of cluster if idempotent is enabled (false)
|
||||||
|
Collection<AclBinding> createdBindings = (Collection<AclBinding>) createdCaptor.getValue();
|
||||||
|
assertThat(createdBindings)
|
||||||
|
.hasSize(5)
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.CREATE, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txIdPref", PatternType.PREFIXED),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txIdPref", PatternType.PREFIXED),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void createsStreamAppDependantAcls() {
|
||||||
|
ArgumentCaptor<?> createdCaptor = ArgumentCaptor.forClass(Collection.class);
|
||||||
|
when(adminClientMock.createAcls((Collection<AclBinding>) createdCaptor.capture()))
|
||||||
|
.thenReturn(Mono.empty());
|
||||||
|
|
||||||
|
var principal = UUID.randomUUID().toString();
|
||||||
|
var host = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
aclsService.createStreamAppAcl(
|
||||||
|
CLUSTER,
|
||||||
|
new CreateStreamAppAclDTO()
|
||||||
|
.principal(principal)
|
||||||
|
.host(host)
|
||||||
|
.inputTopics(List.of("t1"))
|
||||||
|
.outputTopics(List.of("t2", "t3"))
|
||||||
|
.applicationId("appId1")
|
||||||
|
).block();
|
||||||
|
|
||||||
|
// Read on input topics, Write on output topics
|
||||||
|
// ALL on applicationId-prefixed Groups and Topics
|
||||||
|
Collection<AclBinding> createdBindings = (Collection<AclBinding>) createdCaptor.getValue();
|
||||||
|
assertThat(createdBindings)
|
||||||
|
.hasSize(5)
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TOPIC, "t3", PatternType.LITERAL),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.GROUP, "appId1", PatternType.PREFIXED),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.ALL, AclPermissionType.ALLOW)))
|
||||||
|
.contains(new AclBinding(
|
||||||
|
new ResourcePattern(ResourceType.TOPIC, "appId1", PatternType.PREFIXED),
|
||||||
|
new AccessControlEntry(principal, host, AclOperation.ALL, AclPermissionType.ALLOW)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1849,6 +1849,69 @@ paths:
|
||||||
404:
|
404:
|
||||||
description: Acl not found
|
description: Acl not found
|
||||||
|
|
||||||
|
/api/clusters/{clusterName}/acl/consumer:
|
||||||
|
post:
|
||||||
|
tags:
|
||||||
|
- Acls
|
||||||
|
summary: createConsumerAcl
|
||||||
|
operationId: createConsumerAcl
|
||||||
|
parameters:
|
||||||
|
- name: clusterName
|
||||||
|
in: path
|
||||||
|
required: true
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
requestBody:
|
||||||
|
content:
|
||||||
|
application/json:
|
||||||
|
schema:
|
||||||
|
$ref: '#/components/schemas/CreateConsumerAcl'
|
||||||
|
responses:
|
||||||
|
200:
|
||||||
|
description: OK
|
||||||
|
|
||||||
|
/api/clusters/{clusterName}/acl/producer:
|
||||||
|
post:
|
||||||
|
tags:
|
||||||
|
- Acls
|
||||||
|
summary: createProducerAcl
|
||||||
|
operationId: createProducerAcl
|
||||||
|
parameters:
|
||||||
|
- name: clusterName
|
||||||
|
in: path
|
||||||
|
required: true
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
requestBody:
|
||||||
|
content:
|
||||||
|
application/json:
|
||||||
|
schema:
|
||||||
|
$ref: '#/components/schemas/CreateProducerAcl'
|
||||||
|
responses:
|
||||||
|
200:
|
||||||
|
description: OK
|
||||||
|
|
||||||
|
/api/clusters/{clusterName}/acl/streamApp:
|
||||||
|
post:
|
||||||
|
tags:
|
||||||
|
- Acls
|
||||||
|
summary: createStreamAppAcl
|
||||||
|
operationId: createStreamAppAcl
|
||||||
|
parameters:
|
||||||
|
- name: clusterName
|
||||||
|
in: path
|
||||||
|
required: true
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
requestBody:
|
||||||
|
content:
|
||||||
|
application/json:
|
||||||
|
schema:
|
||||||
|
$ref: '#/components/schemas/CreateStreamAppAcl'
|
||||||
|
responses:
|
||||||
|
200:
|
||||||
|
description: OK
|
||||||
|
|
||||||
/api/authorization:
|
/api/authorization:
|
||||||
get:
|
get:
|
||||||
tags:
|
tags:
|
||||||
|
@ -3478,7 +3541,7 @@ components:
|
||||||
principal:
|
principal:
|
||||||
type: string
|
type: string
|
||||||
host:
|
host:
|
||||||
type: string # "*" if acl can be applied to any resource of given type
|
type: string
|
||||||
operation:
|
operation:
|
||||||
type: string
|
type: string
|
||||||
enum:
|
enum:
|
||||||
|
@ -3502,6 +3565,69 @@ components:
|
||||||
- ALLOW
|
- ALLOW
|
||||||
- DENY
|
- DENY
|
||||||
|
|
||||||
|
CreateConsumerAcl:
|
||||||
|
type: object
|
||||||
|
required: [principal, host]
|
||||||
|
properties:
|
||||||
|
principal:
|
||||||
|
type: string
|
||||||
|
host:
|
||||||
|
type: string
|
||||||
|
topics:
|
||||||
|
type: array
|
||||||
|
items:
|
||||||
|
type: string
|
||||||
|
topicsPrefix:
|
||||||
|
type: string
|
||||||
|
consumerGroups:
|
||||||
|
type: array
|
||||||
|
items:
|
||||||
|
type: string
|
||||||
|
consumerGroupsPrefix:
|
||||||
|
type: string
|
||||||
|
|
||||||
|
CreateProducerAcl:
|
||||||
|
type: object
|
||||||
|
required: [principal, host]
|
||||||
|
properties:
|
||||||
|
principal:
|
||||||
|
type: string
|
||||||
|
host:
|
||||||
|
type: string
|
||||||
|
topics:
|
||||||
|
type: array
|
||||||
|
items:
|
||||||
|
type: string
|
||||||
|
topicsPrefix:
|
||||||
|
type: string
|
||||||
|
transactionalId:
|
||||||
|
type: string
|
||||||
|
transactionsIdPrefix:
|
||||||
|
type: string
|
||||||
|
idempotent:
|
||||||
|
type: boolean
|
||||||
|
default: false
|
||||||
|
|
||||||
|
CreateStreamAppAcl:
|
||||||
|
type: object
|
||||||
|
required: [principal, host, applicationId, inputTopics, outputTopics]
|
||||||
|
properties:
|
||||||
|
principal:
|
||||||
|
type: string
|
||||||
|
host:
|
||||||
|
type: string
|
||||||
|
inputTopics:
|
||||||
|
type: array
|
||||||
|
items:
|
||||||
|
type: string
|
||||||
|
outputTopics:
|
||||||
|
type: array
|
||||||
|
items:
|
||||||
|
type: string
|
||||||
|
applicationId:
|
||||||
|
nullable: false
|
||||||
|
type: string
|
||||||
|
|
||||||
KafkaAclResourceType:
|
KafkaAclResourceType:
|
||||||
type: string
|
type: string
|
||||||
enum:
|
enum:
|
||||||
|
|
Loading…
Add table
Reference in a new issue