Kaynağa Gözat

BE: Impl ACL endpoints for consumer, producer, stream apps (#3783)

Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Ilya Kuramshin 1 yıl önce
ebeveyn
işleme
1cd303a90b

+ 54 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java

@@ -2,6 +2,9 @@ package com.provectus.kafka.ui.controller;
 
 import com.provectus.kafka.ui.api.AclsApi;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
+import com.provectus.kafka.ui.model.CreateProducerAclDTO;
+import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
 import com.provectus.kafka.ui.model.KafkaAclDTO;
 import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
 import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
@@ -123,4 +126,55 @@ public class AclsController extends AbstractController implements AclsApi {
         .doOnEach(sig -> auditService.audit(context, sig))
         .thenReturn(ResponseEntity.ok().build());
   }
+
+  @Override
+  public Mono<ResponseEntity<Void>> createConsumerAcl(String clusterName,
+                                                      Mono<CreateConsumerAclDTO> createConsumerAclDto,
+                                                      ServerWebExchange exchange) {
+    AccessContext context = AccessContext.builder()
+        .cluster(clusterName)
+        .aclActions(AclAction.EDIT)
+        .operationName("createConsumerAcl")
+        .build();
+
+    return accessControlService.validateAccess(context)
+        .then(createConsumerAclDto)
+        .flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req))
+        .doOnEach(sig -> auditService.audit(context, sig))
+        .thenReturn(ResponseEntity.ok().build());
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> createProducerAcl(String clusterName,
+                                                      Mono<CreateProducerAclDTO> createProducerAclDto,
+                                                      ServerWebExchange exchange) {
+    AccessContext context = AccessContext.builder()
+        .cluster(clusterName)
+        .aclActions(AclAction.EDIT)
+        .operationName("createProducerAcl")
+        .build();
+
+    return accessControlService.validateAccess(context)
+        .then(createProducerAclDto)
+        .flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req))
+        .doOnEach(sig -> auditService.audit(context, sig))
+        .thenReturn(ResponseEntity.ok().build());
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> createStreamAppAcl(String clusterName,
+                                                       Mono<CreateStreamAppAclDTO> createStreamAppAclDto,
+                                                       ServerWebExchange exchange) {
+    AccessContext context = AccessContext.builder()
+        .cluster(clusterName)
+        .aclActions(AclAction.EDIT)
+        .operationName("createStreamAppAcl")
+        .build();
+
+    return accessControlService.validateAccess(context)
+        .then(createStreamAppAclDto)
+        .flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req))
+        .doOnEach(sig -> auditService.audit(context, sig))
+        .thenReturn(ResponseEntity.ok().build());
+  }
 }

+ 181 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java

@@ -1,16 +1,44 @@
 package com.provectus.kafka.ui.service.acl;
 
+import static org.apache.kafka.common.acl.AclOperation.ALL;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.PatternType.PREFIXED;
+import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
+import static org.apache.kafka.common.resource.ResourceType.GROUP;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID;
+
 import com.google.common.collect.Sets;
+import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
+import com.provectus.kafka.ui.model.CreateProducerAclDTO;
+import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.AdminClientService;
+import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
+import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
 import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -22,11 +50,14 @@ public class AclsService {
   private final AdminClientService adminClientService;
 
   public Mono<Void> createAcl(KafkaCluster cluster, AclBinding aclBinding) {
-    var aclString = AclCsv.createAclString(aclBinding);
-    log.info("CREATING ACL: [{}]", aclString);
     return adminClientService.get(cluster)
-        .flatMap(ac -> ac.createAcls(List.of(aclBinding)))
-        .doOnSuccess(v -> log.info("ACL CREATED: [{}]", aclString));
+        .flatMap(ac -> createAclsWithLogging(ac, List.of(aclBinding)));
+  }
+
+  private Mono<Void> createAclsWithLogging(ReactiveAdminClient ac, Collection<AclBinding> bindings) {
+    bindings.forEach(b -> log.info("CREATING ACL: [{}]", AclCsv.createAclString(b)));
+    return ac.createAcls(bindings)
+        .doOnSuccess(v -> bindings.forEach(b -> log.info("ACL CREATED: [{}]", AclCsv.createAclString(b))));
   }
 
   public Mono<Void> deleteAcl(KafkaCluster cluster, AclBinding aclBinding) {
@@ -92,4 +123,150 @@ public class AclsService {
     }
   }
 
+  // creates allow binding for resources by prefix or specific names list
+  private List<AclBinding> createAllowBindings(ResourceType resourceType,
+                                               List<AclOperation> opsToAllow,
+                                               String principal,
+                                               String host,
+                                               @Nullable String resourcePrefix,
+                                               @Nullable Collection<String> resourceNames) {
+    List<AclBinding> bindings = new ArrayList<>();
+    if (resourcePrefix != null) {
+      for (var op : opsToAllow) {
+        bindings.add(
+            new AclBinding(
+                new ResourcePattern(resourceType, resourcePrefix, PREFIXED),
+                new AccessControlEntry(principal, host, op, ALLOW)));
+      }
+    }
+    if (!CollectionUtils.isEmpty(resourceNames)) {
+      resourceNames.stream()
+          .distinct()
+          .forEach(resource ->
+              opsToAllow.forEach(op ->
+                  bindings.add(
+                      new AclBinding(
+                          new ResourcePattern(resourceType, resource, LITERAL),
+                          new AccessControlEntry(principal, host, op, ALLOW)))));
+    }
+    return bindings;
+  }
+
+  public Mono<Void> createConsumerAcl(KafkaCluster cluster, CreateConsumerAclDTO request) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> createAclsWithLogging(ac, createConsumerBindings(request)))
+        .then();
+  }
+
+  //Read, Describe on topics, Read on consumerGroups
+  private List<AclBinding> createConsumerBindings(CreateConsumerAclDTO request) {
+    List<AclBinding> bindings = new ArrayList<>();
+    bindings.addAll(
+        createAllowBindings(TOPIC,
+            List.of(READ, DESCRIBE),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getTopicsPrefix(),
+            request.getTopics()));
+
+    bindings.addAll(
+        createAllowBindings(
+            GROUP,
+            List.of(READ),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getConsumerGroupsPrefix(),
+            request.getConsumerGroups()));
+    return bindings;
+  }
+
+  public Mono<Void> createProducerAcl(KafkaCluster cluster, CreateProducerAclDTO request) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> createAclsWithLogging(ac, createProducerBindings(request)))
+        .then();
+  }
+
+  //Write, Describe, Create permission on topics, Write, Describe on transactionalIds
+  //IDEMPOTENT_WRITE on cluster if idempotent is enabled
+  private List<AclBinding> createProducerBindings(CreateProducerAclDTO request) {
+    List<AclBinding> bindings = new ArrayList<>();
+    bindings.addAll(
+        createAllowBindings(
+            TOPIC,
+            List.of(WRITE, DESCRIBE, CREATE),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getTopicsPrefix(),
+            request.getTopics()));
+
+    bindings.addAll(
+        createAllowBindings(
+            TRANSACTIONAL_ID,
+            List.of(WRITE, DESCRIBE),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getTransactionsIdPrefix(),
+            Optional.ofNullable(request.getTransactionalId()).map(List::of).orElse(null)));
+
+    if (Boolean.TRUE.equals(request.getIdempotent())) {
+      bindings.addAll(
+          createAllowBindings(
+              CLUSTER,
+              List.of(IDEMPOTENT_WRITE),
+              request.getPrincipal(),
+              request.getHost(),
+              null,
+              List.of(Resource.CLUSTER_NAME))); // cluster name is a const string in ACL api
+    }
+    return bindings;
+  }
+
+  public Mono<Void> createStreamAppAcl(KafkaCluster cluster, CreateStreamAppAclDTO request) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> createAclsWithLogging(ac, createStreamAppBindings(request)))
+        .then();
+  }
+
+  // Read on input topics, Write on output topics
+  // ALL on applicationId-prefixed Groups and Topics
+  private List<AclBinding> createStreamAppBindings(CreateStreamAppAclDTO request) {
+    List<AclBinding> bindings = new ArrayList<>();
+    bindings.addAll(
+        createAllowBindings(
+            TOPIC,
+            List.of(READ),
+            request.getPrincipal(),
+            request.getHost(),
+            null,
+            request.getInputTopics()));
+
+    bindings.addAll(
+        createAllowBindings(
+            TOPIC,
+            List.of(WRITE),
+            request.getPrincipal(),
+            request.getHost(),
+            null,
+            request.getOutputTopics()));
+
+    bindings.addAll(
+        createAllowBindings(
+            GROUP,
+            List.of(ALL),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getApplicationId(),
+            null));
+
+    bindings.addAll(
+        createAllowBindings(
+            TOPIC,
+            List.of(ALL),
+            request.getPrincipal(),
+            request.getHost(),
+            request.getApplicationId(),
+            null));
+    return bindings;
+  }
+
 }

