diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java index 83d2ef553e..63b4662962 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java @@ -2,6 +2,9 @@ package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.api.AclsApi; import com.provectus.kafka.ui.mapper.ClusterMapper; +import com.provectus.kafka.ui.model.CreateConsumerAclDTO; +import com.provectus.kafka.ui.model.CreateProducerAclDTO; +import com.provectus.kafka.ui.model.CreateStreamAppAclDTO; import com.provectus.kafka.ui.model.KafkaAclDTO; import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO; import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO; @@ -112,4 +115,49 @@ public class AclsController extends AbstractController implements AclsApi { .flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv)) .thenReturn(ResponseEntity.ok().build()); } + + @Override + public Mono> createConsumerAcl(String clusterName, + Mono createConsumerAclDto, + ServerWebExchange exchange) { + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.EDIT) + .build(); + + return accessControlService.validateAccess(context) + .then(createConsumerAclDto) + .flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req)) + .thenReturn(ResponseEntity.ok().build()); + } + + @Override + public Mono> createProducerAcl(String clusterName, + Mono createProducerAclDto, + ServerWebExchange exchange) { + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.EDIT) + .build(); + + return accessControlService.validateAccess(context) + .then(createProducerAclDto) + .flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req)) + .thenReturn(ResponseEntity.ok().build()); + } + + @Override + public Mono> createStreamAppAcl(String clusterName, + Mono createStreamAppAclDto, + ServerWebExchange exchange) { + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.EDIT) + .build(); + + return accessControlService.validateAccess(context) + .then(createStreamAppAclDto) + .flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req)) + .thenReturn(ResponseEntity.ok().build()); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java index 8c5a8dab06..79d4e4cdfe 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java @@ -1,15 +1,43 @@ package com.provectus.kafka.ui.service.acl; +import static org.apache.kafka.common.acl.AclOperation.ALL; +import static org.apache.kafka.common.acl.AclOperation.CREATE; +import static org.apache.kafka.common.acl.AclOperation.DESCRIBE; +import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE; +import static org.apache.kafka.common.acl.AclOperation.READ; +import static org.apache.kafka.common.acl.AclOperation.WRITE; +import static org.apache.kafka.common.acl.AclPermissionType.ALLOW; +import static org.apache.kafka.common.resource.PatternType.LITERAL; +import static org.apache.kafka.common.resource.PatternType.PREFIXED; +import static org.apache.kafka.common.resource.ResourceType.CLUSTER; +import static org.apache.kafka.common.resource.ResourceType.GROUP; +import static org.apache.kafka.common.resource.ResourceType.TOPIC; +import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID; + import com.google.common.collect.Sets; +import com.provectus.kafka.ui.model.CreateConsumerAclDTO; +import com.provectus.kafka.ui.model.CreateProducerAclDTO; +import com.provectus.kafka.ui.model.CreateStreamAppAclDTO; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.service.AdminClientService; +import com.provectus.kafka.ui.service.ReactiveAdminClient; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.Set; +import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.resource.Resource; +import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -21,11 +49,14 @@ public class AclsService { private final AdminClientService adminClientService; public Mono createAcl(KafkaCluster cluster, AclBinding aclBinding) { - var aclString = AclCsv.createAclString(aclBinding); - log.info("CREATING ACL: [{}]", aclString); return adminClientService.get(cluster) - .flatMap(ac -> ac.createAcls(List.of(aclBinding))) - .doOnSuccess(v -> log.info("ACL CREATED: [{}]", aclString)); + .flatMap(ac -> createAclsWithLogging(ac, List.of(aclBinding))); + } + + private Mono createAclsWithLogging(ReactiveAdminClient ac, Collection bindings) { + bindings.forEach(b -> log.info("CREATING ACL: [{}]", AclCsv.createAclString(b))); + return ac.createAcls(bindings) + .doOnSuccess(v -> bindings.forEach(b -> log.info("ACL CREATED: [{}]", AclCsv.createAclString(b)))); } public Mono deleteAcl(KafkaCluster cluster, AclBinding aclBinding) { @@ -90,4 +121,150 @@ public class AclsService { } } + // creates allow binding for resources by prefix or specific names list + private List createAllowBindings(ResourceType resourceType, + List opsToAllow, + String principal, + String host, + @Nullable String resourcePrefix, + @Nullable Collection resourceNames) { + List bindings = new ArrayList<>(); + if (resourcePrefix != null) { + for (var op : opsToAllow) { + bindings.add( + new AclBinding( + new ResourcePattern(resourceType, resourcePrefix, PREFIXED), + new AccessControlEntry(principal, host, op, ALLOW))); + } + } + if (!CollectionUtils.isEmpty(resourceNames)) { + resourceNames.stream() + .distinct() + .forEach(resource -> + opsToAllow.forEach(op -> + bindings.add( + new AclBinding( + new ResourcePattern(resourceType, resource, LITERAL), + new AccessControlEntry(principal, host, op, ALLOW))))); + } + return bindings; + } + + public Mono createConsumerAcl(KafkaCluster cluster, CreateConsumerAclDTO request) { + return adminClientService.get(cluster) + .flatMap(ac -> createAclsWithLogging(ac, createConsumerBindings(request))) + .then(); + } + + //Read, Describe on topics, Read on consumerGroups + private List createConsumerBindings(CreateConsumerAclDTO request) { + List bindings = new ArrayList<>(); + bindings.addAll( + createAllowBindings(TOPIC, + List.of(READ, DESCRIBE), + request.getPrincipal(), + request.getHost(), + request.getTopicsPrefix(), + request.getTopics())); + + bindings.addAll( + createAllowBindings( + GROUP, + List.of(READ), + request.getPrincipal(), + request.getHost(), + request.getConsumerGroupsPrefix(), + request.getConsumerGroups())); + return bindings; + } + + public Mono createProducerAcl(KafkaCluster cluster, CreateProducerAclDTO request) { + return adminClientService.get(cluster) + .flatMap(ac -> createAclsWithLogging(ac, createProducerBindings(request))) + .then(); + } + + //Write, Describe, Create permission on topics, Write, Describe on transactionalIds + //IDEMPOTENT_WRITE of cluster if idempotent is enabled + private List createProducerBindings(CreateProducerAclDTO request) { + List bindings = new ArrayList<>(); + bindings.addAll( + createAllowBindings( + TOPIC, + List.of(WRITE, DESCRIBE, CREATE), + request.getPrincipal(), + request.getHost(), + request.getTopicsPrefix(), + request.getTopics())); + + bindings.addAll( + createAllowBindings( + TRANSACTIONAL_ID, + List.of(WRITE, DESCRIBE), + request.getPrincipal(), + request.getHost(), + request.getTransactionsIdPrefix(), + Optional.ofNullable(request.getTransactionalId()).map(List::of).orElse(null))); + + if (Boolean.TRUE.equals(request.getIdempotent())) { + bindings.addAll( + createAllowBindings( + CLUSTER, + List.of(IDEMPOTENT_WRITE), + request.getPrincipal(), + request.getHost(), + null, + List.of(Resource.CLUSTER_NAME))); // cluster name is a const string in ACL api + } + return bindings; + } + + public Mono createStreamAppAcl(KafkaCluster cluster, CreateStreamAppAclDTO request) { + return adminClientService.get(cluster) + .flatMap(ac -> createAclsWithLogging(ac, createStreamAppBindings(request))) + .then(); + } + + // Read on input topics, Write on output topics + // ALL on applicationId-prefixed Groups and Topics + private List createStreamAppBindings(CreateStreamAppAclDTO request) { + List bindings = new ArrayList<>(); + bindings.addAll( + createAllowBindings( + TOPIC, + List.of(READ), + request.getPrincipal(), + request.getHost(), + null, + request.getInputTopics())); + + bindings.addAll( + createAllowBindings( + TOPIC, + List.of(WRITE), + request.getPrincipal(), + request.getHost(), + null, + request.getOutputTopics())); + + bindings.addAll( + createAllowBindings( + GROUP, + List.of(ALL), + request.getPrincipal(), + request.getHost(), + request.getApplicationId(), + null)); + + bindings.addAll( + createAllowBindings( + TOPIC, + List.of(ALL), + request.getPrincipal(), + request.getHost(), + request.getApplicationId(), + null)); + return bindings; + } + } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java index 5791bb2041..b30305308b 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java @@ -4,16 +4,21 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.provectus.kafka.ui.model.CreateConsumerAclDTO; +import com.provectus.kafka.ui.model.CreateProducerAclDTO; +import com.provectus.kafka.ui.model.CreateStreamAppAclDTO; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.service.AdminClientService; import com.provectus.kafka.ui.service.ReactiveAdminClient; import java.util.Collection; import java.util.List; +import java.util.UUID; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.Resource; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourcePatternFilter; import org.apache.kafka.common.resource.ResourceType; @@ -79,4 +84,207 @@ class AclsServiceTest { .contains(existingBinding2); } + + @Test + void createsConsumerDependantAcls() { + ArgumentCaptor createdCaptor = ArgumentCaptor.forClass(Collection.class); + when(adminClientMock.createAcls((Collection) createdCaptor.capture())) + .thenReturn(Mono.empty()); + + var principal = UUID.randomUUID().toString(); + var host = UUID.randomUUID().toString(); + + aclsService.createConsumerAcl( + CLUSTER, + new CreateConsumerAclDTO() + .principal(principal) + .host(host) + .consumerGroups(List.of("cg1", "cg2")) + .topics(List.of("t1", "t2")) + ).block(); + + //Read, Describe on topics, Read on consumerGroups + Collection createdBindings = (Collection) createdCaptor.getValue(); + assertThat(createdBindings) + .hasSize(6) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL), + new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL), + new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL), + new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL), + new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.GROUP, "cg1", PatternType.LITERAL), + new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.GROUP, "cg2", PatternType.LITERAL), + new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW))); + } + + @Test + void createsConsumerDependantAclsWhenTopicsAndGroupsSpecifiedByPrefix() { + ArgumentCaptor createdCaptor = ArgumentCaptor.forClass(Collection.class); + when(adminClientMock.createAcls((Collection) createdCaptor.capture())) + .thenReturn(Mono.empty()); + + var principal = UUID.randomUUID().toString(); + var host = UUID.randomUUID().toString(); + + aclsService.createConsumerAcl( + CLUSTER, + new CreateConsumerAclDTO() + .principal(principal) + .host(host) + .consumerGroupsPrefix("cgPref") + .topicsPrefix("topicPref") + ).block(); + + //Read, Describe on topics, Read on consumerGroups + Collection createdBindings = (Collection) createdCaptor.getValue(); + assertThat(createdBindings) + .hasSize(3) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED), + new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED), + new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.GROUP, "cgPref", PatternType.PREFIXED), + new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW))); + } + + @Test + void createsProducerDependantAcls() { + ArgumentCaptor createdCaptor = ArgumentCaptor.forClass(Collection.class); + when(adminClientMock.createAcls((Collection) createdCaptor.capture())) + .thenReturn(Mono.empty()); + + var principal = UUID.randomUUID().toString(); + var host = UUID.randomUUID().toString(); + + aclsService.createProducerAcl( + CLUSTER, + new CreateProducerAclDTO() + .principal(principal) + .host(host) + .topics(List.of("t1")) + .idempotent(true) + .transactionalId("txId1") + ).block(); + + //Write, Describe, Create permission on topics, Write, Describe on transactionalIds + //IDEMPOTENT_WRITE of cluster if idempotent is enabled (true) + Collection createdBindings = (Collection) createdCaptor.getValue(); + assertThat(createdBindings) + .hasSize(6) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL), + new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL), + new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL), + new AccessControlEntry(principal, host, AclOperation.CREATE, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txId1", PatternType.LITERAL), + new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txId1", PatternType.LITERAL), + new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL), + new AccessControlEntry(principal, host, AclOperation.IDEMPOTENT_WRITE, AclPermissionType.ALLOW))); + } + + + @Test + void createsProducerDependantAclsWhenTopicsAndTxIdSpecifiedByPrefix() { + ArgumentCaptor createdCaptor = ArgumentCaptor.forClass(Collection.class); + when(adminClientMock.createAcls((Collection) createdCaptor.capture())) + .thenReturn(Mono.empty()); + + var principal = UUID.randomUUID().toString(); + var host = UUID.randomUUID().toString(); + + aclsService.createProducerAcl( + CLUSTER, + new CreateProducerAclDTO() + .principal(principal) + .host(host) + .topicsPrefix("topicPref") + .transactionsIdPrefix("txIdPref") + .idempotent(false) + ).block(); + + //Write, Describe, Create permission on topics, Write, Describe on transactionalIds + //IDEMPOTENT_WRITE of cluster if idempotent is enabled (false) + Collection createdBindings = (Collection) createdCaptor.getValue(); + assertThat(createdBindings) + .hasSize(5) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED), + new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED), + new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED), + new AccessControlEntry(principal, host, AclOperation.CREATE, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txIdPref", PatternType.PREFIXED), + new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txIdPref", PatternType.PREFIXED), + new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW))); + } + + + @Test + void createsStreamAppDependantAcls() { + ArgumentCaptor createdCaptor = ArgumentCaptor.forClass(Collection.class); + when(adminClientMock.createAcls((Collection) createdCaptor.capture())) + .thenReturn(Mono.empty()); + + var principal = UUID.randomUUID().toString(); + var host = UUID.randomUUID().toString(); + + aclsService.createStreamAppAcl( + CLUSTER, + new CreateStreamAppAclDTO() + .principal(principal) + .host(host) + .inputTopics(List.of("t1")) + .outputTopics(List.of("t2", "t3")) + .applicationId("appId1") + ).block(); + + // Read on input topics, Write on output topics + // ALL on applicationId-prefixed Groups and Topics + Collection createdBindings = (Collection) createdCaptor.getValue(); + assertThat(createdBindings) + .hasSize(5) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL), + new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL), + new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "t3", PatternType.LITERAL), + new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.GROUP, "appId1", PatternType.PREFIXED), + new AccessControlEntry(principal, host, AclOperation.ALL, AclPermissionType.ALLOW))) + .contains(new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "appId1", PatternType.PREFIXED), + new AccessControlEntry(principal, host, AclOperation.ALL, AclPermissionType.ALLOW))); + } } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index b89f8d0963..a7e9c72a44 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1849,6 +1849,69 @@ paths: 404: description: Acl not found + /api/clusters/{clusterName}/acl/consumer: + post: + tags: + - Acls + summary: createConsumerAcl + operationId: createConsumerAcl + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/CreateConsumerAcl' + responses: + 200: + description: OK + + /api/clusters/{clusterName}/acl/producer: + post: + tags: + - Acls + summary: createProducerAcl + operationId: createProducerAcl + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/CreateProducerAcl' + responses: + 200: + description: OK + + /api/clusters/{clusterName}/acl/streamApp: + post: + tags: + - Acls + summary: createStreamAppAcl + operationId: createStreamAppAcl + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/CreateStreamAppAcl' + responses: + 200: + description: OK + /api/authorization: get: tags: @@ -3478,7 +3541,7 @@ components: principal: type: string host: - type: string # "*" if acl can be applied to any resource of given type + type: string operation: type: string enum: @@ -3502,6 +3565,69 @@ components: - ALLOW - DENY + CreateConsumerAcl: + type: object + required: [principal, host] + properties: + principal: + type: string + host: + type: string + topics: + type: array + items: + type: string + topicsPrefix: + type: string + consumerGroups: + type: array + items: + type: string + consumerGroupsPrefix: + type: string + + CreateProducerAcl: + type: object + required: [principal, host] + properties: + principal: + type: string + host: + type: string + topics: + type: array + items: + type: string + topicsPrefix: + type: string + transactionalId: + type: string + transactionsIdPrefix: + type: string + idempotent: + type: boolean + default: false + + CreateStreamAppAcl: + type: object + required: [principal, host, applicationId, inputTopics, outputTopics] + properties: + principal: + type: string + host: + type: string + inputTopics: + type: array + items: + type: string + outputTopics: + type: array + items: + type: string + applicationId: + nullable: false + type: string + KafkaAclResourceType: type: string enum: