diff --git a/documentation/compose/jaas/zookeeper_jaas.conf b/documentation/compose/jaas/zookeeper_jaas.conf
new file mode 100644
index 0000000000..2b1754fba0
--- /dev/null
+++ b/documentation/compose/jaas/zookeeper_jaas.conf
@@ -0,0 +1,4 @@
+Server {
+    org.apache.zookeeper.server.auth.DigestLoginModule required
+    user_admin="admin-secret";
+};
\ No newline at end of file
diff --git a/documentation/compose/kafka-ui-sasl.yaml b/documentation/compose/kafka-ui-sasl.yaml
index 1c0312f11a..6dfe4f7532 100644
--- a/documentation/compose/kafka-ui-sasl.yaml
+++ b/documentation/compose/kafka-ui-sasl.yaml
@@ -12,41 +12,39 @@ services:
       - kafka
     environment:
       KAFKA_CLUSTERS_0_NAME: local
-#      SERVER_SERVLET_CONTEXT_PATH: "/kafkaui"
-      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
       KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
       KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT
       KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN
       KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";'
+
   zookeeper:
-    image: confluentinc/cp-zookeeper:5.2.4
+    image: wurstmeister/zookeeper:3.4.6
     environment:
-      ZOOKEEPER_CLIENT_PORT: 2181
-      ZOOKEEPER_TICK_TIME: 2000
+      JVMFLAGS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf"
+    volumes:
+      - ./jaas/zookeeper_jaas.conf:/etc/zookeeper/zookeeper_jaas.conf
     ports:
       - 2181:2181

   kafka:
-    image: wurstmeister/kafka:latest
-    hostname: kafka
-    container_name: kafka
+    image: wurstmeister/kafka:2.13-2.8.1
     depends_on:
       - zookeeper
     ports:
-      - '9092:9092'
+      - 9092:9092
     environment:
-      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
-      KAFKA_LISTENERS: SASL_PLAINTEXT://kafka:9092
-      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://kafka:9092
-      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
+      KAFKA_SUPER_USERS: "User:admin"
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: INTERNAL://:9093,EXTERNAL://:9092
+      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9093,EXTERNAL://localhost:9092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
       ALLOW_PLAINTEXT_LISTENER: 'yes'
-      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas/kafka_server.conf"
-      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
-      KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
       KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
       KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
-      KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT
-      KAFKA_SUPER_USERS: User:admin,User:enzo
-      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf"
     volumes:
-      - ./jaas:/etc/kafka/jaas
\ No newline at end of file
+      - ./jaas/kafka_server.conf:/etc/kafka/kafka_jaas.conf
\ No newline at end of file
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java
new file mode 100644
index 0000000000..7d8fa9812d
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java
@@ -0,0 +1,55 @@
+package com.provectus.kafka.ui.controller;
+
+import com.provectus.kafka.ui.api.AclsApi;
+import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.model.KafkaAclDTO;
+import com.provectus.kafka.ui.service.acl.AclsService;
+import lombok.RequiredArgsConstructor;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequiredArgsConstructor
+public class AclsController extends AbstractController implements AclsApi {
+
+  private final AclsService aclsService;
+
+  @Override
+  public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
+                                              ServerWebExchange exchange) {
+    return kafkaAclDto.map(ClusterMapper::toAclBinding)
+        .flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
+        .thenReturn(ResponseEntity.ok().build());
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> deleteAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
+                                              ServerWebExchange exchange) {
+    return kafkaAclDto.map(ClusterMapper::toAclBinding)
+        .flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
+        .thenReturn(ResponseEntity.ok().build());
+  }
+
+  @Override
+  public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName, ServerWebExchange exchange) {
+    return Mono.just(
+        ResponseEntity.ok(
+            aclsService.listAcls(getCluster(clusterName)).map(ClusterMapper::toKafkaAclDto)));
+  }
+
+  @Override
+  public Mono<ResponseEntity<String>> getAclAsCsv(String clusterName, ServerWebExchange exchange) {
+    return aclsService.getAclAsCsvString(getCluster(clusterName))
+        .map(ResponseEntity::ok)
+        .flatMap(Mono::just);
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> syncAclsCsv(String clusterName, Mono<String> csvMono, ServerWebExchange exchange) {
+    return csvMono.flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
+        .thenReturn(ResponseEntity.ok().build());
+  }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
index bf7cb33636..24f52b0cb1 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
@@ -23,6 +23,7 @@ import com.provectus.kafka.ui.model.InternalReplica;
 import com.provectus.kafka.ui.model.InternalSchemaRegistry;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.InternalTopicConfig;
+import com.provectus.kafka.ui.model.KafkaAclDTO;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KafkaConnectCluster;
 import com.provectus.kafka.ui.model.MetricDTO;
@@ -35,7 +36,6 @@ import com.provectus.kafka.ui.model.TopicDetailsDTO;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityCheck;
 import com.provectus.kafka.ui.model.schemaregistry.InternalCompatibilityLevel;
 import com.provectus.kafka.ui.service.metrics.RawMetric;
-import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -43,6 +43,13 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
 import org.mapstruct.Named;
@@ -181,4 +188,33 @@ public interface ClusterMapper {
     return copy;
   }
 
+  static AclBinding toAclBinding(KafkaAclDTO dto) {
+    return new AclBinding(
+        new ResourcePattern(
+            ResourceType.valueOf(dto.getResourceType().name()),
+            dto.getResourceName(),
+            PatternType.valueOf(dto.getNamePatternType().name())
+        ),
+        new AccessControlEntry(
+            dto.getPrincipal(),
+            dto.getHost(),
+            AclOperation.valueOf(dto.getOperation().name()),
+            AclPermissionType.valueOf(dto.getPermission().name())
+        )
+    );
+  }
+
+  static KafkaAclDTO toKafkaAclDto(AclBinding binding) {
+    var pattern = binding.pattern();
+    var filter = binding.toFilter().entryFilter();
+    return new KafkaAclDTO()
+        .resourceType(KafkaAclDTO.ResourceTypeEnum.fromValue(pattern.resourceType().name()))
+        .resourceName(pattern.name())
+        .namePatternType(KafkaAclDTO.NamePatternTypeEnum.fromValue(pattern.patternType().name()))
+        .principal(filter.principal())
+        .host(filter.host())
+        .operation(KafkaAclDTO.OperationEnum.fromValue(filter.operation().name()))
+        .permission(KafkaAclDTO.PermissionEnum.fromValue(filter.permissionType().name()));
+  }
+
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java
index ff0e2fca4b..f35039190d 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Feature.java
@@ -4,5 +4,6 @@ public enum Feature {
   KAFKA_CONNECT,
   KSQL_DB,
   SCHEMA_REGISTRY,
-  TOPIC_DELETION
+  TOPIC_DELETION,
+  KAFKA_ACL
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java
index 9097f2b25e..0eac8931a6 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java
@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service;
 
 import com.provectus.kafka.ui.model.Feature;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.service.ReactiveAdminClient.SupportedFeature;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -42,16 +43,15 @@ public class FeatureService {
     }
 
     if (controller != null) {
-      features.add(
-          isTopicDeletionEnabled(cluster, controller)
-              .flatMap(r -> Boolean.TRUE.equals(r) ? Mono.just(Feature.TOPIC_DELETION) : Mono.empty())
-      );
+      features.add(topicDeletion(cluster, controller));
     }
 
+    features.add(acl(cluster));
+
     return Flux.fromIterable(features).flatMap(m -> m).collectList();
   }
 
-  private Mono<Boolean> isTopicDeletionEnabled(KafkaCluster cluster, Node controller) {
+  private Mono<Feature> topicDeletion(KafkaCluster cluster, Node controller) {
     return adminClientService.get(cluster)
         .flatMap(ac -> ac.loadBrokersConfig(List.of(controller.id())))
         .map(config ->
@@ -60,6 +60,15 @@ public class FeatureService {
                 .filter(e -> e.name().equals(DELETE_TOPIC_ENABLED_SERVER_PROPERTY))
                 .map(e -> Boolean.parseBoolean(e.value()))
                 .findFirst()
-                .orElse(true));
+                .orElse(true))
+        .flatMap(enabled -> enabled ? Mono.just(Feature.TOPIC_DELETION) : Mono.empty());
+  }
+
+  private Mono<Feature> acl(KafkaCluster cluster) {
+    return adminClientService.get(cluster).flatMap(
+        ac -> ac.getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)
+            ? Mono.just(Feature.KAFKA_ACL)
+            : Mono.empty()
+    );
+  }
 }
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
index 463e2d9b6c..755874009b 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
@@ -4,6 +4,7 @@ import static com.google.common.util.concurrent.Uninterruptibles.getUninterrupti
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import com.provectus.kafka.ui.exception.IllegalEntityStateException;
@@ -12,7 +13,6 @@ import com.provectus.kafka.ui.util.MapUtil;
 import com.provectus.kafka.ui.util.NumberUtil;
 import java.io.Closeable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -52,13 +52,17 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
@@ -69,25 +73,28 @@ import reactor.util.function.Tuples;
 @RequiredArgsConstructor
 public class ReactiveAdminClient implements Closeable {
 
-  private enum SupportedFeature {
+  public enum SupportedFeature {
     INCREMENTAL_ALTER_CONFIGS(2.3f),
-    CONFIG_DOCUMENTATION_RETRIEVAL(2.6f);
+    CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
+    AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled);
 
-    private final float sinceVersion;
+    private final BiFunction<AdminClient, Float, Mono<Boolean>> predicate;
 
-    SupportedFeature(float sinceVersion) {
-      this.sinceVersion = sinceVersion;
+    SupportedFeature(BiFunction<AdminClient, Float, Mono<Boolean>> predicate) {
+      this.predicate = predicate;
     }
 
-    static Set<SupportedFeature> forVersion(float kafkaVersion) {
-      return Arrays.stream(SupportedFeature.values())
-          .filter(f -> kafkaVersion >= f.sinceVersion)
+    SupportedFeature(float fromVersion) {
+      this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion);
+    }
+
+    static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, @Nullable Float kafkaVersion) {
+      return Flux.fromArray(SupportedFeature.values())
+          .flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled)))
+          .filter(Tuple2::getT2)
+          .map(Tuple2::getT1)
           .collect(Collectors.toSet());
     }
-
-    static Set<SupportedFeature> defaultFeatures() {
-      return Set.of();
-    }
   }
 
   @Value
@@ -101,20 +108,28 @@ public class ReactiveAdminClient implements Closeable {
 
   public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
     return getClusterVersion(adminClient)
-        .map(ver ->
-            new ReactiveAdminClient(
-                adminClient,
-                ver,
-                getSupportedUpdateFeaturesForVersion(ver)));
+        .flatMap(ver ->
+            getSupportedUpdateFeaturesForVersion(adminClient, ver)
+                .map(features ->
+                    new ReactiveAdminClient(adminClient, ver, features)));
   }
 
-  private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String versionStr) {
+  private static Mono<Set<SupportedFeature>> getSupportedUpdateFeaturesForVersion(AdminClient ac, String versionStr) {
+    Float kafkaVersion = null;
     try {
-      float version = NumberUtil.parserClusterVersion(versionStr);
-      return SupportedFeature.forVersion(version);
+      kafkaVersion = NumberUtil.parserClusterVersion(versionStr);
     } catch (NumberFormatException e) {
-      return SupportedFeature.defaultFeatures();
+      //Nothing to do here
     }
+    return SupportedFeature.forVersion(ac, kafkaVersion);
+  }
+
+  private static Mono<Boolean> isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) {
+    return toMono(ac.describeAcls(AclBindingFilter.ANY).values())
+        .thenReturn(true)
+        .doOnError(th -> !(th instanceof SecurityDisabledException) && !(th instanceof InvalidRequestException),
+            th -> log.warn("Error checking if security enabled", th))
+        .onErrorReturn(false);
   }
 
   //TODO: discuss - maybe we should map kafka-library's exceptions to our exceptions here
@@ -146,6 +161,10 @@ public class ReactiveAdminClient implements Closeable {
   private final String version;
   private final Set<SupportedFeature> features;
 
+  public Set<SupportedFeature> getClusterFeatures() {
+    return features;
+  }
+
   public Mono<Set<String>> listTopics(boolean listInternal) {
     return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
   }
@@ -417,6 +436,22 @@ public class ReactiveAdminClient implements Closeable {
         .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())));
   }
 
+  public Mono<Collection<AclBinding>> listAcls() {
+    Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
+    return toMono(client.describeAcls(AclBindingFilter.ANY).values());
+  }
+
+  public Mono<Void> createAcls(Collection<AclBinding> aclBindings) {
+    Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
+    return toMono(client.createAcls(aclBindings).all());
+  }
+
+  public Mono<Void> deleteAcls(Collection<AclBinding> aclBindings) {
+    Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
+    var filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet());
+    return toMono(client.deleteAcls(filters).all()).then();
+  }
+
   private Mono<Set<TopicPartition>> topicPartitions(String topic) {
     return toMono(client.describeTopics(List.of(topic)).all())
         .map(r -> r.values().stream()
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java
new file mode 100644
index 0000000000..821627274c
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java
@@ -0,0 +1,81 @@
+package com.provectus.kafka.ui.service.acl;
+
+import com.provectus.kafka.ui.exception.ValidationException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+
+public class AclCsv {
+
+  private static final String LINE_SEPARATOR = System.lineSeparator();
+  private static final String VALUES_SEPARATOR = ",";
+  private static final String HEADER = "Principal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host";
+
+  public static String transformToCsvString(Collection<AclBinding> acls) {
+    return Stream.concat(Stream.of(HEADER), acls.stream().map(AclCsv::createAclString))
+        .collect(Collectors.joining(System.lineSeparator()));
+  }
+
+  public static String createAclString(AclBinding binding) {
+    var pattern = binding.pattern();
+    var filter = binding.toFilter().entryFilter();
+    return String.format(
+        "%s,%s,%s,%s,%s,%s,%s",
+        filter.principal(),
+        pattern.resourceType(),
+        pattern.patternType(),
+        pattern.name(),
+        filter.operation(),
+        filter.permissionType(),
+        filter.host()
+    );
+  }
+
+  private static AclBinding parseCsvLine(String csv, int line) {
+    String[] values = csv.split(VALUES_SEPARATOR);
+    if (values.length != 7) {
+      throw new ValidationException("Input csv is not valid - there should be 7 columns in line " + line);
+    }
+    for (int i = 0; i < values.length; i++) {
+      if ((values[i] = values[i].trim()).isBlank()) {
+        throw new ValidationException("Input csv is not valid - blank value in column " + i + ", line " + line);
+      }
+    }
+    try {
+      return new AclBinding(
+          new ResourcePattern(
+              ResourceType.valueOf(values[1]), values[3], PatternType.valueOf(values[2])),
+          new AccessControlEntry(
+              values[0], values[6], AclOperation.valueOf(values[4]), AclPermissionType.valueOf(values[5]))
+      );
+    } catch (IllegalArgumentException enumParseError) {
+      throw new ValidationException("Error parsing enum value in line " + line);
+    }
+  }
+
+  public static Collection<AclBinding> parseCsv(String csvString) {
+    String[] lines = csvString.split(LINE_SEPARATOR);
+    if (lines.length == 0) {
+      throw new ValidationException("Error parsing ACL csv file: ");
+    }
+    boolean firstLineIsHeader = HEADER.equalsIgnoreCase(lines[0].trim().replace(" ", ""));
+    Set<AclBinding> result = new HashSet<>();
+    for (int i = firstLineIsHeader ? 1 : 0; i < lines.length; i++) {
+      String line = lines[i];
+      if (!line.isBlank()) {
+        AclBinding aclBinding = parseCsvLine(line, i);
+        result.add(aclBinding);
+      }
+    }
+    return result;
+  }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java
new file mode 100644
index 0000000000..e148413575
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java
@@ -0,0 +1,93 @@
+package com.provectus.kafka.ui.service.acl;
+
+import com.google.common.collect.Sets;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.service.AdminClientService;
+import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import java.util.List;
+import java.util.Set;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.acl.AclBinding;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class AclsService {
+
+  private final AdminClientService adminClientService;
+
+  public Mono<Void> createAcl(KafkaCluster cluster, AclBinding aclBinding) {
+    var aclString = AclCsv.createAclString(aclBinding);
+    log.info("CREATING ACL: [{}]", aclString);
+    return adminClientService.get(cluster)
+        .flatMap(ac -> ac.createAcls(List.of(aclBinding)))
+        .doOnSuccess(v -> log.info("ACL CREATED: [{}]", aclString));
+  }
+
+  public Mono<Void> deleteAcl(KafkaCluster cluster, AclBinding aclBinding) {
+    var aclString = AclCsv.createAclString(aclBinding);
+    log.info("DELETING ACL: [{}]", aclString);
+    return adminClientService.get(cluster)
+        .flatMap(ac -> ac.deleteAcls(List.of(aclBinding)))
+        .doOnSuccess(v -> log.info("ACL DELETED: [{}]", aclString));
+  }
+
+  public Flux<AclBinding> listAcls(KafkaCluster cluster) {
+    return adminClientService.get(cluster)
+        .flatMap(ReactiveAdminClient::listAcls)
+        .flatMapIterable(acls -> acls);
+  }
+
+  public Mono<String> getAclAsCsvString(KafkaCluster cluster) {
+    return adminClientService.get(cluster)
+        .flatMap(ReactiveAdminClient::listAcls)
+        .map(AclCsv::transformToCsvString);
+  }
+
+  public Mono<Void> syncAclWithAclCsv(KafkaCluster cluster, String csv) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> ac.listAcls().flatMap(existingAclList -> {
+          var existingSet = Set.copyOf(existingAclList);
+          var newAcls = Set.copyOf(AclCsv.parseCsv(csv));
+          var toDelete = Sets.difference(existingSet, newAcls);
+          var toAdd = Sets.difference(newAcls, existingSet);
+          logAclSyncPlan(cluster, toAdd, toDelete);
+          if (toAdd.isEmpty() && toDelete.isEmpty()) {
+            return Mono.empty();
+          }
+          log.info("Starting new ACLs creation");
+          return ac.createAcls(toAdd)
+              .doOnSuccess(v -> {
+                log.info("{} new ACLs created", toAdd.size());
+                log.info("Starting ACLs deletion");
+              })
+              .then(ac.deleteAcls(toDelete)
+                  .doOnSuccess(v -> log.info("{} ACLs deleted", toDelete.size())));
+        }));
+  }
+
+  private void logAclSyncPlan(KafkaCluster cluster, Set<AclBinding> toBeAdded, Set<AclBinding> toBeDeleted) {
+    log.info("'{}' cluster ACL sync plan: ", cluster.getName());
+    if (toBeAdded.isEmpty() && toBeDeleted.isEmpty()) {
+      log.info("Nothing to do, ACL is already in sync");
+      return;
+    }
+    if (!toBeAdded.isEmpty()) {
+      log.info("ACLs to be added ({}): ", toBeAdded.size());
+      for (AclBinding aclBinding : toBeAdded) {
+        log.info(" " + AclCsv.createAclString(aclBinding));
+      }
+    }
+    if (!toBeDeleted.isEmpty()) {
+      log.info("ACLs to be deleted ({}): ", toBeDeleted.size());
+      for (AclBinding aclBinding : toBeDeleted) {
+        log.info(" " + AclCsv.createAclString(aclBinding));
+      }
+    }
+  }
+
+}
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java
new file mode 100644
index 0000000000..c99c51dc6d
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java
@@ -0,0 +1,68 @@
+package com.provectus.kafka.ui.service.acl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class AclCsvTest {
+
+  private static final List<AclBinding> TEST_BINDINGS = List.of(
+      new AclBinding(
+          new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
+          new AccessControlEntry("User:test1", "*", AclOperation.READ, AclPermissionType.ALLOW)),
+      new AclBinding(
+          new ResourcePattern(ResourceType.GROUP, "group1", PatternType.PREFIXED),
+          new AccessControlEntry("User:test2", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY))
+  );
+
+  @ParameterizedTest
+  @ValueSource(strings = {
+      "Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n"
+          + "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
+          + "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost",
+
+      //without header
+      "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
+          + "\n"
+          + "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost"
+          + "\n"
+  })
+  void parsesValidInputCsv(String csvString) {
+    Collection<AclBinding> parsed = AclCsv.parseCsv(csvString);
+    assertThat(parsed).containsExactlyInAnyOrderElementsOf(TEST_BINDINGS);
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {
+      // columns > 7
+      "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*,1,2,3,4",
+      // columns < 7
+      "User:test1,TOPIC,LITERAL,*",
+      // enum values are illegal
+      "User:test1,ILLEGAL,LITERAL,*,READ,ALLOW,*",
+      "User:test1,TOPIC,LITERAL,*,READ,ILLEGAL,*"
+  })
+  void throwsExceptionForInvalidInputCsv(String csvString) {
+    assertThatThrownBy(() -> AclCsv.parseCsv(csvString)).isNotNull();
+  }
+
+  @Test
+  void transformAndParseUseSameFormat() {
+    String csv = AclCsv.transformToCsvString(TEST_BINDINGS);
+    Collection<AclBinding> parsedBindings = AclCsv.parseCsv(csv);
+    assertThat(parsedBindings).containsExactlyInAnyOrderElementsOf(TEST_BINDINGS);
+  }
+
+}
\ No newline at end of file
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index 302cf84c33..9222394aa4 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -1776,6 +1776,111 @@ paths:
                 $ref: '#/components/schemas/PartitionsIncreaseResponse'
         404:
           description: Not found
+
+  /api/clusters/{clusterName}/acls:
+    get:
+      tags:
+        - Acls
+      summary: listKafkaAcls
+      operationId: listAcls
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/KafkaAcl'
+
+  /api/clusters/{clusterName}/acl/csv:
+    get:
+      tags:
+        - Acls
+      summary: getAclAsCsv
+      operationId: getAclAsCsv
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            text/plain:
+              schema:
+                type: string
+    post:
+      tags:
+        - Acls
+      summary: syncAclsCsv
+      operationId: syncAclsCsv
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          text/plain:
+            schema:
+              type: string
+      responses:
+        200:
+          description: OK
+
+  /api/clusters/{clusterName}/acl:
+    post:
+      tags:
+        - Acls
+      summary: createAcl
+      operationId: createAcl
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/KafkaAcl'
+      responses:
+        200:
+          description: OK
+
+    delete:
+      tags:
+        - Acls
+      summary: deleteAcl
+      operationId: deleteAcl
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/KafkaAcl'
+      responses:
+        200:
+          description: OK
+        404:
+          description: Acl not found
+
   /api/info/timestampformat:
     get:
       tags:
@@ -1903,6 +2008,7 @@ components:
             - KAFKA_CONNECT
            - KSQL_DB
            - TOPIC_DELETION
+            - KAFKA_ACL
        required:
          - id
          - name
@@ -3216,3 +3322,46 @@ components:
         - COMPACT
         - COMPACT_DELETE
         - UNKNOWN
+
+    KafkaAcl:
+      type: object
+      required: [resourceType, resourceName, namePatternType, principal, host, operation, permission]
+      properties:
+        resourceType:
+          type: string
+          enum:
+            - TOPIC
+            - GROUP
+            - CLUSTER
+            - TRANSACTIONAL_ID
+            - DELEGATION_TOKEN
+        resourceName:
+          type: string  # "*" if acl can be applied to any resource of given type
+        namePatternType:
+          type: string
+          enum:
+            - LITERAL
+            - PREFIXED
+        principal:
+          type: string
+        host:
+          type: string  # "*" if acl can be applied to any host
+        operation:
+          type: string
+          enum:
+            - ALL                # Cluster, Topic, Group
+            - READ               # Topic, Group
+            - WRITE              # Topic, TransactionalId
+            - CREATE             # Cluster, Topic
+            - DELETE             # Topic, Group
+            - ALTER              # Cluster, Topic
+            - DESCRIBE           # Cluster, Topic, Group, TransactionalId, DelegationToken
+            - CLUSTER_ACTION     # Cluster
+            - DESCRIBE_CONFIGS   # Cluster, Topic
+            - ALTER_CONFIGS      # Cluster, Topic
+            - IDEMPOTENT_WRITE   # -
+        permission:
+          type: string
+          enum:
+            - ALLOW
+            - DENY
\ No newline at end of file
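
Editor's note, not part of the patch: a minimal, hypothetical sketch of how the new AclCsv helper round-trips one binding through the CSV format served and accepted by the /api/clusters/{clusterName}/acl/csv endpoints. The class name AclCsvDemo and its main method are invented for illustration; AclCsv, its header layout, and the Kafka ACL classes come from the files added in the diff above, and the demo is assumed to sit in the same package as AclCsv.

package com.provectus.kafka.ui.service.acl;

import java.util.Collection;
import java.util.List;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

// Hypothetical demo class (not in the PR), placed alongside AclCsv for brevity.
public class AclCsvDemo {

  public static void main(String[] args) {
    // "User:test1 may READ any topic from any host", expressed as a Kafka AclBinding.
    AclBinding binding = new AclBinding(
        new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
        new AccessControlEntry("User:test1", "*", AclOperation.READ, AclPermissionType.ALLOW));

    // Serialize: a header line followed by one comma-separated line per binding.
    String csv = AclCsv.transformToCsvString(List.of(binding));
    System.out.println(csv);
    // Principal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host
    // User:test1,TOPIC,LITERAL,*,READ,ALLOW,*

    // Parse it back: the header line is detected and skipped, blank lines are ignored.
    Collection<AclBinding> parsed = AclCsv.parseCsv(csv);
    System.out.println(parsed.contains(binding)); // true
  }
}

The same CSV text is what syncAclsCsv expects in its text/plain request body: per AclsService.syncAclWithAclCsv above, bindings present in the file but missing from the cluster are created, and bindings present in the cluster but absent from the file are deleted.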