+ 214 - 6
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java

@@ -4,16 +4,21 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
+import com.provectus.kafka.ui.model.CreateProducerAclDTO;
+import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.AdminClientService;
 import com.provectus.kafka.ui.service.ReactiveAdminClient;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.resource.ResourceType;
@@ -53,12 +58,12 @@ class AclsServiceTest {
     when(adminClientMock.listAcls(ResourcePatternFilter.ANY))
         .thenReturn(Mono.just(List.of(existingBinding1, existingBinding2)));
 
-    ArgumentCaptor<?> createdCaptor = ArgumentCaptor.forClass(Collection.class);
-    when(adminClientMock.createAcls((Collection<AclBinding>) createdCaptor.capture()))
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
         .thenReturn(Mono.empty());
 
-    ArgumentCaptor<?> deletedCaptor = ArgumentCaptor.forClass(Collection.class);
-    when(adminClientMock.deleteAcls((Collection<AclBinding>) deletedCaptor.capture()))
+    ArgumentCaptor<Collection<AclBinding>> deletedCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.deleteAcls(deletedCaptor.capture()))
         .thenReturn(Mono.empty());
 
     aclsService.syncAclWithAclCsv(
@@ -68,15 +73,218 @@ class AclsServiceTest {
             + "User:test3,GROUP,PREFIXED,groupNew,DESCRIBE,DENY,localhost"
     ).block();
 
-    Collection<AclBinding> createdBindings = (Collection<AclBinding>) createdCaptor.getValue();
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
     assertThat(createdBindings)
         .hasSize(1)
         .contains(newBindingToBeAdded);
 
-    Collection<AclBinding> deletedBindings = (Collection<AclBinding>) deletedCaptor.getValue();
+    Collection<AclBinding> deletedBindings = deletedCaptor.getValue();
     assertThat(deletedBindings)
         .hasSize(1)
         .contains(existingBinding2);
   }
 
