This commit is contained in:
iliax 2023-03-14 16:42:27 +04:00
parent ffff964007
commit 297a1d96e1
8 changed files with 53 additions and 28 deletions

View file

@ -11,4 +11,8 @@ KafkaClient {
user_admin="admin-secret";
};
Client {};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="zkuser"
password="zkuserpassword";
};

View file

@ -1,4 +1,4 @@
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_admin="admin-secret";
};
org.apache.zookeeper.server.auth.DigestLoginModule required
user_zkuser="zkuserpassword";
};

View file

@ -8,15 +8,26 @@ services:
ports:
- 8080:8080
depends_on:
- zookeeper
- kafka
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT
KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN
# KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="enzo" password="cisternino";'
KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";'
DYNAMIC_CONFIG_ENABLED: true # not necessary for sasl auth, added for tests
zookeeper:
image: wurstmeister/zookeeper:3.4.6
environment:
JVMFLAGS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf"
volumes:
- ./jaas/zookeeper_jaas.conf:/etc/zookeeper/zookeeper_jaas.conf
ports:
- 2181:2181
kafka:
image: confluentinc/cp-kafka:7.2.1
hostname: kafka
@ -26,27 +37,25 @@ services:
- "9997:9997"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas/kafka_server.conf"
KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.authorizer.AclAuthorizer"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9997
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
KAFKA_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'SASL_PLAINTEXT'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: 'PLAIN'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
KAFKA_SECURITY_PROTOCOL: 'SASL_PLAINTEXT'
KAFKA_SUPER_USERS: 'User:admin,User:enzo'
KAFKA_SUPER_USERS: 'User:admin'
volumes:
- ./scripts/update_run.sh:/tmp/update_run.sh
- ./jaas:/etc/kafka/jaas
command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"

View file

@ -28,7 +28,6 @@ import com.provectus.kafka.ui.model.ReplicaDTO;
import com.provectus.kafka.ui.model.TopicConfigDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.model.TopicDetailsDTO;
import com.provectus.kafka.ui.service.masking.DataMasking;
import com.provectus.kafka.ui.service.metrics.RawMetric;
import java.util.List;
import java.util.Map;
@ -132,7 +131,8 @@ public interface ClusterMapper {
case IDEMPOTENT_WRITE -> KafkaAclDTO.OperationEnum.IDEMPOTENT_WRITE;
case CREATE_TOKENS -> KafkaAclDTO.OperationEnum.CREATE_TOKENS;
case DESCRIBE_TOKENS -> KafkaAclDTO.OperationEnum.DESCRIBE_TOKENS;
case ANY, UNKNOWN -> KafkaAclDTO.OperationEnum.UNKNOWN;
case ANY -> throw new IllegalArgumentException("ANY operation can be only part of filter");
case UNKNOWN -> KafkaAclDTO.OperationEnum.UNKNOWN;
};
}
@ -144,7 +144,8 @@ public interface ClusterMapper {
case DELEGATION_TOKEN -> KafkaAclDTO.ResourceTypeEnum.DELEGATION_TOKEN;
case TRANSACTIONAL_ID -> KafkaAclDTO.ResourceTypeEnum.TRANSACTIONAL_ID;
case USER -> KafkaAclDTO.ResourceTypeEnum.USER;
case ANY, UNKNOWN -> KafkaAclDTO.ResourceTypeEnum.UNKNOWN;
case ANY -> throw new IllegalArgumentException("ANY type can be only part of filter");
case UNKNOWN -> KafkaAclDTO.ResourceTypeEnum.UNKNOWN;
};
}

View file

@ -5,5 +5,6 @@ public enum ClusterFeature {
KSQL_DB,
SCHEMA_REGISTRY,
TOPIC_DELETION,
KAFKA_ACL
KAFKA_ACL_VIEW,
KAFKA_ACL_EDIT
}

View file

@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.model.ClusterFeature;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -12,6 +13,7 @@ import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -25,7 +27,7 @@ public class FeatureService {
private final AdminClientService adminClientService;
public Mono<List<ClusterFeature>> getAvailableFeatures(KafkaCluster cluster, @Nullable Node controller) {
public Mono<List<ClusterFeature>> getAvailableFeatures(KafkaCluster cluster, ClusterDescription clusterDescription) {
List<Mono<ClusterFeature>> features = new ArrayList<>();
if (Optional.ofNullable(cluster.getConnectsClients())
@ -42,19 +44,17 @@ public class FeatureService {
features.add(Mono.just(ClusterFeature.SCHEMA_REGISTRY));
}
if (controller != null) {
features.add(
isTopicDeletionEnabled(cluster, controller)
.flatMap(r -> Boolean.TRUE.equals(r) ? Mono.just(ClusterFeature.TOPIC_DELETION) : Mono.empty())
);
}
features.add(acl(cluster));
features.add(topicDeletionEnabled(cluster, clusterDescription.getController()));
features.add(aclView(cluster));
features.add(aclEdit(clusterDescription));
return Flux.fromIterable(features).flatMap(m -> m).collectList();
}
private Mono<Boolean> isTopicDeletionEnabled(KafkaCluster cluster, Node controller) {
private Mono<ClusterFeature> topicDeletionEnabled(KafkaCluster cluster, @Nullable Node controller) {
if (controller == null) {
return Mono.empty();
}
return adminClientService.get(cluster)
.flatMap(ac -> ac.loadBrokersConfig(List.of(controller.id())))
.map(config ->
@ -63,13 +63,22 @@ public class FeatureService {
.filter(e -> e.name().equals(DELETE_TOPIC_ENABLED_SERVER_PROPERTY))
.map(e -> Boolean.parseBoolean(e.value()))
.findFirst()
.orElse(true));
.orElse(true))
.flatMap(enabled -> enabled ? Mono.just(ClusterFeature.TOPIC_DELETION) : Mono.empty());
}
private Mono<ClusterFeature> acl(KafkaCluster cluster) {
private Mono<ClusterFeature> aclEdit(ClusterDescription clusterDescription) {
var authorizedOps = clusterDescription.getAuthorizedOperations();
boolean canEdit = authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER);
return canEdit
? Mono.just(ClusterFeature.KAFKA_ACL_EDIT)
: Mono.empty();
}
private Mono<ClusterFeature> aclView(KafkaCluster cluster) {
return adminClientService.get(cluster).flatMap(
ac -> ac.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED)
? Mono.just(ClusterFeature.KAFKA_ACL)
? Mono.just(ClusterFeature.KAFKA_ACL_VIEW)
: Mono.empty()
);
}

View file

@ -41,7 +41,7 @@ public class StatisticsService {
List.of(
metricsCollector.getBrokerMetrics(cluster, description.getNodes()),
getLogDirInfo(description, ac),
featureService.getAvailableFeatures(cluster, description.getController()),
featureService.getAvailableFeatures(cluster, description),
loadTopicConfigs(cluster),
describeTopics(cluster)),
results ->

View file

@ -2056,7 +2056,8 @@ components:
- KAFKA_CONNECT
- KSQL_DB
- TOPIC_DELETION
- KAFKA_ACL
- KAFKA_ACL_VIEW # get ACLs listing
- KAFKA_ACL_EDIT # create & delete ACLs
required:
- id
- name