ACL_EDIT feature testing fix
This commit is contained in:
parent
c89effff04
commit
43d954eda9
1 changed files with 14 additions and 14 deletions
|
@ -9,7 +9,6 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.common.acl.AclOperation;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
@ -17,12 +16,9 @@ import reactor.core.publisher.Flux;
|
|||
import reactor.core.publisher.Mono;
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class FeatureService {
|
||||
|
||||
private final AdminClientService adminClientService;
|
||||
|
||||
public Mono<List<ClusterFeature>> getAvailableFeatures(ReactiveAdminClient adminClient,
|
||||
KafkaCluster cluster,
|
||||
ClusterDescription clusterDescription) {
|
||||
|
@ -43,8 +39,8 @@ public class FeatureService {
|
|||
}
|
||||
|
||||
features.add(topicDeletionEnabled(adminClient));
|
||||
features.add(aclView(cluster));
|
||||
features.add(aclEdit(clusterDescription));
|
||||
features.add(aclView(adminClient));
|
||||
features.add(aclEdit(adminClient, clusterDescription));
|
||||
|
||||
return Flux.fromIterable(features).flatMap(m -> m).collectList();
|
||||
}
|
||||
|
@ -55,19 +51,23 @@ public class FeatureService {
|
|||
: Mono.empty();
|
||||
}
|
||||
|
||||
private Mono<ClusterFeature> aclEdit(ClusterDescription clusterDescription) {
|
||||
private Mono<ClusterFeature> aclEdit(ReactiveAdminClient adminClient, ClusterDescription clusterDescription) {
|
||||
var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of());
|
||||
boolean canEdit = authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER);
|
||||
boolean canEdit = aclViewEnabled(adminClient)
|
||||
&& (authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER));
|
||||
return canEdit
|
||||
? Mono.just(ClusterFeature.KAFKA_ACL_EDIT)
|
||||
: Mono.empty();
|
||||
}
|
||||
|
||||
private Mono<ClusterFeature> aclView(KafkaCluster cluster) {
|
||||
return adminClientService.get(cluster).flatMap(
|
||||
ac -> ac.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED)
|
||||
? Mono.just(ClusterFeature.KAFKA_ACL_VIEW)
|
||||
: Mono.empty()
|
||||
);
|
||||
private Mono<ClusterFeature> aclView(ReactiveAdminClient adminClient) {
|
||||
return aclViewEnabled(adminClient)
|
||||
? Mono.just(ClusterFeature.KAFKA_ACL_VIEW)
|
||||
: Mono.empty();
|
||||
}
|
||||
|
||||
private boolean aclViewEnabled(ReactiveAdminClient adminClient) {
|
||||
return adminClient.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue