FeatureService.java 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.model.ClusterFeature;
  3. import com.provectus.kafka.ui.model.KafkaCluster;
  4. import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.Optional;
  9. import java.util.Set;
  10. import java.util.function.Predicate;
  11. import lombok.RequiredArgsConstructor;
  12. import lombok.extern.slf4j.Slf4j;
  13. import org.apache.kafka.common.acl.AclOperation;
  14. import org.springframework.stereotype.Service;
  15. import reactor.core.publisher.Flux;
  16. import reactor.core.publisher.Mono;
  17. @Service
  18. @RequiredArgsConstructor
  19. @Slf4j
  20. public class FeatureService {
  21. private final AdminClientService adminClientService;
  22. public Mono<List<ClusterFeature>> getAvailableFeatures(ReactiveAdminClient adminClient,
  23. KafkaCluster cluster,
  24. ClusterDescription clusterDescription) {
  25. List<Mono<ClusterFeature>> features = new ArrayList<>();
  26. if (Optional.ofNullable(cluster.getConnectsClients())
  27. .filter(Predicate.not(Map::isEmpty))
  28. .isPresent()) {
  29. features.add(Mono.just(ClusterFeature.KAFKA_CONNECT));
  30. }
  31. if (cluster.getKsqlClient() != null) {
  32. features.add(Mono.just(ClusterFeature.KSQL_DB));
  33. }
  34. if (cluster.getSchemaRegistryClient() != null) {
  35. features.add(Mono.just(ClusterFeature.SCHEMA_REGISTRY));
  36. }
  37. features.add(topicDeletionEnabled(adminClient));
  38. features.add(aclView(cluster));
  39. features.add(aclEdit(clusterDescription));
  40. return Flux.fromIterable(features).flatMap(m -> m).collectList();
  41. }
  42. private Mono<ClusterFeature> topicDeletionEnabled(ReactiveAdminClient adminClient) {
  43. return adminClient.isTopicDeletionEnabled()
  44. ? Mono.just(ClusterFeature.TOPIC_DELETION)
  45. : Mono.empty();
  46. }
  47. private Mono<ClusterFeature> aclEdit(ClusterDescription clusterDescription) {
  48. var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of());
  49. boolean canEdit = authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER);
  50. return canEdit
  51. ? Mono.just(ClusterFeature.KAFKA_ACL_EDIT)
  52. : Mono.empty();
  53. }
  54. private Mono<ClusterFeature> aclView(KafkaCluster cluster) {
  55. return adminClientService.get(cluster).flatMap(
  56. ac -> ac.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED)
  57. ? Mono.just(ClusterFeature.KAFKA_ACL_VIEW)
  58. : Mono.empty()
  59. );
  60. }
  61. }