|
@@ -9,7 +9,6 @@ import java.util.Map;
|
|
|
import java.util.Optional;
|
|
|
import java.util.Set;
|
|
|
import java.util.function.Predicate;
|
|
|
-import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.kafka.common.acl.AclOperation;
|
|
|
import org.springframework.stereotype.Service;
|
|
@@ -17,12 +16,9 @@ import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
@Service
|
|
|
-@RequiredArgsConstructor
|
|
|
@Slf4j
|
|
|
public class FeatureService {
|
|
|
|
|
|
- private final AdminClientService adminClientService;
|
|
|
-
|
|
|
public Mono<List<ClusterFeature>> getAvailableFeatures(ReactiveAdminClient adminClient,
|
|
|
KafkaCluster cluster,
|
|
|
ClusterDescription clusterDescription) {
|
|
@@ -43,8 +39,8 @@ public class FeatureService {
|
|
|
}
|
|
|
|
|
|
features.add(topicDeletionEnabled(adminClient));
|
|
|
- features.add(aclView(cluster));
|
|
|
- features.add(aclEdit(clusterDescription));
|
|
|
+ features.add(aclView(adminClient));
|
|
|
+ features.add(aclEdit(adminClient, clusterDescription));
|
|
|
|
|
|
return Flux.fromIterable(features).flatMap(m -> m).collectList();
|
|
|
}
|
|
@@ -55,19 +51,23 @@ public class FeatureService {
|
|
|
: Mono.empty();
|
|
|
}
|
|
|
|
|
|
- private Mono<ClusterFeature> aclEdit(ClusterDescription clusterDescription) {
|
|
|
+ private Mono<ClusterFeature> aclEdit(ReactiveAdminClient adminClient, ClusterDescription clusterDescription) {
|
|
|
var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of());
|
|
|
- boolean canEdit = authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER);
|
|
|
+ boolean canEdit = aclViewEnabled(adminClient)
|
|
|
+ && (authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER));
|
|
|
return canEdit
|
|
|
? Mono.just(ClusterFeature.KAFKA_ACL_EDIT)
|
|
|
: Mono.empty();
|
|
|
}
|
|
|
|
|
|
- private Mono<ClusterFeature> aclView(KafkaCluster cluster) {
|
|
|
- return adminClientService.get(cluster).flatMap(
|
|
|
- ac -> ac.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED)
|
|
|
- ? Mono.just(ClusterFeature.KAFKA_ACL_VIEW)
|
|
|
- : Mono.empty()
|
|
|
- );
|
|
|
+ private Mono<ClusterFeature> aclView(ReactiveAdminClient adminClient) {
|
|
|
+ return aclViewEnabled(adminClient)
|
|
|
+ ? Mono.just(ClusterFeature.KAFKA_ACL_VIEW)
|
|
|
+ : Mono.empty();
|
|
|
}
|
|
|
+
|
|
|
+ private boolean aclViewEnabled(ReactiveAdminClient adminClient) {
|
|
|
+ return adminClient.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED);
|
|
|
+ }
|
|
|
+
|
|
|
}
|