diff --git a/documentation/compose/jaas/kafka_server.conf b/documentation/compose/jaas/kafka_server.conf
index 25388be5aa..0c1fb34652 100644
--- a/documentation/compose/jaas/kafka_server.conf
+++ b/documentation/compose/jaas/kafka_server.conf
@@ -11,4 +11,8 @@ KafkaClient {
user_admin="admin-secret";
};
-Client {};
+Client {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="zkuser"
+ password="zkuserpassword";
+};
diff --git a/documentation/compose/jaas/zookeeper_jaas.conf b/documentation/compose/jaas/zookeeper_jaas.conf
new file mode 100644
index 0000000000..2d7fd1b1c2
--- /dev/null
+++ b/documentation/compose/jaas/zookeeper_jaas.conf
@@ -0,0 +1,4 @@
+Server {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ user_zkuser="zkuserpassword";
+};
diff --git a/documentation/compose/kafka-ui-acl-with-zk.yaml b/documentation/compose/kafka-ui-acl-with-zk.yaml
new file mode 100644
index 0000000000..e1d70b2970
--- /dev/null
+++ b/documentation/compose/kafka-ui-acl-with-zk.yaml
@@ -0,0 +1,59 @@
+---
+version: '2'
+services:
+
+ kafka-ui:
+ container_name: kafka-ui
+ image: provectuslabs/kafka-ui:latest
+ ports:
+ - 8080:8080
+ depends_on:
+ - zookeeper
+ - kafka
+ environment:
+ KAFKA_CLUSTERS_0_NAME: local
+ KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
+ KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT
+ KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN
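+ # kafka-ui authenticates as 'admin', which is listed in KAFKA_SUPER_USERS below, so the UI can both view and edit ACLs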
+ KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";'
+
+ zookeeper:
+ image: wurstmeister/zookeeper:3.4.6
+ environment:
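+ # loads the Server section of zookeeper_jaas.conf, so brokers must authenticate as zkuser (see the Client section of kafka_server.conf)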
+ JVMFLAGS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf"
+ volumes:
+ - ./jaas/zookeeper_jaas.conf:/etc/zookeeper/zookeeper_jaas.conf
+ ports:
+ - 2181:2181
+
+ kafka:
+ image: confluentinc/cp-kafka:7.2.1
+ hostname: kafka
+ container_name: kafka
+ ports:
+ - "9092:9092"
+ - "9997:9997"
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
+ KAFKA_ADVERTISED_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
+ KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas/kafka_server.conf"
+ KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.authorizer.AclAuthorizer"
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+ KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+ KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+ KAFKA_JMX_PORT: 9997
+ KAFKA_JMX_HOSTNAME: localhost
+ KAFKA_NODE_ID: 1
+ KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
+ KAFKA_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
+ KAFKA_INTER_BROKER_LISTENER_NAME: 'SASL_PLAINTEXT'
+ KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
+ KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: 'PLAIN'
+ KAFKA_SECURITY_PROTOCOL: 'SASL_PLAINTEXT'
+ KAFKA_SUPER_USERS: 'User:admin'
+ volumes:
+ - ./scripts/update_run.sh:/tmp/update_run.sh
+ - ./jaas:/etc/kafka/jaas
diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index 7f2d4c16be..5ebefe31df 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -12,7 +12,7 @@
<artifactId>kafka-ui-api</artifactId>
- <jacoco.version>0.8.8</jacoco.version>
+ <jacoco.version>0.8.10</jacoco.version>
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
<sonar.dynamicAnalysis>reuseReports</sonar.dynamicAnalysis>
<sonar.jacoco.reportPath>${project.basedir}/target/jacoco.exec</sonar.jacoco.reportPath>
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java
new file mode 100644
index 0000000000..83d2ef553e
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java
@@ -0,0 +1,115 @@
+package com.provectus.kafka.ui.controller;
+
+import com.provectus.kafka.ui.api.AclsApi;
+import com.provectus.kafka.ui.mapper.ClusterMapper;
+import com.provectus.kafka.ui.model.KafkaAclDTO;
+import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
+import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
+import com.provectus.kafka.ui.model.rbac.AccessContext;
+import com.provectus.kafka.ui.model.rbac.permission.AclAction;
+import com.provectus.kafka.ui.service.acl.AclsService;
+import com.provectus.kafka.ui.service.rbac.AccessControlService;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequiredArgsConstructor
+public class AclsController extends AbstractController implements AclsApi {
+
+ private final AclsService aclsService;
+ private final AccessControlService accessControlService;
+
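+ // Every endpoint below follows the same pattern: validate the caller's RBAC ACL permissions first, then delegate to AclsService.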
+ @Override
+ public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
+ ServerWebExchange exchange) {
+ AccessContext context = AccessContext.builder()
+ .cluster(clusterName)
+ .aclActions(AclAction.EDIT)
+ .build();
+
+ return accessControlService.validateAccess(context)
+ .then(kafkaAclDto)
+ .map(ClusterMapper::toAclBinding)
+ .flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
+ .thenReturn(ResponseEntity.ok().build());
+ }
+
+ @Override
+ public Mono<ResponseEntity<Void>> deleteAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
+ ServerWebExchange exchange) {
+ AccessContext context = AccessContext.builder()
+ .cluster(clusterName)
+ .aclActions(AclAction.EDIT)
+ .build();
+
+ return accessControlService.validateAccess(context)
+ .then(kafkaAclDto)
+ .map(ClusterMapper::toAclBinding)
+ .flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
+ .thenReturn(ResponseEntity.ok().build());
+ }
+
+ @Override
+ public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
+ KafkaAclResourceTypeDTO resourceTypeDto,
+ String resourceName,
+ KafkaAclNamePatternTypeDTO namePatternTypeDto,
+ ServerWebExchange exchange) {
+ AccessContext context = AccessContext.builder()
+ .cluster(clusterName)
+ .aclActions(AclAction.VIEW)
+ .build();
+
+ var resourceType = Optional.ofNullable(resourceTypeDto)
+ .map(ClusterMapper::mapAclResourceTypeDto)
+ .orElse(ResourceType.ANY);
+
+ var namePatternType = Optional.ofNullable(namePatternTypeDto)
+ .map(ClusterMapper::mapPatternTypeDto)
+ .orElse(PatternType.ANY);
+
+ var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType);
+
+ return accessControlService.validateAccess(context).then(
+ Mono.just(
+ ResponseEntity.ok(
+ aclsService.listAcls(getCluster(clusterName), filter)
+ .map(ClusterMapper::toKafkaAclDto)))
+ );
+ }
+
+ @Override
+ public Mono<ResponseEntity<String>> getAclAsCsv(String clusterName, ServerWebExchange exchange) {
+ AccessContext context = AccessContext.builder()
+ .cluster(clusterName)
+ .aclActions(AclAction.VIEW)
+ .build();
+
+ return accessControlService.validateAccess(context).then(
+ aclsService.getAclAsCsvString(getCluster(clusterName))
+ .map(ResponseEntity::ok)
+ .flatMap(Mono::just)
+ );
+ }
+
+ @Override
+ public Mono<ResponseEntity<Void>> syncAclsCsv(String clusterName, Mono<String> csvMono, ServerWebExchange exchange) {
+ AccessContext context = AccessContext.builder()
+ .cluster(clusterName)
+ .aclActions(AclAction.EDIT)
+ .build();
+
+ return accessControlService.validateAccess(context)
+ .then(csvMono)
+ .flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
+ .thenReturn(ResponseEntity.ok().build());
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java
index 571250ba94..df04b40fab 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java
@@ -27,6 +27,7 @@ import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.multipart.FilePart;
+import org.springframework.http.codec.multipart.Part;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
@@ -92,16 +93,19 @@ public class ApplicationConfigController implements ApplicationConfigApi {
}
@Override
- public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(FilePart file, ServerWebExchange exchange) {
+ public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(Flux<Part> fileFlux,
+ ServerWebExchange exchange) {
return accessControlService
.validateAccess(
AccessContext.builder()
.applicationConfigActions(EDIT)
.build()
)
- .then(dynamicConfigOperations.uploadConfigRelatedFile(file))
- .map(path -> new UploadedFileInfoDTO().location(path.toString()))
- .map(ResponseEntity::ok);
+ .then(fileFlux.single())
+ .flatMap(file ->
+ dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
+ .map(path -> new UploadedFileInfoDTO().location(path.toString()))
+ .map(ResponseEntity::ok));
}
@Override
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
index d989ce93ba..a122a269a4 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
@@ -20,6 +20,9 @@ import com.provectus.kafka.ui.model.InternalPartition;
import com.provectus.kafka.ui.model.InternalReplica;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.InternalTopicConfig;
+import com.provectus.kafka.ui.model.KafkaAclDTO;
+import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
+import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
import com.provectus.kafka.ui.model.MetricDTO;
import com.provectus.kafka.ui.model.Metrics;
import com.provectus.kafka.ui.model.PartitionDTO;
@@ -27,12 +30,18 @@ import com.provectus.kafka.ui.model.ReplicaDTO;
import com.provectus.kafka.ui.model.TopicConfigDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.model.TopicDetailsDTO;
-import com.provectus.kafka.ui.service.masking.DataMasking;
import com.provectus.kafka.ui.service.metrics.RawMetric;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
@@ -109,8 +118,74 @@ public interface ClusterMapper {
return brokerDiskUsage;
}
- default DataMasking map(List<ClustersProperties.Masking> maskingProperties) {
- return DataMasking.create(maskingProperties);
+ static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) {
+ return switch (operation) {
+ case ALL -> KafkaAclDTO.OperationEnum.ALL;
+ case READ -> KafkaAclDTO.OperationEnum.READ;
+ case WRITE -> KafkaAclDTO.OperationEnum.WRITE;
+ case CREATE -> KafkaAclDTO.OperationEnum.CREATE;
+ case DELETE -> KafkaAclDTO.OperationEnum.DELETE;
+ case ALTER -> KafkaAclDTO.OperationEnum.ALTER;
+ case DESCRIBE -> KafkaAclDTO.OperationEnum.DESCRIBE;
+ case CLUSTER_ACTION -> KafkaAclDTO.OperationEnum.CLUSTER_ACTION;
+ case DESCRIBE_CONFIGS -> KafkaAclDTO.OperationEnum.DESCRIBE_CONFIGS;
+ case ALTER_CONFIGS -> KafkaAclDTO.OperationEnum.ALTER_CONFIGS;
+ case IDEMPOTENT_WRITE -> KafkaAclDTO.OperationEnum.IDEMPOTENT_WRITE;
+ case CREATE_TOKENS -> KafkaAclDTO.OperationEnum.CREATE_TOKENS;
+ case DESCRIBE_TOKENS -> KafkaAclDTO.OperationEnum.DESCRIBE_TOKENS;
+ case ANY -> throw new IllegalArgumentException("ANY operation can only be part of a filter");
+ case UNKNOWN -> KafkaAclDTO.OperationEnum.UNKNOWN;
+ };
+ }
+
+ static KafkaAclResourceTypeDTO mapAclResourceType(ResourceType resourceType) {
+ return switch (resourceType) {
+ case CLUSTER -> KafkaAclResourceTypeDTO.CLUSTER;
+ case TOPIC -> KafkaAclResourceTypeDTO.TOPIC;
+ case GROUP -> KafkaAclResourceTypeDTO.GROUP;
+ case DELEGATION_TOKEN -> KafkaAclResourceTypeDTO.DELEGATION_TOKEN;
+ case TRANSACTIONAL_ID -> KafkaAclResourceTypeDTO.TRANSACTIONAL_ID;
+ case USER -> KafkaAclResourceTypeDTO.USER;
+ case ANY -> throw new IllegalArgumentException("ANY type can only be part of a filter");
+ case UNKNOWN -> KafkaAclResourceTypeDTO.UNKNOWN;
+ };
+ }
+
+ static ResourceType mapAclResourceTypeDto(KafkaAclResourceTypeDTO dto) {
+ return ResourceType.valueOf(dto.name());
+ }
+
+ static PatternType mapPatternTypeDto(KafkaAclNamePatternTypeDTO dto) {
+ return PatternType.valueOf(dto.name());
+ }
+
+ static AclBinding toAclBinding(KafkaAclDTO dto) {
+ return new AclBinding(
+ new ResourcePattern(
+ mapAclResourceTypeDto(dto.getResourceType()),
+ dto.getResourceName(),
+ mapPatternTypeDto(dto.getNamePatternType())
+ ),
+ new AccessControlEntry(
+ dto.getPrincipal(),
+ dto.getHost(),
+ AclOperation.valueOf(dto.getOperation().name()),
+ AclPermissionType.valueOf(dto.getPermission().name())
+ )
+ );
+ }
+
+ static KafkaAclDTO toKafkaAclDto(AclBinding binding) {
+ var pattern = binding.pattern();
+ var filter = binding.toFilter().entryFilter();
+ return new KafkaAclDTO()
+ .resourceType(mapAclResourceType(pattern.resourceType()))
+ .resourceName(pattern.name())
+ .namePatternType(KafkaAclNamePatternTypeDTO.fromValue(pattern.patternType().name()))
+ .principal(filter.principal())
+ .host(filter.host())
+ .operation(mapAclOperation(filter.operation()))
+ .permission(KafkaAclDTO.PermissionEnum.fromValue(filter.permissionType().name()));
}
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java
index 9731492f00..2973e5500d 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java
@@ -4,5 +4,7 @@ public enum ClusterFeature {
KAFKA_CONNECT,
KSQL_DB,
SCHEMA_REGISTRY,
- TOPIC_DELETION
+ TOPIC_DELETION,
+ KAFKA_ACL_VIEW,
+ KAFKA_ACL_EDIT
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java
index 0c2587d681..45858093a7 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac;
+import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
@@ -37,6 +38,8 @@ public class AccessContext {
Collection<KsqlAction> ksqlActions;
+ Collection<AclAction> aclActions;
+
public static AccessContextBuilder builder() {
return new AccessContextBuilder();
}
@@ -55,6 +58,7 @@ public class AccessContext {
private String schema;
private Collection<SchemaAction> schemaActions = Collections.emptySet();
private Collection<KsqlAction> ksqlActions = Collections.emptySet();
+ private Collection<AclAction> aclActions = Collections.emptySet();
private AccessContextBuilder() {
}
@@ -131,6 +135,12 @@ public class AccessContext {
return this;
}
+ public AccessContextBuilder aclActions(AclAction... actions) {
+ Assert.isTrue(actions.length > 0, "actions not present");
+ this.aclActions = List.of(actions);
+ return this;
+ }
+
public AccessContext build() {
return new AccessContext(
applicationConfigActions,
@@ -140,7 +150,7 @@ public class AccessContext {
connect, connectActions,
connector,
schema, schemaActions,
- ksqlActions);
+ ksqlActions, aclActions);
}
}
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java
index afdcf0ca15..16f01f60e6 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java
@@ -4,6 +4,7 @@ import static com.provectus.kafka.ui.model.rbac.Resource.APPLICATIONCONFIG;
import static com.provectus.kafka.ui.model.rbac.Resource.CLUSTERCONFIG;
import static com.provectus.kafka.ui.model.rbac.Resource.KSQL;
+import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
@@ -76,6 +77,7 @@ public class Permission {
case SCHEMA -> Arrays.stream(SchemaAction.values()).map(Enum::toString).toList();
case CONNECT -> Arrays.stream(ConnectAction.values()).map(Enum::toString).toList();
case KSQL -> Arrays.stream(KsqlAction.values()).map(Enum::toString).toList();
+ case ACL -> Arrays.stream(AclAction.values()).map(Enum::toString).toList();
};
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java
index 4b2c66361f..f71dfb2979 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java
@@ -11,7 +11,8 @@ public enum Resource {
CONSUMER,
SCHEMA,
CONNECT,
- KSQL;
+ KSQL,
+ ACL;
@Nullable
public static Resource fromString(String name) {
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AclAction.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AclAction.java
new file mode 100644
index 0000000000..c86af7e72d
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AclAction.java
@@ -0,0 +1,15 @@
+package com.provectus.kafka.ui.model.rbac.permission;
+
+import org.apache.commons.lang3.EnumUtils;
+import org.jetbrains.annotations.Nullable;
+
+public enum AclAction implements PermissibleAction {
+
+ VIEW,
+ EDIT;
+
+ @Nullable
+ public static AclAction fromString(String name) {
+ return EnumUtils.getEnum(AclAction.class, name);
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java
index ec749abd14..7ba3f036e9 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java
@@ -2,16 +2,19 @@ package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.model.ClusterFeature;
import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.acl.AclOperation;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -26,7 +29,7 @@ public class FeatureService {
private final AdminClientService adminClientService;
public Mono<List<ClusterFeature>> getAvailableFeatures(KafkaCluster cluster,
- ReactiveAdminClient.ClusterDescription clusterDescription) {
+ ClusterDescription clusterDescription) {
List<Mono<ClusterFeature>> features = new ArrayList<>();
if (Optional.ofNullable(cluster.getConnectsClients())
@@ -44,6 +47,8 @@ public class FeatureService {
}
features.add(topicDeletionEnabled(cluster, clusterDescription.getController()));
+ features.add(aclView(cluster));
+ features.add(aclEdit(clusterDescription));
return Flux.fromIterable(features).flatMap(m -> m).collectList();
}
@@ -65,4 +70,20 @@ public class FeatureService {
? Mono.just(ClusterFeature.TOPIC_DELETION)
: Mono.empty());
}
+
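+ // ACL editing is only advertised when the connected principal holds ALTER (or ALL) among the cluster's authorized operations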
+ private Mono<ClusterFeature> aclEdit(ClusterDescription clusterDescription) {
+ var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of());
+ boolean canEdit = authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER);
+ return canEdit
+ ? Mono.just(ClusterFeature.KAFKA_ACL_EDIT)
+ : Mono.empty();
+ }
+
+ private Mono<ClusterFeature> aclView(KafkaCluster cluster) {
+ return adminClientService.get(cluster).flatMap(
+ ac -> ac.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED)
+ ? Mono.just(ClusterFeature.KAFKA_ACL_VIEW)
+ : Mono.empty()
+ );
+ }
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
index 39332da39e..8451a89f97 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
@@ -5,6 +5,7 @@ import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Table;
@@ -15,7 +16,6 @@ import com.provectus.kafka.ui.util.KafkaVersion;
import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
import java.io.Closeable;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -61,16 +61,22 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -82,26 +88,29 @@ import reactor.util.function.Tuples;
@RequiredArgsConstructor
public class ReactiveAdminClient implements Closeable {
- private enum SupportedFeature {
+ public enum SupportedFeature {
INCREMENTAL_ALTER_CONFIGS(2.3f),
CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
- DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f);
+ DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f),
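+ // unlike the version-gated features above, this one is detected by probing the cluster (see isAuthorizedSecurityEnabled)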
+ AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled);
- private final float sinceVersion;
+ private final BiFunction<AdminClient, Float, Mono<Boolean>> predicate;
- SupportedFeature(float sinceVersion) {
- this.sinceVersion = sinceVersion;
+ SupportedFeature(BiFunction<AdminClient, Float, Mono<Boolean>> predicate) {
+ this.predicate = predicate;
}
- static Set<SupportedFeature> forVersion(float kafkaVersion) {
- return Arrays.stream(SupportedFeature.values())
- .filter(f -> kafkaVersion >= f.sinceVersion)
+ SupportedFeature(float fromVersion) {
+ this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion);
+ }
+
+ static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, @Nullable Float kafkaVersion) {
+ return Flux.fromArray(SupportedFeature.values())
+ .flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled)))
+ .filter(Tuple2::getT2)
+ .map(Tuple2::getT1)
.collect(Collectors.toSet());
}
-
- static Set<SupportedFeature> defaultFeatures() {
- return Set.of();
- }
}
@Value
@@ -110,25 +119,31 @@ public class ReactiveAdminClient implements Closeable {
Node controller;
String clusterId;
Collection<Node> nodes;
+ @Nullable // null, if ACL is disabled
Set<AclOperation> authorizedOperations;
}
public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
return getClusterVersion(adminClient)
- .map(ver ->
- new ReactiveAdminClient(
- adminClient,
- ver,
- getSupportedUpdateFeaturesForVersion(ver)));
+ .flatMap(ver ->
+ getSupportedUpdateFeaturesForVersion(adminClient, ver)
+ .map(features ->
+ new ReactiveAdminClient(adminClient, ver, features)));
}
- private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String versionStr) {
- try {
- float version = KafkaVersion.parse(versionStr);
- return SupportedFeature.forVersion(version);
- } catch (NumberFormatException e) {
- return SupportedFeature.defaultFeatures();
- }
+ private static Mono<Set<SupportedFeature>> getSupportedUpdateFeaturesForVersion(AdminClient ac, String versionStr) {
+ @Nullable Float kafkaVersion = KafkaVersion.parse(versionStr).orElse(null);
+ return SupportedFeature.forVersion(ac, kafkaVersion);
+ }
+
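+ // probes ACL support via describeAcls: any failure maps to false, with SecurityDisabled/InvalidRequest/UnsupportedVersion treated as the expected "disabled" signals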
+ private static Mono<Boolean> isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) {
+ return toMono(ac.describeAcls(AclBindingFilter.ANY).values())
+ .thenReturn(true)
+ .doOnError(th -> !(th instanceof SecurityDisabledException)
+ && !(th instanceof InvalidRequestException)
+ && !(th instanceof UnsupportedVersionException),
+ th -> log.warn("Error checking if security enabled", th))
+ .onErrorReturn(false);
}
// NOTE: if KafkaFuture returns null, that Mono will be empty(!), since Reactor does not support nullable results
@@ -162,6 +177,10 @@ public class ReactiveAdminClient implements Closeable {
private final String version;
private final Set<SupportedFeature> features;
+ public Set<SupportedFeature> getClusterFeatures() {
+ return features;
+ }
+
public Mono<Set<String>> listTopics(boolean listInternal) {
return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
}
@@ -576,6 +595,22 @@ public class ReactiveAdminClient implements Closeable {
);
}
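+ // the ACL methods below may only be called when AUTHORIZED_SECURITY_ENABLED was detected at client creation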
+ public Mono<Collection<AclBinding>> listAcls(ResourcePatternFilter filter) {
+ Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
+ return toMono(client.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values());
+ }
+
+ public Mono<Void> createAcls(Collection<AclBinding> aclBindings) {
+ Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
+ return toMono(client.createAcls(aclBindings).all());
+ }
+
+ public Mono<Void> deleteAcls(Collection<AclBinding> aclBindings) {
+ Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
+ var filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet());
+ return toMono(client.deleteAcls(filters).all()).then();
+ }
+
public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java
new file mode 100644
index 0000000000..673b17ee1f
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java
@@ -0,0 +1,81 @@
+package com.provectus.kafka.ui.service.acl;
+
+import com.provectus.kafka.ui.exception.ValidationException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+
+public class AclCsv {
+
+ private static final String LINE_SEPARATOR = System.lineSeparator();
+ private static final String VALUES_SEPARATOR = ",";
+ private static final String HEADER = "Principal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host";
+
+ public static String transformToCsvString(Collection<AclBinding> acls) {
+ return Stream.concat(Stream.of(HEADER), acls.stream().map(AclCsv::createAclString))
+ .collect(Collectors.joining(System.lineSeparator()));
+ }
+
+ public static String createAclString(AclBinding binding) {
+ var pattern = binding.pattern();
+ var filter = binding.toFilter().entryFilter();
+ return String.format(
+ "%s,%s,%s,%s,%s,%s,%s",
+ filter.principal(),
+ pattern.resourceType(),
+ pattern.patternType(),
+ pattern.name(),
+ filter.operation(),
+ filter.permissionType(),
+ filter.host()
+ );
+ }
+
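+ // expects exactly 7 columns per line, in HEADER order: Principal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host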
+ private static AclBinding parseCsvLine(String csv, int line) {
+ String[] values = csv.split(VALUES_SEPARATOR);
+ if (values.length != 7) {
+ throw new ValidationException("Input csv is not valid - there should be 7 columns in line " + line);
+ }
+ for (int i = 0; i < values.length; i++) {
+ if ((values[i] = values[i].trim()).isBlank()) {
+ throw new ValidationException("Input csv is not valid - blank value in colum " + i + ", line " + line);
+ }
+ }
+ try {
+ return new AclBinding(
+ new ResourcePattern(
+ ResourceType.valueOf(values[1]), values[3], PatternType.valueOf(values[2])),
+ new AccessControlEntry(
+ values[0], values[6], AclOperation.valueOf(values[4]), AclPermissionType.valueOf(values[5]))
+ );
+ } catch (IllegalArgumentException enumParseError) {
+ throw new ValidationException("Error parsing enum value in line " + line);
+ }
+ }
+
+ public static Collection<AclBinding> parseCsv(String csvString) {
+ String[] lines = csvString.split(LINE_SEPARATOR);
+ if (lines.length == 0) {
+ throw new ValidationException("Error parsing ACL csv file: no lines in file");
+ }
+ boolean firstLineIsHeader = HEADER.equalsIgnoreCase(lines[0].trim().replace(" ", ""));
+ Set<AclBinding> result = new HashSet<>();
+ for (int i = firstLineIsHeader ? 1 : 0; i < lines.length; i++) {
+ String line = lines[i];
+ if (!line.isBlank()) {
+ AclBinding aclBinding = parseCsvLine(line, i);
+ result.add(aclBinding);
+ }
+ }
+ return result;
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java
new file mode 100644
index 0000000000..8c5a8dab06
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java
@@ -0,0 +1,93 @@
+package com.provectus.kafka.ui.service.acl;
+
+import com.google.common.collect.Sets;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.service.AdminClientService;
+import java.util.List;
+import java.util.Set;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class AclsService {
+
+ private final AdminClientService adminClientService;
+
+ public Mono<Void> createAcl(KafkaCluster cluster, AclBinding aclBinding) {
+ var aclString = AclCsv.createAclString(aclBinding);
+ log.info("CREATING ACL: [{}]", aclString);
+ return adminClientService.get(cluster)
+ .flatMap(ac -> ac.createAcls(List.of(aclBinding)))
+ .doOnSuccess(v -> log.info("ACL CREATED: [{}]", aclString));
+ }
+
+ public Mono<Void> deleteAcl(KafkaCluster cluster, AclBinding aclBinding) {
+ var aclString = AclCsv.createAclString(aclBinding);
+ log.info("DELETING ACL: [{}]", aclString);
+ return adminClientService.get(cluster)
+ .flatMap(ac -> ac.deleteAcls(List.of(aclBinding)))
+ .doOnSuccess(v -> log.info("ACL DELETED: [{}]", aclString));
+ }
+
+ public Flux<AclBinding> listAcls(KafkaCluster cluster, ResourcePatternFilter filter) {
+ return adminClientService.get(cluster)
+ .flatMap(c -> c.listAcls(filter))
+ .flatMapIterable(acls -> acls);
+ }
+
+ public Mono<String> getAclAsCsvString(KafkaCluster cluster) {
+ return adminClientService.get(cluster)
+ .flatMap(c -> c.listAcls(ResourcePatternFilter.ANY))
+ .map(AclCsv::transformToCsvString);
+ }
+
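+ // Declarative sync: the csv describes the desired end state - missing bindings are created, bindings absent from the csv are deleted.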
+ public Mono<Void> syncAclWithAclCsv(KafkaCluster cluster, String csv) {
+ return adminClientService.get(cluster)
+ .flatMap(ac -> ac.listAcls(ResourcePatternFilter.ANY).flatMap(existingAclList -> {
+ var existingSet = Set.copyOf(existingAclList);
+ var newAcls = Set.copyOf(AclCsv.parseCsv(csv));
+ var toDelete = Sets.difference(existingSet, newAcls);
+ var toAdd = Sets.difference(newAcls, existingSet);
+ logAclSyncPlan(cluster, toAdd, toDelete);
+ if (toAdd.isEmpty() && toDelete.isEmpty()) {
+ return Mono.empty();
+ }
+ log.info("Starting new ACLs creation");
+ return ac.createAcls(toAdd)
+ .doOnSuccess(v -> {
+ log.info("{} new ACLs created", toAdd.size());
+ log.info("Starting ACLs deletion");
+ })
+ .then(ac.deleteAcls(toDelete)
+ .doOnSuccess(v -> log.info("{} ACLs deleted", toDelete.size())));
+ }));
+ }
+
+ private void logAclSyncPlan(KafkaCluster cluster, Set<AclBinding> toBeAdded, Set<AclBinding> toBeDeleted) {
+ log.info("'{}' cluster ACL sync plan: ", cluster.getName());
+ if (toBeAdded.isEmpty() && toBeDeleted.isEmpty()) {
+ log.info("Nothing to do, ACL is already in sync");
+ return;
+ }
+ if (!toBeAdded.isEmpty()) {
+ log.info("ACLs to be added ({}): ", toBeAdded.size());
+ for (AclBinding aclBinding : toBeAdded) {
+ log.info(" " + AclCsv.createAclString(aclBinding));
+ }
+ }
+ if (!toBeDeleted.isEmpty()) {
+ log.info("ACLs to be deleted ({}): ", toBeDeleted.size());
+ for (AclBinding aclBinding : toBeDeleted) {
+ log.info(" " + AclCsv.createAclString(aclBinding));
+ }
+ }
+ }
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java
index 06304365c7..fa84fc361c 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java
@@ -61,7 +61,9 @@ class JmxSslSocketFactory extends javax.net.ssl.SSLSocketFactory {
} catch (Exception e) {
log.error("----------------------------------");
log.error("SSL can't be enabled for JMX retrieval. "
- + "Make sure your java app run with '--add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED' arg.", e);
+ + "Make sure your java app run with '--add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED' arg. Err: {}",
+ e.getMessage());
+ log.trace("SSL can't be enabled for JMX retrieval", e);
log.error("----------------------------------");
}
SSL_JMX_SUPPORTED = sslJmxSupported;
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/DynamicConfigOperations.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/DynamicConfigOperations.java
index 75c6d25f95..68f826bd0f 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/DynamicConfigOperations.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/DynamicConfigOperations.java
@@ -90,6 +90,7 @@ public class DynamicConfigOperations {
}
public PropertiesStructure getCurrentProperties() {
+ checkIfDynamicConfigEnabled();
return PropertiesStructure.builder()
.kafka(getNullableBean(ClustersProperties.class))
.rbac(getNullableBean(RoleBasedAccessControlProperties.class))
@@ -112,11 +113,7 @@ public class DynamicConfigOperations {
}
public void persist(PropertiesStructure properties) {
- if (!dynamicConfigEnabled()) {
- throw new ValidationException(
- "Dynamic config change is not allowed. "
- + "Set dynamic.config.enabled property to 'true' to enabled it.");
- }
+ checkIfDynamicConfigEnabled();
properties.initAndValidate();
String yaml = serializeToYaml(properties);
@@ -124,8 +121,9 @@ public class DynamicConfigOperations {
}
public Mono uploadConfigRelatedFile(FilePart file) {
- String targetDirStr = (String) ctx.getEnvironment().getSystemEnvironment()
- .getOrDefault(CONFIG_RELATED_UPLOADS_DIR_PROPERTY, CONFIG_RELATED_UPLOADS_DIR_DEFAULT);
+ checkIfDynamicConfigEnabled();
+ String targetDirStr = ctx.getEnvironment()
+ .getProperty(CONFIG_RELATED_UPLOADS_DIR_PROPERTY, CONFIG_RELATED_UPLOADS_DIR_DEFAULT);
Path targetDir = Path.of(targetDirStr);
if (!Files.exists(targetDir)) {
@@ -149,6 +147,14 @@ public class DynamicConfigOperations {
.onErrorMap(th -> new FileUploadException(targetFilePath, th));
}
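+ // shared guard used by getCurrentProperties(), persist() and uploadConfigRelatedFile()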
+ private void checkIfDynamicConfigEnabled() {
+ if (!dynamicConfigEnabled()) {
+ throw new ValidationException(
+ "Dynamic config change is not allowed. "
+ + "Set dynamic.config.enabled property to 'true' to enabled it.");
+ }
+ }
+
@SneakyThrows
private void writeYamlToFile(String yaml, Path path) {
if (Files.isDirectory(path)) {
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaVersion.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaVersion.java
index 5ed21c6a6e..3d6b2ca40e 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaVersion.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaVersion.java
@@ -1,24 +1,21 @@
package com.provectus.kafka.ui.util;
-import lombok.extern.slf4j.Slf4j;
+import java.util.Optional;
-@Slf4j
public final class KafkaVersion {
private KafkaVersion() {
}
- public static float parse(String version) throws NumberFormatException {
- log.trace("Parsing cluster version [{}]", version);
+ public static Optional<Float> parse(String version) {
try {
final String[] parts = version.split("\\.");
if (parts.length > 2) {
version = parts[0] + "." + parts[1];
}
- return Float.parseFloat(version.split("-")[0]);
+ return Optional.of(Float.parseFloat(version.split("-")[0]));
} catch (Exception e) {
- log.error("Conversion clusterVersion [{}] to float value failed", version, e);
- throw e;
+ return Optional.empty();
}
}
}
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java
index dbdfb67fd5..1938f93044 100644
--- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java
@@ -2,6 +2,7 @@ package com.provectus.kafka.ui;
import com.provectus.kafka.ui.container.KafkaConnectContainer;
import com.provectus.kafka.ui.container.SchemaRegistryContainer;
+import java.nio.file.Path;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
@@ -9,6 +10,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.function.ThrowingConsumer;
+import org.junit.jupiter.api.io.TempDir;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
@@ -47,6 +49,9 @@ public abstract class AbstractIntegrationTest {
.dependsOn(kafka)
.dependsOn(schemaRegistry);
+ @TempDir
+ public static Path tmpDir;
+
static {
kafka.start();
schemaRegistry.start();
@@ -76,6 +81,9 @@ public abstract class AbstractIntegrationTest {
System.setProperty("kafka.clusters.1.schemaRegistry", schemaRegistry.getUrl());
System.setProperty("kafka.clusters.1.kafkaConnect.0.name", "kafka-connect");
System.setProperty("kafka.clusters.1.kafkaConnect.0.address", kafkaConnect.getTarget());
+
+ System.setProperty("dynamic.config.enabled", "true");
+ System.setProperty("config.related.uploads.dir", tmpDir.toString());
}
}
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/controller/ApplicationConfigControllerTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/controller/ApplicationConfigControllerTest.java
new file mode 100644
index 0000000000..7840760868
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/controller/ApplicationConfigControllerTest.java
@@ -0,0 +1,49 @@
+package com.provectus.kafka.ui.controller;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.provectus.kafka.ui.AbstractIntegrationTest;
+import com.provectus.kafka.ui.model.UploadedFileInfoDTO;
+import java.io.IOException;
+import java.nio.file.Path;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.client.MultipartBodyBuilder;
+import org.springframework.test.web.reactive.server.WebTestClient;
+import org.springframework.util.MultiValueMap;
+
+class ApplicationConfigControllerTest extends AbstractIntegrationTest {
+
+ @Autowired
+ private WebTestClient webTestClient;
+
+ @Test
+ public void testUpload() throws IOException {
+ var fileToUpload = new ClassPathResource("/fileForUploadTest.txt", this.getClass());
+
+ UploadedFileInfoDTO result = webTestClient
+ .post()
+ .uri("/api/config/relatedfiles")
+ .bodyValue(generateBody(fileToUpload))
+ .exchange()
+ .expectStatus()
+ .isOk()
+ .expectBody(UploadedFileInfoDTO.class)
+ .returnResult()
+ .getResponseBody();
+
+ assertThat(result).isNotNull();
+ assertThat(result.getLocation()).isNotNull();
+ assertThat(Path.of(result.getLocation()))
+ .hasSameBinaryContentAs(fileToUpload.getFile().toPath());
+ }
+
+ private MultiValueMap<String, HttpEntity<?>> generateBody(ClassPathResource resource) {
+ MultipartBodyBuilder builder = new MultipartBodyBuilder();
+ builder.part("file", resource);
+ return builder.build();
+ }
+
+}
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java
new file mode 100644
index 0000000000..08ca4d1507
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java
@@ -0,0 +1,70 @@
+package com.provectus.kafka.ui.service.acl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.provectus.kafka.ui.exception.ValidationException;
+import java.util.Collection;
+import java.util.List;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class AclCsvTest {
+
+ private static final List<AclBinding> TEST_BINDINGS = List.of(
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
+ new AccessControlEntry("User:test1", "*", AclOperation.READ, AclPermissionType.ALLOW)),
+ new AclBinding(
+ new ResourcePattern(ResourceType.GROUP, "group1", PatternType.PREFIXED),
+ new AccessControlEntry("User:test2", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY))
+ );
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ "Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n"
+ + "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
+ + "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost",
+
+ //without header
+ "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
+ + "\n"
+ + "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost"
+ + "\n"
+ })
+ void parsesValidInputCsv(String csvString) {
+ Collection<AclBinding> parsed = AclCsv.parseCsv(csvString);
+ assertThat(parsed).containsExactlyInAnyOrderElementsOf(TEST_BINDINGS);
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ // columns > 7
+ "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*,1,2,3,4",
+ // columns < 7
+ "User:test1,TOPIC,LITERAL,*",
+ // enum values are illegal
+ "User:test1,ILLEGAL,LITERAL,*,READ,ALLOW,*",
+ "User:test1,TOPIC,LITERAL,*,READ,ILLEGAL,*"
+ })
+ void throwsExceptionForInvalidInputCsv(String csvString) {
+ assertThatThrownBy(() -> AclCsv.parseCsv(csvString))
+ .isInstanceOf(ValidationException.class);
+ }
+
+ @Test
+ void transformAndParseUseSameFormat() {
+ String csv = AclCsv.transformToCsvString(TEST_BINDINGS);
+ Collection<AclBinding> parsedBindings = AclCsv.parseCsv(csv);
+ assertThat(parsedBindings).containsExactlyInAnyOrderElementsOf(TEST_BINDINGS);
+ }
+
+}
diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java
new file mode 100644
index 0000000000..5791bb2041
--- /dev/null
+++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java
@@ -0,0 +1,82 @@
+package com.provectus.kafka.ui.service.acl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.service.AdminClientService;
+import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import java.util.Collection;
+import java.util.List;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import reactor.core.publisher.Mono;
+
+class AclsServiceTest {
+
+ private static final KafkaCluster CLUSTER = KafkaCluster.builder().build();
+
+ private final ReactiveAdminClient adminClientMock = mock(ReactiveAdminClient.class);
+ private final AdminClientService adminClientService = mock(AdminClientService.class);
+
+ private final AclsService aclsService = new AclsService(adminClientService);
+
+ @BeforeEach
+ void initMocks() {
+ when(adminClientService.get(CLUSTER)).thenReturn(Mono.just(adminClientMock));
+ }
+
+ @Test
+ void testSyncAclWithAclCsv() {
+ var existingBinding1 = new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
+ new AccessControlEntry("User:test1", "*", AclOperation.READ, AclPermissionType.ALLOW));
+
+ var existingBinding2 = new AclBinding(
+ new ResourcePattern(ResourceType.GROUP, "group1", PatternType.PREFIXED),
+ new AccessControlEntry("User:test2", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY));
+
+ var newBindingToBeAdded = new AclBinding(
+ new ResourcePattern(ResourceType.GROUP, "groupNew", PatternType.PREFIXED),
+ new AccessControlEntry("User:test3", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY));
+
+ when(adminClientMock.listAcls(ResourcePatternFilter.ANY))
+ .thenReturn(Mono.just(List.of(existingBinding1, existingBinding2)));
+
+ ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
+ when(adminClientMock.createAcls((Collection<AclBinding>) createdCaptor.capture()))
+ .thenReturn(Mono.empty());
+
+ ArgumentCaptor<Collection<AclBinding>> deletedCaptor = ArgumentCaptor.forClass(Collection.class);
+ when(adminClientMock.deleteAcls((Collection<AclBinding>) deletedCaptor.capture()))
+ .thenReturn(Mono.empty());
+
+ aclsService.syncAclWithAclCsv(
+ CLUSTER,
+ "Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n"
+ + "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
+ + "User:test3,GROUP,PREFIXED,groupNew,DESCRIBE,DENY,localhost"
+ ).block();
+
+ Collection<AclBinding> createdBindings = (Collection<AclBinding>) createdCaptor.getValue();
+ assertThat(createdBindings)
+ .hasSize(1)
+ .contains(newBindingToBeAdded);
+
+ Collection<AclBinding> deletedBindings = (Collection<AclBinding>) deletedCaptor.getValue();
+ assertThat(deletedBindings)
+ .hasSize(1)
+ .contains(existingBinding2);
+ }
+
+}
diff --git a/kafka-ui-api/src/test/resources/fileForUploadTest.txt b/kafka-ui-api/src/test/resources/fileForUploadTest.txt
new file mode 100644
index 0000000000..cc58280d07
--- /dev/null
+++ b/kafka-ui-api/src/test/resources/fileForUploadTest.txt
@@ -0,0 +1 @@
+some content goes here
diff --git a/kafka-ui-contract/pom.xml b/kafka-ui-contract/pom.xml
index f99f20d3d8..0d8e238368 100644
--- a/kafka-ui-contract/pom.xml
+++ b/kafka-ui-contract/pom.xml
@@ -101,9 +101,6 @@
true
java8
- <typeMappings>
- <typeMapping>filepart=org.springframework.http.codec.multipart.FilePart</typeMapping>
- </typeMappings>
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index 2bafb05faa..b89f8d0963 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -1730,6 +1730,125 @@ paths:
404:
description: Not found
+ /api/clusters/{clusterName}/acls:
+ get:
+ tags:
+ - Acls
+ summary: listKafkaAcls
+ operationId: listAcls
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: resourceType
+ in: query
+ required: false
+ schema:
+ $ref: '#/components/schemas/KafkaAclResourceType'
+ - name: resourceName
+ in: query
+ required: false
+ schema:
+ type: string
+ - name: namePatternType
+ in: query
+ required: false
+ schema:
+ $ref: '#/components/schemas/KafkaAclNamePatternType'
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ type: array
+ items:
+ $ref: '#/components/schemas/KafkaAcl'
+
+ /api/clusters/{clusterName}/acl/csv:
+ get:
+ tags:
+ - Acls
+ summary: getAclAsCsv
+ operationId: getAclAsCsv
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ text/plain:
+ schema:
+ type: string
+ post:
+ tags:
+ - Acls
+ summary: syncAclsCsv
+ operationId: syncAclsCsv
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ requestBody:
+ content:
+ text/plain:
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+
+ /api/clusters/{clusterName}/acl:
+ post:
+ tags:
+ - Acls
+ summary: createAcl
+ operationId: createAcl
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/KafkaAcl'
+ responses:
+ 200:
+ description: OK
+
+ delete:
+ tags:
+ - Acls
+ summary: deleteAcl
+ operationId: deleteAcl
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/KafkaAcl'
+ responses:
+ 200:
+ description: OK
+ 404:
+ description: Acl not found
+
/api/authorization:
get:
tags:
@@ -1819,7 +1938,7 @@ paths:
properties:
file:
type: string
- format: filepart
+ format: binary
responses:
200:
description: OK
@@ -1972,6 +2091,8 @@ components:
- KAFKA_CONNECT
- KSQL_DB
- TOPIC_DELETION
+ - KAFKA_ACL_VIEW # get ACLs listing
+ - KAFKA_ACL_EDIT # create & delete ACLs
required:
- id
- name
@@ -3342,6 +3463,62 @@ components:
- SCHEMA
- CONNECT
- KSQL
+ - ACL
+
+ KafkaAcl:
+ type: object
+ required: [resourceType, resourceName, namePatternType, principal, host, operation, permission]
+ properties:
+ resourceType:
+ $ref: '#/components/schemas/KafkaAclResourceType'
+ resourceName:
+ type: string # "*" if acl can be applied to any resource of given type
+ namePatternType:
+ $ref: '#/components/schemas/KafkaAclNamePatternType'
+ principal:
+ type: string
+ host:
+ type: string # "*" if acl can be applied to any resource of given type
+ operation:
+ type: string
+ enum:
+ - UNKNOWN # Unknown operation, need to update mapping code on BE
+ - ALL # Cluster, Topic, Group
+ - READ # Topic, Group
+ - WRITE # Topic, TransactionalId
+ - CREATE # Cluster, Topic
+ - DELETE # Topic, Group
+ - ALTER # Cluster, Topic
+ - DESCRIBE # Cluster, Topic, Group, TransactionalId, DelegationToken
+ - CLUSTER_ACTION # Cluster
+ - DESCRIBE_CONFIGS # Cluster, Topic
+ - ALTER_CONFIGS # Cluster, Topic
+ - IDEMPOTENT_WRITE # Cluster
+ - CREATE_TOKENS
+ - DESCRIBE_TOKENS
+ permission:
+ type: string
+ enum:
+ - ALLOW
+ - DENY
+
+ KafkaAclResourceType:
+ type: string
+ enum:
+ - UNKNOWN # Unknown resource type, need to update mapping code on BE
+ - TOPIC
+ - GROUP
+ - CLUSTER
+ - TRANSACTIONAL_ID
+ - DELEGATION_TOKEN
+ - USER
+
+ KafkaAclNamePatternType:
+ type: string
+ enum:
+ - MATCH
+ - LITERAL
+ - PREFIXED
RestartRequest:
type: object
diff --git a/kafka-ui-react-app/src/components/Connect/Details/Config/Config.tsx b/kafka-ui-react-app/src/components/Connect/Details/Config/Config.tsx
index 0e86d48940..8a372e9d12 100644
--- a/kafka-ui-react-app/src/components/Connect/Details/Config/Config.tsx
+++ b/kafka-ui-react-app/src/components/Connect/Details/Config/Config.tsx
@@ -37,7 +37,7 @@ const Config: React.FC = () => {
formState: { isDirty, isSubmitting, isValid, errors },
setValue,
} = useForm({
- mode: 'onTouched',
+ mode: 'onChange',
resolver: yupResolver(validationSchema),
defaultValues: {
config: JSON.stringify(config, null, '\t'),
diff --git a/kafka-ui-react-app/src/components/Schemas/Edit/Form.tsx b/kafka-ui-react-app/src/components/Schemas/Edit/Form.tsx
index 2fce1ad7d7..56d7bdc817 100644
--- a/kafka-ui-react-app/src/components/Schemas/Edit/Form.tsx
+++ b/kafka-ui-react-app/src/components/Schemas/Edit/Form.tsx
@@ -55,7 +55,7 @@ const Form: React.FC = () => {
yup.object().shape({
newSchema:
schema?.schemaType === SchemaType.PROTOBUF
- ? yup.string().required().isEnum('Schema syntax is not valid')
+ ? yup.string().required()
: yup.string().required().isJsonObject('Schema syntax is not valid'),
});
const methods = useForm({
diff --git a/kafka-ui-react-app/src/lib/__test__/yupExtended.spec.ts b/kafka-ui-react-app/src/lib/__test__/yupExtended.spec.ts
index 8100b9a326..bd43dd3f72 100644
--- a/kafka-ui-react-app/src/lib/__test__/yupExtended.spec.ts
+++ b/kafka-ui-react-app/src/lib/__test__/yupExtended.spec.ts
@@ -1,19 +1,5 @@
-import { isValidEnum, isValidJsonObject } from 'lib/yupExtended';
+import { isValidJsonObject } from 'lib/yupExtended';
-const invalidEnum = `
-ennum SchemType {
- AVRO = 0;
- JSON = 1;
- PROTOBUF = 3;
-}
-`;
-const validEnum = `
-enum SchemType {
- AVRO = 0;
- JSON = 1;
- PROTOBUF = 3;
-}
-`;
describe('yup extended', () => {
describe('isValidJsonObject', () => {
it('returns false for no value', () => {
@@ -35,21 +21,4 @@ describe('yup extended', () => {
expect(isValidJsonObject('{ "foo": "bar" }')).toBeTruthy();
});
});
-
- describe('isValidEnum', () => {
- it('returns false for invalid enum', () => {
- expect(isValidEnum(invalidEnum)).toBeFalsy();
- });
- it('returns false for no value', () => {
- expect(isValidEnum()).toBeFalsy();
- });
- it('returns true should trim value', () => {
- expect(
- isValidEnum(` enum SchemType {AVRO = 0; PROTOBUF = 3;} `)
- ).toBeTruthy();
- });
- it('returns true for valid enum', () => {
- expect(isValidEnum(validEnum)).toBeTruthy();
- });
- });
});
diff --git a/kafka-ui-react-app/src/lib/yupExtended.ts b/kafka-ui-react-app/src/lib/yupExtended.ts
index 4c662ca822..241dac9770 100644
--- a/kafka-ui-react-app/src/lib/yupExtended.ts
+++ b/kafka-ui-react-app/src/lib/yupExtended.ts
@@ -10,7 +10,6 @@ declare module 'yup' {
TFlags extends yup.Flags = ''
> extends yup.Schema<TType, TContext, TDefault, TFlags> {
isJsonObject(message?: string): StringSchema;
- isEnum(message?: string): StringSchema;
}
}
@@ -40,32 +39,6 @@ const isJsonObject = (message?: string) => {
isValidJsonObject
);
};
-
-export const isValidEnum = (value?: string) => {
- try {
- if (!value) return false;
- const trimmedValue = value.trim();
- if (
- trimmedValue.indexOf('enum') === 0 &&
- trimmedValue.lastIndexOf('}') === trimmedValue.length - 1
- ) {
- return true;
- }
- } catch {
- // do nothing
- }
- return false;
-};
-
-const isEnum = (message?: string) => {
- return yup.string().test(
- 'isEnum',
- // eslint-disable-next-line no-template-curly-in-string
- message || '${path} is not Enum object',
- isValidEnum
- );
-};
-
/**
* due to yup rerunning all the object validiation during any render,
* it makes sense to cache the async results
@@ -88,7 +61,6 @@ export function cacheTest(
}
yup.addMethod(yup.StringSchema, 'isJsonObject', isJsonObject);
-yup.addMethod(yup.StringSchema, 'isEnum', isEnum);
export const topicFormValidationSchema = yup.object().shape({
name: yup