+
+  @Test
+  void createsConsumerDependantAcls() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createConsumerAcl(
+        CLUSTER,
+        new CreateConsumerAclDTO()
+            .principal(principal)
+            .host(host)
+            .consumerGroups(List.of("cg1", "cg2"))
+            .topics(List.of("t1", "t2"))
+    ).block();
+
+    //Read, Describe on topics, Read on consumerGroups
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(6)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.GROUP, "cg1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.GROUP, "cg2", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)));
+  }
+
+  @Test
+  void createsConsumerDependantAclsWhenTopicsAndGroupsSpecifiedByPrefix() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createConsumerAcl(
+        CLUSTER,
+        new CreateConsumerAclDTO()
+            .principal(principal)
+            .host(host)
+            .consumerGroupsPrefix("cgPref")
+            .topicsPrefix("topicPref")
+    ).block();
+
+    //Read, Describe on topics, Read on consumerGroups
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(3)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.GROUP, "cgPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)));
+  }
+
+  @Test
+  void createsProducerDependantAcls() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createProducerAcl(
+        CLUSTER,
+        new CreateProducerAclDTO()
+            .principal(principal)
+            .host(host)
+            .topics(List.of("t1"))
+            .idempotent(true)
+            .transactionalId("txId1")
+    ).block();
+
+    //Write, Describe, Create permission on topics, Write, Describe on transactionalIds
+    //IDEMPOTENT_WRITE on cluster if idempotent is enabled (true)
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(6)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.CREATE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txId1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txId1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.IDEMPOTENT_WRITE, AclPermissionType.ALLOW)));
+  }
+
+
+  @Test
+  void createsProducerDependantAclsWhenTopicsAndTxIdSpecifiedByPrefix() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createProducerAcl(
+        CLUSTER,
+        new CreateProducerAclDTO()
+            .principal(principal)
+            .host(host)
+            .topicsPrefix("topicPref")
+            .transactionsIdPrefix("txIdPref")
+            .idempotent(false)
+    ).block();
+
+    //Write, Describe, Create permission on topics, Write, Describe on transactionalIds
+    //IDEMPOTENT_WRITE on cluster if idempotent is enabled (false)
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(5)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.CREATE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txIdPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txIdPref", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)));
+  }
+
+
+  @Test
+  void createsStreamAppDependantAcls() {
+    ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+    when(adminClientMock.createAcls(createdCaptor.capture()))
+        .thenReturn(Mono.empty());
+
+    var principal = UUID.randomUUID().toString();
+    var host = UUID.randomUUID().toString();
+
+    aclsService.createStreamAppAcl(
+        CLUSTER,
+        new CreateStreamAppAclDTO()
+            .principal(principal)
+            .host(host)
+            .inputTopics(List.of("t1"))
+            .outputTopics(List.of("t2", "t3"))
+            .applicationId("appId1")
+    ).block();
+
+    // Read on input topics, Write on output topics
+    // ALL on applicationId-prefixed Groups and Topics
+    Collection<AclBinding> createdBindings = createdCaptor.getValue();
+    assertThat(createdBindings)
+        .hasSize(5)
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "t3", PatternType.LITERAL),
+            new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.GROUP, "appId1", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.ALL, AclPermissionType.ALLOW)))
+        .contains(new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "appId1", PatternType.PREFIXED),
+            new AccessControlEntry(principal, host, AclOperation.ALL, AclPermissionType.ALLOW)));
+  }
 }

+ 127 - 1
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -1868,6 +1868,69 @@ paths:
         404:
           description: Acl not found
 
+  /api/clusters/{clusterName}/acl/consumer:
+    post:
+      tags:
+        - Acls
+      summary: createConsumerAcl
+      operationId: createConsumerAcl
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreateConsumerAcl'
+      responses:
+        200:
+          description: OK
+
+  /api/clusters/{clusterName}/acl/producer:
+    post:
+      tags:
+        - Acls
+      summary: createProducerAcl
+      operationId: createProducerAcl
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreateProducerAcl'
+      responses:
+        200:
+          description: OK
+
+  /api/clusters/{clusterName}/acl/streamApp:
+    post:
+      tags:
+        - Acls
+      summary: createStreamAppAcl
+      operationId: createStreamAppAcl
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreateStreamAppAcl'
+      responses:
+        200:
+          description: OK
+
   /api/authorization:
     get:
       tags:
@@ -3551,7 +3614,7 @@ components:
         principal:
           type: string
         host:
-          type: string  # "*" if acl can be applied to any resource of given type
+          type: string
         operation:
           type: string
           enum:
@@ -3575,6 +3638,69 @@ components:
             - ALLOW
             - DENY
 
+    CreateConsumerAcl:
+      type: object
+      required: [principal, host]
+      properties:
+        principal:
+          type: string
+        host:
+          type: string
+        topics:
+          type: array
+          items:
+            type: string
+        topicsPrefix:
+          type: string
+        consumerGroups:
+          type: array
+          items:
+            type: string
+        consumerGroupsPrefix:
+          type: string
+
+    CreateProducerAcl:
+      type: object
+      required: [principal, host]
+      properties:
+        principal:
+          type: string
+        host:
+          type: string
+        topics:
+          type: array
+          items:
+            type: string
+        topicsPrefix:
+          type: string
+        transactionalId:
+          type: string
+        transactionsIdPrefix:
+          type: string
+        idempotent:
+          type: boolean
+          default: false
+
+    CreateStreamAppAcl:
+      type: object
+      required: [principal, host, applicationId, inputTopics, outputTopics]
+      properties:
+        principal:
+          type: string
+        host:
+          type: string
+        inputTopics:
+          type: array
+          items:
+            type: string
+        outputTopics:
+          type: array
+          items:
+            type: string
+        applicationId:
+          nullable: false
+          type: string
+
     KafkaAclResourceType:
       type: string
       enum: