From 690dcd3f74ecb11e2a827f48c189179e502fc9e1 Mon Sep 17 00:00:00 2001 From: Ilya Kuramshin Date: Tue, 2 May 2023 15:58:54 +0400 Subject: [PATCH 1/5] Wizard file upload fix (#3762) Removing manual FilePart openapi mapping - using default generator. File upload test added --- .../ApplicationConfigController.java | 12 +++-- .../ui/util/DynamicConfigOperations.java | 20 +++++--- .../kafka/ui/AbstractIntegrationTest.java | 8 +++ .../ApplicationConfigControllerTest.java | 49 +++++++++++++++++++ .../src/test/resources/fileForUploadTest.txt | 1 + kafka-ui-contract/pom.xml | 3 -- .../main/resources/swagger/kafka-ui-api.yaml | 2 +- 7 files changed, 80 insertions(+), 15 deletions(-) create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/controller/ApplicationConfigControllerTest.java create mode 100644 kafka-ui-api/src/test/resources/fileForUploadTest.txt diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java index 571250ba94..df04b40fab 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java @@ -27,6 +27,7 @@ import org.mapstruct.Mapper; import org.mapstruct.factory.Mappers; import org.springframework.http.ResponseEntity; import org.springframework.http.codec.multipart.FilePart; +import org.springframework.http.codec.multipart.Part; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; @@ -92,16 +93,19 @@ public class ApplicationConfigController implements ApplicationConfigApi { } @Override - public Mono> uploadConfigRelatedFile(FilePart file, ServerWebExchange exchange) { + public Mono> uploadConfigRelatedFile(Flux fileFlux, + ServerWebExchange exchange) { return accessControlService .validateAccess( AccessContext.builder() .applicationConfigActions(EDIT) .build() ) - .then(dynamicConfigOperations.uploadConfigRelatedFile(file)) - .map(path -> new UploadedFileInfoDTO().location(path.toString())) - .map(ResponseEntity::ok); + .then(fileFlux.single()) + .flatMap(file -> + dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file) + .map(path -> new UploadedFileInfoDTO().location(path.toString())) + .map(ResponseEntity::ok)); } @Override diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/DynamicConfigOperations.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/DynamicConfigOperations.java index 75c6d25f95..68f826bd0f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/DynamicConfigOperations.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/DynamicConfigOperations.java @@ -90,6 +90,7 @@ public class DynamicConfigOperations { } public PropertiesStructure getCurrentProperties() { + checkIfDynamicConfigEnabled(); return PropertiesStructure.builder() .kafka(getNullableBean(ClustersProperties.class)) .rbac(getNullableBean(RoleBasedAccessControlProperties.class)) @@ -112,11 +113,7 @@ public class DynamicConfigOperations { } public void persist(PropertiesStructure properties) { - if (!dynamicConfigEnabled()) { - throw new ValidationException( - "Dynamic config change is not allowed. " - + "Set dynamic.config.enabled property to 'true' to enabled it."); - } + checkIfDynamicConfigEnabled(); properties.initAndValidate(); String yaml = serializeToYaml(properties); @@ -124,8 +121,9 @@ public class DynamicConfigOperations { } public Mono uploadConfigRelatedFile(FilePart file) { - String targetDirStr = (String) ctx.getEnvironment().getSystemEnvironment() - .getOrDefault(CONFIG_RELATED_UPLOADS_DIR_PROPERTY, CONFIG_RELATED_UPLOADS_DIR_DEFAULT); + checkIfDynamicConfigEnabled(); + String targetDirStr = ctx.getEnvironment() + .getProperty(CONFIG_RELATED_UPLOADS_DIR_PROPERTY, CONFIG_RELATED_UPLOADS_DIR_DEFAULT); Path targetDir = Path.of(targetDirStr); if (!Files.exists(targetDir)) { @@ -149,6 +147,14 @@ public class DynamicConfigOperations { .onErrorMap(th -> new FileUploadException(targetFilePath, th)); } + private void checkIfDynamicConfigEnabled() { + if (!dynamicConfigEnabled()) { + throw new ValidationException( + "Dynamic config change is not allowed. " + + "Set dynamic.config.enabled property to 'true' to enabled it."); + } + } + @SneakyThrows private void writeYamlToFile(String yaml, Path path) { if (Files.isDirectory(path)) { diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java index dbdfb67fd5..1938f93044 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java @@ -2,6 +2,7 @@ package com.provectus.kafka.ui; import com.provectus.kafka.ui.container.KafkaConnectContainer; import com.provectus.kafka.ui.container.SchemaRegistryContainer; +import java.nio.file.Path; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.admin.AdminClient; @@ -9,6 +10,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.function.ThrowingConsumer; +import org.junit.jupiter.api.io.TempDir; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; import org.springframework.boot.test.context.SpringBootTest; @@ -47,6 +49,9 @@ public abstract class AbstractIntegrationTest { .dependsOn(kafka) .dependsOn(schemaRegistry); + @TempDir + public static Path tmpDir; + static { kafka.start(); schemaRegistry.start(); @@ -76,6 +81,9 @@ public abstract class AbstractIntegrationTest { System.setProperty("kafka.clusters.1.schemaRegistry", schemaRegistry.getUrl()); System.setProperty("kafka.clusters.1.kafkaConnect.0.name", "kafka-connect"); System.setProperty("kafka.clusters.1.kafkaConnect.0.address", kafkaConnect.getTarget()); + + System.setProperty("dynamic.config.enabled", "true"); + System.setProperty("config.related.uploads.dir", tmpDir.toString()); } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/controller/ApplicationConfigControllerTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/controller/ApplicationConfigControllerTest.java new file mode 100644 index 0000000000..7840760868 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/controller/ApplicationConfigControllerTest.java @@ -0,0 +1,49 @@ +package com.provectus.kafka.ui.controller; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.provectus.kafka.ui.AbstractIntegrationTest; +import com.provectus.kafka.ui.model.UploadedFileInfoDTO; +import java.io.IOException; +import java.nio.file.Path; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.io.ClassPathResource; +import org.springframework.http.HttpEntity; +import org.springframework.http.client.MultipartBodyBuilder; +import org.springframework.test.web.reactive.server.WebTestClient; +import org.springframework.util.MultiValueMap; + +class ApplicationConfigControllerTest extends AbstractIntegrationTest { + + @Autowired + private WebTestClient webTestClient; + + @Test + public void testUpload() throws IOException { + var fileToUpload = new ClassPathResource("/fileForUploadTest.txt", this.getClass()); + + UploadedFileInfoDTO result = webTestClient + .post() + .uri("/api/config/relatedfiles") + .bodyValue(generateBody(fileToUpload)) + .exchange() + .expectStatus() + .isOk() + .expectBody(UploadedFileInfoDTO.class) + .returnResult() + .getResponseBody(); + + assertThat(result).isNotNull(); + assertThat(result.getLocation()).isNotNull(); + assertThat(Path.of(result.getLocation())) + .hasSameBinaryContentAs(fileToUpload.getFile().toPath()); + } + + private MultiValueMap> generateBody(ClassPathResource resource) { + MultipartBodyBuilder builder = new MultipartBodyBuilder(); + builder.part("file", resource); + return builder.build(); + } + +} diff --git a/kafka-ui-api/src/test/resources/fileForUploadTest.txt b/kafka-ui-api/src/test/resources/fileForUploadTest.txt new file mode 100644 index 0000000000..cc58280d07 --- /dev/null +++ b/kafka-ui-api/src/test/resources/fileForUploadTest.txt @@ -0,0 +1 @@ +some content goes here diff --git a/kafka-ui-contract/pom.xml b/kafka-ui-contract/pom.xml index f99f20d3d8..0d8e238368 100644 --- a/kafka-ui-contract/pom.xml +++ b/kafka-ui-contract/pom.xml @@ -101,9 +101,6 @@ true java8 - - filepart=org.springframework.http.codec.multipart.FilePart - diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 2bafb05faa..b589198b5a 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1819,7 +1819,7 @@ paths: properties: file: type: string - format: filepart + format: binary responses: 200: description: OK From 727f38401babcf25d5bb47e675149882ff3ede14 Mon Sep 17 00:00:00 2001 From: Ilya Kuramshin Date: Tue, 2 May 2023 16:34:57 +0400 Subject: [PATCH 2/5] Expose cluster ACL list (#2818) --- documentation/compose/jaas/kafka_server.conf | 6 +- .../compose/jaas/zookeeper_jaas.conf | 4 + .../compose/kafka-ui-acl-with-zk.yaml | 59 ++++++ .../kafka/ui/controller/AclsController.java | 115 ++++++++++++ .../kafka/ui/mapper/ClusterMapper.java | 81 +++++++- .../kafka/ui/model/ClusterFeature.java | 4 +- .../kafka/ui/model/rbac/AccessContext.java | 12 +- .../kafka/ui/model/rbac/Permission.java | 2 + .../kafka/ui/model/rbac/Resource.java | 3 +- .../ui/model/rbac/permission/AclAction.java | 15 ++ .../kafka/ui/service/FeatureService.java | 23 ++- .../kafka/ui/service/ReactiveAdminClient.java | 85 ++++++--- .../kafka/ui/service/acl/AclCsv.java | 81 ++++++++ .../kafka/ui/service/acl/AclsService.java | 93 +++++++++ .../service/metrics/JmxSslSocketFactory.java | 4 +- .../provectus/kafka/ui/util/KafkaVersion.java | 11 +- .../kafka/ui/service/acl/AclCsvTest.java | 70 +++++++ .../kafka/ui/service/acl/AclsServiceTest.java | 82 ++++++++ .../main/resources/swagger/kafka-ui-api.yaml | 177 ++++++++++++++++++ 19 files changed, 886 insertions(+), 41 deletions(-) create mode 100644 documentation/compose/jaas/zookeeper_jaas.conf create mode 100644 documentation/compose/kafka-ui-acl-with-zk.yaml create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AclAction.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java diff --git a/documentation/compose/jaas/kafka_server.conf b/documentation/compose/jaas/kafka_server.conf index 25388be5aa..0c1fb34652 100644 --- a/documentation/compose/jaas/kafka_server.conf +++ b/documentation/compose/jaas/kafka_server.conf @@ -11,4 +11,8 @@ KafkaClient { user_admin="admin-secret"; }; -Client {}; +Client { + org.apache.zookeeper.server.auth.DigestLoginModule required + username="zkuser" + password="zkuserpassword"; +}; diff --git a/documentation/compose/jaas/zookeeper_jaas.conf b/documentation/compose/jaas/zookeeper_jaas.conf new file mode 100644 index 0000000000..2d7fd1b1c2 --- /dev/null +++ b/documentation/compose/jaas/zookeeper_jaas.conf @@ -0,0 +1,4 @@ +Server { + org.apache.zookeeper.server.auth.DigestLoginModule required + user_zkuser="zkuserpassword"; +}; diff --git a/documentation/compose/kafka-ui-acl-with-zk.yaml b/documentation/compose/kafka-ui-acl-with-zk.yaml new file mode 100644 index 0000000000..e1d70b2970 --- /dev/null +++ b/documentation/compose/kafka-ui-acl-with-zk.yaml @@ -0,0 +1,59 @@ +--- +version: '2' +services: + + kafka-ui: + container_name: kafka-ui + image: provectuslabs/kafka-ui:latest + ports: + - 8080:8080 + depends_on: + - zookeeper + - kafka + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 + KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT + KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN + KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";' + + zookeeper: + image: wurstmeister/zookeeper:3.4.6 + environment: + JVMFLAGS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf" + volumes: + - ./jaas/zookeeper_jaas.conf:/etc/zookeeper/zookeeper_jaas.conf + ports: + - 2181:2181 + + kafka: + image: confluentinc/cp-kafka:7.2.1 + hostname: kafka + container_name: kafka + ports: + - "9092:9092" + - "9997:9997" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_ADVERTISED_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092' + KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas/kafka_server.conf" + KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.authorizer.AclAuthorizer" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_JMX_PORT: 9997 + KAFKA_JMX_HOSTNAME: localhost + KAFKA_NODE_ID: 1 + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' + KAFKA_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092' + KAFKA_INTER_BROKER_LISTENER_NAME: 'SASL_PLAINTEXT' + KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN' + KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: 'PLAIN' + KAFKA_SECURITY_PROTOCOL: 'SASL_PLAINTEXT' + KAFKA_SUPER_USERS: 'User:admin' + volumes: + - ./scripts/update_run.sh:/tmp/update_run.sh + - ./jaas:/etc/kafka/jaas diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java new file mode 100644 index 0000000000..83d2ef553e --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java @@ -0,0 +1,115 @@ +package com.provectus.kafka.ui.controller; + +import com.provectus.kafka.ui.api.AclsApi; +import com.provectus.kafka.ui.mapper.ClusterMapper; +import com.provectus.kafka.ui.model.KafkaAclDTO; +import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO; +import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO; +import com.provectus.kafka.ui.model.rbac.AccessContext; +import com.provectus.kafka.ui.model.rbac.permission.AclAction; +import com.provectus.kafka.ui.service.acl.AclsService; +import com.provectus.kafka.ui.service.rbac.AccessControlService; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@RestController +@RequiredArgsConstructor +public class AclsController extends AbstractController implements AclsApi { + + private final AclsService aclsService; + private final AccessControlService accessControlService; + + @Override + public Mono> createAcl(String clusterName, Mono kafkaAclDto, + ServerWebExchange exchange) { + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.EDIT) + .build(); + + return accessControlService.validateAccess(context) + .then(kafkaAclDto) + .map(ClusterMapper::toAclBinding) + .flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding)) + .thenReturn(ResponseEntity.ok().build()); + } + + @Override + public Mono> deleteAcl(String clusterName, Mono kafkaAclDto, + ServerWebExchange exchange) { + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.EDIT) + .build(); + + return accessControlService.validateAccess(context) + .then(kafkaAclDto) + .map(ClusterMapper::toAclBinding) + .flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding)) + .thenReturn(ResponseEntity.ok().build()); + } + + @Override + public Mono>> listAcls(String clusterName, + KafkaAclResourceTypeDTO resourceTypeDto, + String resourceName, + KafkaAclNamePatternTypeDTO namePatternTypeDto, + ServerWebExchange exchange) { + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.VIEW) + .build(); + + var resourceType = Optional.ofNullable(resourceTypeDto) + .map(ClusterMapper::mapAclResourceTypeDto) + .orElse(ResourceType.ANY); + + var namePatternType = Optional.ofNullable(namePatternTypeDto) + .map(ClusterMapper::mapPatternTypeDto) + .orElse(PatternType.ANY); + + var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType); + + return accessControlService.validateAccess(context).then( + Mono.just( + ResponseEntity.ok( + aclsService.listAcls(getCluster(clusterName), filter) + .map(ClusterMapper::toKafkaAclDto))) + ); + } + + @Override + public Mono> getAclAsCsv(String clusterName, ServerWebExchange exchange) { + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.VIEW) + .build(); + + return accessControlService.validateAccess(context).then( + aclsService.getAclAsCsvString(getCluster(clusterName)) + .map(ResponseEntity::ok) + .flatMap(Mono::just) + ); + } + + @Override + public Mono> syncAclsCsv(String clusterName, Mono csvMono, ServerWebExchange exchange) { + AccessContext context = AccessContext.builder() + .cluster(clusterName) + .aclActions(AclAction.EDIT) + .build(); + + return accessControlService.validateAccess(context) + .then(csvMono) + .flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv)) + .thenReturn(ResponseEntity.ok().build()); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java index d989ce93ba..a122a269a4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java @@ -20,6 +20,9 @@ import com.provectus.kafka.ui.model.InternalPartition; import com.provectus.kafka.ui.model.InternalReplica; import com.provectus.kafka.ui.model.InternalTopic; import com.provectus.kafka.ui.model.InternalTopicConfig; +import com.provectus.kafka.ui.model.KafkaAclDTO; +import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO; +import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO; import com.provectus.kafka.ui.model.MetricDTO; import com.provectus.kafka.ui.model.Metrics; import com.provectus.kafka.ui.model.PartitionDTO; @@ -27,12 +30,18 @@ import com.provectus.kafka.ui.model.ReplicaDTO; import com.provectus.kafka.ui.model.TopicConfigDTO; import com.provectus.kafka.ui.model.TopicDTO; import com.provectus.kafka.ui.model.TopicDetailsDTO; -import com.provectus.kafka.ui.service.masking.DataMasking; import com.provectus.kafka.ui.service.metrics.RawMetric; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; import org.mapstruct.Mapper; import org.mapstruct.Mapping; @@ -109,8 +118,74 @@ public interface ClusterMapper { return brokerDiskUsage; } - default DataMasking map(List maskingProperties) { - return DataMasking.create(maskingProperties); + static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) { + return switch (operation) { + case ALL -> KafkaAclDTO.OperationEnum.ALL; + case READ -> KafkaAclDTO.OperationEnum.READ; + case WRITE -> KafkaAclDTO.OperationEnum.WRITE; + case CREATE -> KafkaAclDTO.OperationEnum.CREATE; + case DELETE -> KafkaAclDTO.OperationEnum.DELETE; + case ALTER -> KafkaAclDTO.OperationEnum.ALTER; + case DESCRIBE -> KafkaAclDTO.OperationEnum.DESCRIBE; + case CLUSTER_ACTION -> KafkaAclDTO.OperationEnum.CLUSTER_ACTION; + case DESCRIBE_CONFIGS -> KafkaAclDTO.OperationEnum.DESCRIBE_CONFIGS; + case ALTER_CONFIGS -> KafkaAclDTO.OperationEnum.ALTER_CONFIGS; + case IDEMPOTENT_WRITE -> KafkaAclDTO.OperationEnum.IDEMPOTENT_WRITE; + case CREATE_TOKENS -> KafkaAclDTO.OperationEnum.CREATE_TOKENS; + case DESCRIBE_TOKENS -> KafkaAclDTO.OperationEnum.DESCRIBE_TOKENS; + case ANY -> throw new IllegalArgumentException("ANY operation can be only part of filter"); + case UNKNOWN -> KafkaAclDTO.OperationEnum.UNKNOWN; + }; + } + + static KafkaAclResourceTypeDTO mapAclResourceType(ResourceType resourceType) { + return switch (resourceType) { + case CLUSTER -> KafkaAclResourceTypeDTO.CLUSTER; + case TOPIC -> KafkaAclResourceTypeDTO.TOPIC; + case GROUP -> KafkaAclResourceTypeDTO.GROUP; + case DELEGATION_TOKEN -> KafkaAclResourceTypeDTO.DELEGATION_TOKEN; + case TRANSACTIONAL_ID -> KafkaAclResourceTypeDTO.TRANSACTIONAL_ID; + case USER -> KafkaAclResourceTypeDTO.USER; + case ANY -> throw new IllegalArgumentException("ANY type can be only part of filter"); + case UNKNOWN -> KafkaAclResourceTypeDTO.UNKNOWN; + }; + } + + static ResourceType mapAclResourceTypeDto(KafkaAclResourceTypeDTO dto) { + return ResourceType.valueOf(dto.name()); + } + + static PatternType mapPatternTypeDto(KafkaAclNamePatternTypeDTO dto) { + return PatternType.valueOf(dto.name()); + } + + static AclBinding toAclBinding(KafkaAclDTO dto) { + return new AclBinding( + new ResourcePattern( + mapAclResourceTypeDto(dto.getResourceType()), + dto.getResourceName(), + mapPatternTypeDto(dto.getNamePatternType()) + ), + new AccessControlEntry( + dto.getPrincipal(), + dto.getHost(), + AclOperation.valueOf(dto.getOperation().name()), + AclPermissionType.valueOf(dto.getPermission().name()) + ) + ); + } + + static KafkaAclDTO toKafkaAclDto(AclBinding binding) { + var pattern = binding.pattern(); + var filter = binding.toFilter().entryFilter(); + return new KafkaAclDTO() + .resourceType(mapAclResourceType(pattern.resourceType())) + .resourceName(pattern.name()) + .namePatternType(KafkaAclNamePatternTypeDTO.fromValue(pattern.patternType().name())) + .principal(filter.principal()) + .host(filter.host()) + .operation(mapAclOperation(filter.operation())) + .permission(KafkaAclDTO.PermissionEnum.fromValue(filter.permissionType().name())); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java index 9731492f00..2973e5500d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java @@ -4,5 +4,7 @@ public enum ClusterFeature { KAFKA_CONNECT, KSQL_DB, SCHEMA_REGISTRY, - TOPIC_DELETION + TOPIC_DELETION, + KAFKA_ACL_VIEW, + KAFKA_ACL_EDIT } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java index 0c2587d681..45858093a7 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java @@ -1,5 +1,6 @@ package com.provectus.kafka.ui.model.rbac; +import com.provectus.kafka.ui.model.rbac.permission.AclAction; import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; @@ -37,6 +38,8 @@ public class AccessContext { Collection ksqlActions; + Collection aclActions; + public static AccessContextBuilder builder() { return new AccessContextBuilder(); } @@ -55,6 +58,7 @@ public class AccessContext { private String schema; private Collection schemaActions = Collections.emptySet(); private Collection ksqlActions = Collections.emptySet(); + private Collection aclActions = Collections.emptySet(); private AccessContextBuilder() { } @@ -131,6 +135,12 @@ public class AccessContext { return this; } + public AccessContextBuilder aclActions(AclAction... actions) { + Assert.isTrue(actions.length > 0, "actions not present"); + this.aclActions = List.of(actions); + return this; + } + public AccessContext build() { return new AccessContext( applicationConfigActions, @@ -140,7 +150,7 @@ public class AccessContext { connect, connectActions, connector, schema, schemaActions, - ksqlActions); + ksqlActions, aclActions); } } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java index afdcf0ca15..16f01f60e6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java @@ -4,6 +4,7 @@ import static com.provectus.kafka.ui.model.rbac.Resource.APPLICATIONCONFIG; import static com.provectus.kafka.ui.model.rbac.Resource.CLUSTERCONFIG; import static com.provectus.kafka.ui.model.rbac.Resource.KSQL; +import com.provectus.kafka.ui.model.rbac.permission.AclAction; import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; @@ -76,6 +77,7 @@ public class Permission { case SCHEMA -> Arrays.stream(SchemaAction.values()).map(Enum::toString).toList(); case CONNECT -> Arrays.stream(ConnectAction.values()).map(Enum::toString).toList(); case KSQL -> Arrays.stream(KsqlAction.values()).map(Enum::toString).toList(); + case ACL -> Arrays.stream(AclAction.values()).map(Enum::toString).toList(); }; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java index 4b2c66361f..f71dfb2979 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java @@ -11,7 +11,8 @@ public enum Resource { CONSUMER, SCHEMA, CONNECT, - KSQL; + KSQL, + ACL; @Nullable public static Resource fromString(String name) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AclAction.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AclAction.java new file mode 100644 index 0000000000..c86af7e72d --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AclAction.java @@ -0,0 +1,15 @@ +package com.provectus.kafka.ui.model.rbac.permission; + +import org.apache.commons.lang3.EnumUtils; +import org.jetbrains.annotations.Nullable; + +public enum AclAction implements PermissibleAction { + + VIEW, + EDIT; + + @Nullable + public static AclAction fromString(String name) { + return EnumUtils.getEnum(AclAction.class, name); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java index ec749abd14..7ba3f036e9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java @@ -2,16 +2,19 @@ package com.provectus.kafka.ui.service; import com.provectus.kafka.ui.model.ClusterFeature; import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Predicate; import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.Node; +import org.apache.kafka.common.acl.AclOperation; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -26,7 +29,7 @@ public class FeatureService { private final AdminClientService adminClientService; public Mono> getAvailableFeatures(KafkaCluster cluster, - ReactiveAdminClient.ClusterDescription clusterDescription) { + ClusterDescription clusterDescription) { List> features = new ArrayList<>(); if (Optional.ofNullable(cluster.getConnectsClients()) @@ -44,6 +47,8 @@ public class FeatureService { } features.add(topicDeletionEnabled(cluster, clusterDescription.getController())); + features.add(aclView(cluster)); + features.add(aclEdit(clusterDescription)); return Flux.fromIterable(features).flatMap(m -> m).collectList(); } @@ -65,4 +70,20 @@ public class FeatureService { ? Mono.just(ClusterFeature.TOPIC_DELETION) : Mono.empty()); } + + private Mono aclEdit(ClusterDescription clusterDescription) { + var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of()); + boolean canEdit = authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER); + return canEdit + ? Mono.just(ClusterFeature.KAFKA_ACL_EDIT) + : Mono.empty(); + } + + private Mono aclView(KafkaCluster cluster) { + return adminClientService.get(cluster).flatMap( + ac -> ac.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED) + ? Mono.just(ClusterFeature.KAFKA_ACL_VIEW) + : Mono.empty() + ); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index 39332da39e..8451a89f97 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -5,6 +5,7 @@ import static java.util.stream.Collectors.toMap; import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableTable; import com.google.common.collect.Iterables; import com.google.common.collect.Table; @@ -15,7 +16,6 @@ import com.provectus.kafka.ui.util.KafkaVersion; import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant; import java.io.Closeable; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -61,16 +61,22 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.requests.DescribeLogDirsResponse; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -82,26 +88,29 @@ import reactor.util.function.Tuples; @RequiredArgsConstructor public class ReactiveAdminClient implements Closeable { - private enum SupportedFeature { + public enum SupportedFeature { INCREMENTAL_ALTER_CONFIGS(2.3f), CONFIG_DOCUMENTATION_RETRIEVAL(2.6f), - DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f); + DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f), + AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled); - private final float sinceVersion; + private final BiFunction> predicate; - SupportedFeature(float sinceVersion) { - this.sinceVersion = sinceVersion; + SupportedFeature(BiFunction> predicate) { + this.predicate = predicate; } - static Set forVersion(float kafkaVersion) { - return Arrays.stream(SupportedFeature.values()) - .filter(f -> kafkaVersion >= f.sinceVersion) + SupportedFeature(float fromVersion) { + this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion); + } + + static Mono> forVersion(AdminClient ac, @Nullable Float kafkaVersion) { + return Flux.fromArray(SupportedFeature.values()) + .flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled))) + .filter(Tuple2::getT2) + .map(Tuple2::getT1) .collect(Collectors.toSet()); } - - static Set defaultFeatures() { - return Set.of(); - } } @Value @@ -110,25 +119,31 @@ public class ReactiveAdminClient implements Closeable { Node controller; String clusterId; Collection nodes; + @Nullable // null, if ACL is disabled Set authorizedOperations; } public static Mono create(AdminClient adminClient) { return getClusterVersion(adminClient) - .map(ver -> - new ReactiveAdminClient( - adminClient, - ver, - getSupportedUpdateFeaturesForVersion(ver))); + .flatMap(ver -> + getSupportedUpdateFeaturesForVersion(adminClient, ver) + .map(features -> + new ReactiveAdminClient(adminClient, ver, features))); } - private static Set getSupportedUpdateFeaturesForVersion(String versionStr) { - try { - float version = KafkaVersion.parse(versionStr); - return SupportedFeature.forVersion(version); - } catch (NumberFormatException e) { - return SupportedFeature.defaultFeatures(); - } + private static Mono> getSupportedUpdateFeaturesForVersion(AdminClient ac, String versionStr) { + @Nullable Float kafkaVersion = KafkaVersion.parse(versionStr).orElse(null); + return SupportedFeature.forVersion(ac, kafkaVersion); + } + + private static Mono isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) { + return toMono(ac.describeAcls(AclBindingFilter.ANY).values()) + .thenReturn(true) + .doOnError(th -> !(th instanceof SecurityDisabledException) + && !(th instanceof InvalidRequestException) + && !(th instanceof UnsupportedVersionException), + th -> log.warn("Error checking if security enabled", th)) + .onErrorReturn(false); } // NOTE: if KafkaFuture returns null, that Mono will be empty(!), since Reactor does not support nullable results @@ -162,6 +177,10 @@ public class ReactiveAdminClient implements Closeable { private final String version; private final Set features; + public Set getClusterFeatures() { + return features; + } + public Mono> listTopics(boolean listInternal) { return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names()); } @@ -576,6 +595,22 @@ public class ReactiveAdminClient implements Closeable { ); } + public Mono> listAcls(ResourcePatternFilter filter) { + Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); + return toMono(client.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values()); + } + + public Mono createAcls(Collection aclBindings) { + Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); + return toMono(client.createAcls(aclBindings).all()); + } + + public Mono deleteAcls(Collection aclBindings) { + Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); + var filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet()); + return toMono(client.deleteAcls(filters).all()).then(); + } + public Mono updateBrokerConfigByName(Integer brokerId, String name, String value) { ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)); AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java new file mode 100644 index 0000000000..673b17ee1f --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclCsv.java @@ -0,0 +1,81 @@ +package com.provectus.kafka.ui.service.acl; + +import com.provectus.kafka.ui.exception.ValidationException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; + +public class AclCsv { + + private static final String LINE_SEPARATOR = System.lineSeparator(); + private static final String VALUES_SEPARATOR = ","; + private static final String HEADER = "Principal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host"; + + public static String transformToCsvString(Collection acls) { + return Stream.concat(Stream.of(HEADER), acls.stream().map(AclCsv::createAclString)) + .collect(Collectors.joining(System.lineSeparator())); + } + + public static String createAclString(AclBinding binding) { + var pattern = binding.pattern(); + var filter = binding.toFilter().entryFilter(); + return String.format( + "%s,%s,%s,%s,%s,%s,%s", + filter.principal(), + pattern.resourceType(), + pattern.patternType(), + pattern.name(), + filter.operation(), + filter.permissionType(), + filter.host() + ); + } + + private static AclBinding parseCsvLine(String csv, int line) { + String[] values = csv.split(VALUES_SEPARATOR); + if (values.length != 7) { + throw new ValidationException("Input csv is not valid - there should be 7 columns in line " + line); + } + for (int i = 0; i < values.length; i++) { + if ((values[i] = values[i].trim()).isBlank()) { + throw new ValidationException("Input csv is not valid - blank value in colum " + i + ", line " + line); + } + } + try { + return new AclBinding( + new ResourcePattern( + ResourceType.valueOf(values[1]), values[3], PatternType.valueOf(values[2])), + new AccessControlEntry( + values[0], values[6], AclOperation.valueOf(values[4]), AclPermissionType.valueOf(values[5])) + ); + } catch (IllegalArgumentException enumParseError) { + throw new ValidationException("Error parsing enum value in line " + line); + } + } + + public static Collection parseCsv(String csvString) { + String[] lines = csvString.split(LINE_SEPARATOR); + if (lines.length == 0) { + throw new ValidationException("Error parsing ACL csv file: no lines in file"); + } + boolean firstLineIsHeader = HEADER.equalsIgnoreCase(lines[0].trim().replace(" ", "")); + Set result = new HashSet<>(); + for (int i = firstLineIsHeader ? 1 : 0; i < lines.length; i++) { + String line = lines[i]; + if (!line.isBlank()) { + AclBinding aclBinding = parseCsvLine(line, i); + result.add(aclBinding); + } + } + return result; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java new file mode 100644 index 0000000000..8c5a8dab06 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/acl/AclsService.java @@ -0,0 +1,93 @@ +package com.provectus.kafka.ui.service.acl; + +import com.google.common.collect.Sets; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.AdminClientService; +import java.util.List; +import java.util.Set; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Slf4j +@Service +@RequiredArgsConstructor +public class AclsService { + + private final AdminClientService adminClientService; + + public Mono createAcl(KafkaCluster cluster, AclBinding aclBinding) { + var aclString = AclCsv.createAclString(aclBinding); + log.info("CREATING ACL: [{}]", aclString); + return adminClientService.get(cluster) + .flatMap(ac -> ac.createAcls(List.of(aclBinding))) + .doOnSuccess(v -> log.info("ACL CREATED: [{}]", aclString)); + } + + public Mono deleteAcl(KafkaCluster cluster, AclBinding aclBinding) { + var aclString = AclCsv.createAclString(aclBinding); + log.info("DELETING ACL: [{}]", aclString); + return adminClientService.get(cluster) + .flatMap(ac -> ac.deleteAcls(List.of(aclBinding))) + .doOnSuccess(v -> log.info("ACL DELETED: [{}]", aclString)); + } + + public Flux listAcls(KafkaCluster cluster, ResourcePatternFilter filter) { + return adminClientService.get(cluster) + .flatMap(c -> c.listAcls(filter)) + .flatMapIterable(acls -> acls); + } + + public Mono getAclAsCsvString(KafkaCluster cluster) { + return adminClientService.get(cluster) + .flatMap(c -> c.listAcls(ResourcePatternFilter.ANY)) + .map(AclCsv::transformToCsvString); + } + + public Mono syncAclWithAclCsv(KafkaCluster cluster, String csv) { + return adminClientService.get(cluster) + .flatMap(ac -> ac.listAcls(ResourcePatternFilter.ANY).flatMap(existingAclList -> { + var existingSet = Set.copyOf(existingAclList); + var newAcls = Set.copyOf(AclCsv.parseCsv(csv)); + var toDelete = Sets.difference(existingSet, newAcls); + var toAdd = Sets.difference(newAcls, existingSet); + logAclSyncPlan(cluster, toAdd, toDelete); + if (toAdd.isEmpty() && toDelete.isEmpty()) { + return Mono.empty(); + } + log.info("Starting new ACLs creation"); + return ac.createAcls(toAdd) + .doOnSuccess(v -> { + log.info("{} new ACLs created", toAdd.size()); + log.info("Starting ACLs deletion"); + }) + .then(ac.deleteAcls(toDelete) + .doOnSuccess(v -> log.info("{} ACLs deleted", toDelete.size()))); + })); + } + + private void logAclSyncPlan(KafkaCluster cluster, Set toBeAdded, Set toBeDeleted) { + log.info("'{}' cluster ACL sync plan: ", cluster.getName()); + if (toBeAdded.isEmpty() && toBeDeleted.isEmpty()) { + log.info("Nothing to do, ACL is already in sync"); + return; + } + if (!toBeAdded.isEmpty()) { + log.info("ACLs to be added ({}): ", toBeAdded.size()); + for (AclBinding aclBinding : toBeAdded) { + log.info(" " + AclCsv.createAclString(aclBinding)); + } + } + if (!toBeDeleted.isEmpty()) { + log.info("ACLs to be deleted ({}): ", toBeDeleted.size()); + for (AclBinding aclBinding : toBeDeleted) { + log.info(" " + AclCsv.createAclString(aclBinding)); + } + } + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java index 06304365c7..fa84fc361c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/JmxSslSocketFactory.java @@ -61,7 +61,9 @@ class JmxSslSocketFactory extends javax.net.ssl.SSLSocketFactory { } catch (Exception e) { log.error("----------------------------------"); log.error("SSL can't be enabled for JMX retrieval. " - + "Make sure your java app run with '--add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED' arg.", e); + + "Make sure your java app run with '--add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED' arg. Err: {}", + e.getMessage()); + log.trace("SSL can't be enabled for JMX retrieval", e); log.error("----------------------------------"); } SSL_JMX_SUPPORTED = sslJmxSupported; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaVersion.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaVersion.java index 5ed21c6a6e..3d6b2ca40e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaVersion.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaVersion.java @@ -1,24 +1,21 @@ package com.provectus.kafka.ui.util; -import lombok.extern.slf4j.Slf4j; +import java.util.Optional; -@Slf4j public final class KafkaVersion { private KafkaVersion() { } - public static float parse(String version) throws NumberFormatException { - log.trace("Parsing cluster version [{}]", version); + public static Optional parse(String version) throws NumberFormatException { try { final String[] parts = version.split("\\."); if (parts.length > 2) { version = parts[0] + "." + parts[1]; } - return Float.parseFloat(version.split("-")[0]); + return Optional.of(Float.parseFloat(version.split("-")[0])); } catch (Exception e) { - log.error("Conversion clusterVersion [{}] to float value failed", version, e); - throw e; + return Optional.empty(); } } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java new file mode 100644 index 0000000000..08ca4d1507 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclCsvTest.java @@ -0,0 +1,70 @@ +package com.provectus.kafka.ui.service.acl; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.provectus.kafka.ui.exception.ValidationException; +import java.util.Collection; +import java.util.List; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class AclCsvTest { + + private static final List TEST_BINDINGS = List.of( + new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL), + new AccessControlEntry("User:test1", "*", AclOperation.READ, AclPermissionType.ALLOW)), + new AclBinding( + new ResourcePattern(ResourceType.GROUP, "group1", PatternType.PREFIXED), + new AccessControlEntry("User:test2", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY)) + ); + + @ParameterizedTest + @ValueSource(strings = { + "Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n" + + "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n" + + "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost", + + //without header + "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n" + + "\n" + + "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost" + + "\n" + }) + void parsesValidInputCsv(String csvString) { + Collection parsed = AclCsv.parseCsv(csvString); + assertThat(parsed).containsExactlyInAnyOrderElementsOf(TEST_BINDINGS); + } + + @ParameterizedTest + @ValueSource(strings = { + // columns > 7 + "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*,1,2,3,4", + // columns < 7 + "User:test1,TOPIC,LITERAL,*", + // enum values are illegal + "User:test1,ILLEGAL,LITERAL,*,READ,ALLOW,*", + "User:test1,TOPIC,LITERAL,*,READ,ILLEGAL,*" + }) + void throwsExceptionForInvalidInputCsv(String csvString) { + assertThatThrownBy(() -> AclCsv.parseCsv(csvString)) + .isInstanceOf(ValidationException.class); + } + + @Test + void transformAndParseUseSameFormat() { + String csv = AclCsv.transformToCsvString(TEST_BINDINGS); + Collection parsedBindings = AclCsv.parseCsv(csv); + assertThat(parsedBindings).containsExactlyInAnyOrderElementsOf(TEST_BINDINGS); + } + +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java new file mode 100644 index 0000000000..5791bb2041 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/acl/AclsServiceTest.java @@ -0,0 +1,82 @@ +package com.provectus.kafka.ui.service.acl; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.AdminClientService; +import com.provectus.kafka.ui.service.ReactiveAdminClient; +import java.util.Collection; +import java.util.List; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourcePatternFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import reactor.core.publisher.Mono; + +class AclsServiceTest { + + private static final KafkaCluster CLUSTER = KafkaCluster.builder().build(); + + private final ReactiveAdminClient adminClientMock = mock(ReactiveAdminClient.class); + private final AdminClientService adminClientService = mock(AdminClientService.class); + + private final AclsService aclsService = new AclsService(adminClientService); + + @BeforeEach + void initMocks() { + when(adminClientService.get(CLUSTER)).thenReturn(Mono.just(adminClientMock)); + } + + @Test + void testSyncAclWithAclCsv() { + var existingBinding1 = new AclBinding( + new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL), + new AccessControlEntry("User:test1", "*", AclOperation.READ, AclPermissionType.ALLOW)); + + var existingBinding2 = new AclBinding( + new ResourcePattern(ResourceType.GROUP, "group1", PatternType.PREFIXED), + new AccessControlEntry("User:test2", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY)); + + var newBindingToBeAdded = new AclBinding( + new ResourcePattern(ResourceType.GROUP, "groupNew", PatternType.PREFIXED), + new AccessControlEntry("User:test3", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY)); + + when(adminClientMock.listAcls(ResourcePatternFilter.ANY)) + .thenReturn(Mono.just(List.of(existingBinding1, existingBinding2))); + + ArgumentCaptor createdCaptor = ArgumentCaptor.forClass(Collection.class); + when(adminClientMock.createAcls((Collection) createdCaptor.capture())) + .thenReturn(Mono.empty()); + + ArgumentCaptor deletedCaptor = ArgumentCaptor.forClass(Collection.class); + when(adminClientMock.deleteAcls((Collection) deletedCaptor.capture())) + .thenReturn(Mono.empty()); + + aclsService.syncAclWithAclCsv( + CLUSTER, + "Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n" + + "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n" + + "User:test3,GROUP,PREFIXED,groupNew,DESCRIBE,DENY,localhost" + ).block(); + + Collection createdBindings = (Collection) createdCaptor.getValue(); + assertThat(createdBindings) + .hasSize(1) + .contains(newBindingToBeAdded); + + Collection deletedBindings = (Collection) deletedCaptor.getValue(); + assertThat(deletedBindings) + .hasSize(1) + .contains(existingBinding2); + } + +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index b589198b5a..b89f8d0963 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1730,6 +1730,125 @@ paths: 404: description: Not found + /api/clusters/{clusterName}/acls: + get: + tags: + - Acls + summary: listKafkaAcls + operationId: listAcls + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: resourceType + in: query + required: false + schema: + $ref: '#/components/schemas/KafkaAclResourceType' + - name: resourceName + in: query + required: false + schema: + type: string + - name: namePatternType + in: query + required: false + schema: + $ref: '#/components/schemas/KafkaAclNamePatternType' + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/KafkaAcl' + + /api/clusters/{clusterName}/acl/csv: + get: + tags: + - Acls + summary: getAclAsCsv + operationId: getAclAsCsv + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + text/plain: + schema: + type: string + post: + tags: + - Acls + summary: syncAclsCsv + operationId: syncAclsCsv + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + text/plain: + schema: + type: string + responses: + 200: + description: OK + + /api/clusters/{clusterName}/acl: + post: + tags: + - Acls + summary: createAcl + operationId: createAcl + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/KafkaAcl' + responses: + 200: + description: OK + + delete: + tags: + - Acls + summary: deleteAcl + operationId: deleteAcl + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/KafkaAcl' + responses: + 200: + description: OK + 404: + description: Acl not found + /api/authorization: get: tags: @@ -1972,6 +2091,8 @@ components: - KAFKA_CONNECT - KSQL_DB - TOPIC_DELETION + - KAFKA_ACL_VIEW # get ACLs listing + - KAFKA_ACL_EDIT # create & delete ACLs required: - id - name @@ -3342,6 +3463,62 @@ components: - SCHEMA - CONNECT - KSQL + - ACL + + KafkaAcl: + type: object + required: [resourceType, resourceName, namePatternType, principal, host, operation, permission] + properties: + resourceType: + $ref: '#/components/schemas/KafkaAclResourceType' + resourceName: + type: string # "*" if acl can be applied to any resource of given type + namePatternType: + $ref: '#/components/schemas/KafkaAclNamePatternType' + principal: + type: string + host: + type: string # "*" if acl can be applied to any resource of given type + operation: + type: string + enum: + - UNKNOWN # Unknown operation, need to update mapping code on BE + - ALL # Cluster, Topic, Group + - READ # Topic, Group + - WRITE # Topic, TransactionalId + - CREATE # Cluster, Topic + - DELETE # Topic, Group + - ALTER # Cluster, Topic, + - DESCRIBE # Cluster, Topic, Group, TransactionalId, DelegationToken + - CLUSTER_ACTION # Cluster + - DESCRIBE_CONFIGS # Cluster, Topic + - ALTER_CONFIGS # Cluster, Topic + - IDEMPOTENT_WRITE # Cluster + - CREATE_TOKENS + - DESCRIBE_TOKENS + permission: + type: string + enum: + - ALLOW + - DENY + + KafkaAclResourceType: + type: string + enum: + - UNKNOWN # Unknown operation, need to update mapping code on BE + - TOPIC + - GROUP + - CLUSTER + - TRANSACTIONAL_ID + - DELEGATION_TOKEN + - USER + + KafkaAclNamePatternType: + type: string + enum: + - MATCH + - LITERAL + - PREFIXED RestartRequest: type: object From 86a7ba44fb4b47d60b43e43e6854e7c0962ed82f Mon Sep 17 00:00:00 2001 From: David Bejanyan <58771979+David-DB88@users.noreply.github.com> Date: Sat, 6 May 2023 21:10:31 +0400 Subject: [PATCH 3/5] FE: SR: Fix updating an existing schema with valid syntax says the syntax is invalid (#3746) --- .../src/components/Schemas/Edit/Form.tsx | 2 +- .../src/lib/__test__/yupExtended.spec.ts | 33 +------------------ kafka-ui-react-app/src/lib/yupExtended.ts | 28 ---------------- 3 files changed, 2 insertions(+), 61 deletions(-) diff --git a/kafka-ui-react-app/src/components/Schemas/Edit/Form.tsx b/kafka-ui-react-app/src/components/Schemas/Edit/Form.tsx index 2fce1ad7d7..56d7bdc817 100644 --- a/kafka-ui-react-app/src/components/Schemas/Edit/Form.tsx +++ b/kafka-ui-react-app/src/components/Schemas/Edit/Form.tsx @@ -55,7 +55,7 @@ const Form: React.FC = () => { yup.object().shape({ newSchema: schema?.schemaType === SchemaType.PROTOBUF - ? yup.string().required().isEnum('Schema syntax is not valid') + ? yup.string().required() : yup.string().required().isJsonObject('Schema syntax is not valid'), }); const methods = useForm({ diff --git a/kafka-ui-react-app/src/lib/__test__/yupExtended.spec.ts b/kafka-ui-react-app/src/lib/__test__/yupExtended.spec.ts index 8100b9a326..bd43dd3f72 100644 --- a/kafka-ui-react-app/src/lib/__test__/yupExtended.spec.ts +++ b/kafka-ui-react-app/src/lib/__test__/yupExtended.spec.ts @@ -1,19 +1,5 @@ -import { isValidEnum, isValidJsonObject } from 'lib/yupExtended'; +import { isValidJsonObject } from 'lib/yupExtended'; -const invalidEnum = ` -ennum SchemType { - AVRO = 0; - JSON = 1; - PROTOBUF = 3; -} -`; -const validEnum = ` -enum SchemType { - AVRO = 0; - JSON = 1; - PROTOBUF = 3; -} -`; describe('yup extended', () => { describe('isValidJsonObject', () => { it('returns false for no value', () => { @@ -35,21 +21,4 @@ describe('yup extended', () => { expect(isValidJsonObject('{ "foo": "bar" }')).toBeTruthy(); }); }); - - describe('isValidEnum', () => { - it('returns false for invalid enum', () => { - expect(isValidEnum(invalidEnum)).toBeFalsy(); - }); - it('returns false for no value', () => { - expect(isValidEnum()).toBeFalsy(); - }); - it('returns true should trim value', () => { - expect( - isValidEnum(` enum SchemType {AVRO = 0; PROTOBUF = 3;} `) - ).toBeTruthy(); - }); - it('returns true for valid enum', () => { - expect(isValidEnum(validEnum)).toBeTruthy(); - }); - }); }); diff --git a/kafka-ui-react-app/src/lib/yupExtended.ts b/kafka-ui-react-app/src/lib/yupExtended.ts index 4c662ca822..241dac9770 100644 --- a/kafka-ui-react-app/src/lib/yupExtended.ts +++ b/kafka-ui-react-app/src/lib/yupExtended.ts @@ -10,7 +10,6 @@ declare module 'yup' { TFlags extends yup.Flags = '' > extends yup.Schema { isJsonObject(message?: string): StringSchema; - isEnum(message?: string): StringSchema; } } @@ -40,32 +39,6 @@ const isJsonObject = (message?: string) => { isValidJsonObject ); }; - -export const isValidEnum = (value?: string) => { - try { - if (!value) return false; - const trimmedValue = value.trim(); - if ( - trimmedValue.indexOf('enum') === 0 && - trimmedValue.lastIndexOf('}') === trimmedValue.length - 1 - ) { - return true; - } - } catch { - // do nothing - } - return false; -}; - -const isEnum = (message?: string) => { - return yup.string().test( - 'isEnum', - // eslint-disable-next-line no-template-curly-in-string - message || '${path} is not Enum object', - isValidEnum - ); -}; - /** * due to yup rerunning all the object validiation during any render, * it makes sense to cache the async results @@ -88,7 +61,6 @@ export function cacheTest( } yup.addMethod(yup.StringSchema, 'isJsonObject', isJsonObject); -yup.addMethod(yup.StringSchema, 'isEnum', isEnum); export const topicFormValidationSchema = yup.object().shape({ name: yup From 379d9926df00e6388ee417b043652cf4d37ad4d0 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sat, 6 May 2023 21:12:36 +0400 Subject: [PATCH 4/5] Bump jacoco-maven-plugin from 0.8.8 to 0.8.10 (#3714) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- kafka-ui-api/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml index 7f2d4c16be..5ebefe31df 100644 --- a/kafka-ui-api/pom.xml +++ b/kafka-ui-api/pom.xml @@ -12,7 +12,7 @@ kafka-ui-api - 0.8.8 + 0.8.10 jacoco reuseReports ${project.basedir}/target/jacoco.exec From 147b539c376028268d98955e66f0672125cd263b Mon Sep 17 00:00:00 2001 From: David Bejanyan <58771979+David-DB88@users.noreply.github.com> Date: Sat, 6 May 2023 21:36:29 +0400 Subject: [PATCH 5/5] FE: KC: Fix no error is displayed if the syntax is not valid (#3750) Co-authored-by: Roman Zabaluev --- .../src/components/Connect/Details/Config/Config.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-ui-react-app/src/components/Connect/Details/Config/Config.tsx b/kafka-ui-react-app/src/components/Connect/Details/Config/Config.tsx index 0e86d48940..8a372e9d12 100644 --- a/kafka-ui-react-app/src/components/Connect/Details/Config/Config.tsx +++ b/kafka-ui-react-app/src/components/Connect/Details/Config/Config.tsx @@ -37,7 +37,7 @@ const Config: React.FC = () => { formState: { isDirty, isSubmitting, isValid, errors }, setValue, } = useForm({ - mode: 'onTouched', + mode: 'onChange', resolver: yupResolver(validationSchema), defaultValues: { config: JSON.stringify(config, null, '\t'),