wip
This commit is contained in:
parent
e80096eb95
commit
be26f86d50
2 changed files with 7 additions and 3 deletions
|
@ -8,6 +8,7 @@ import java.util.Collection;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Predicate;
|
||||
import javax.annotation.Nullable;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
@ -53,7 +54,7 @@ public class FeatureService {
|
|||
|
||||
private Mono<ClusterFeature> topicDeletionEnabled(KafkaCluster cluster, @Nullable Node controller) {
|
||||
if (controller == null) {
|
||||
return Mono.empty();
|
||||
return Mono.just(ClusterFeature.TOPIC_DELETION); // assuming it is enabled by default
|
||||
}
|
||||
return adminClientService.get(cluster)
|
||||
.flatMap(ac -> ac.loadBrokersConfig(List.of(controller.id())))
|
||||
|
@ -64,11 +65,13 @@ public class FeatureService {
|
|||
.map(e -> Boolean.parseBoolean(e.value()))
|
||||
.findFirst()
|
||||
.orElse(true))
|
||||
.flatMap(enabled -> enabled ? Mono.just(ClusterFeature.TOPIC_DELETION) : Mono.empty());
|
||||
.flatMap(enabled -> enabled
|
||||
? Mono.just(ClusterFeature.TOPIC_DELETION)
|
||||
: Mono.empty());
|
||||
}
|
||||
|
||||
private Mono<ClusterFeature> aclEdit(ClusterDescription clusterDescription) {
|
||||
var authorizedOps = clusterDescription.getAuthorizedOperations();
|
||||
var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of());
|
||||
boolean canEdit = authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER);
|
||||
return canEdit
|
||||
? Mono.just(ClusterFeature.KAFKA_ACL_EDIT)
|
||||
|
|
|
@ -115,6 +115,7 @@ public class ReactiveAdminClient implements Closeable {
|
|||
Node controller;
|
||||
String clusterId;
|
||||
Collection<Node> nodes;
|
||||
@Nullable // null, if ACL is disabled
|
||||
Set<AclOperation> authorizedOperations;
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue