FeatureService.java 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.model.ClusterFeature;
  3. import com.provectus.kafka.ui.model.KafkaCluster;
  4. import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;
  5. import java.util.ArrayList;
  6. import java.util.Collection;
  7. import java.util.List;
  8. import java.util.Map;
  9. import java.util.Optional;
  10. import java.util.Set;
  11. import java.util.function.Predicate;
  12. import javax.annotation.Nullable;
  13. import lombok.RequiredArgsConstructor;
  14. import lombok.extern.slf4j.Slf4j;
  15. import org.apache.kafka.common.Node;
  16. import org.apache.kafka.common.acl.AclOperation;
  17. import org.springframework.stereotype.Service;
  18. import reactor.core.publisher.Flux;
  19. import reactor.core.publisher.Mono;
  20. @Service
  21. @RequiredArgsConstructor
  22. @Slf4j
  23. public class FeatureService {
  24. private static final String DELETE_TOPIC_ENABLED_SERVER_PROPERTY = "delete.topic.enable";
  25. private final AdminClientService adminClientService;
  26. public Mono<List<ClusterFeature>> getAvailableFeatures(KafkaCluster cluster,
  27. ClusterDescription clusterDescription) {
  28. List<Mono<ClusterFeature>> features = new ArrayList<>();
  29. if (Optional.ofNullable(cluster.getConnectsClients())
  30. .filter(Predicate.not(Map::isEmpty))
  31. .isPresent()) {
  32. features.add(Mono.just(ClusterFeature.KAFKA_CONNECT));
  33. }
  34. if (cluster.getKsqlClient() != null) {
  35. features.add(Mono.just(ClusterFeature.KSQL_DB));
  36. }
  37. if (cluster.getSchemaRegistryClient() != null) {
  38. features.add(Mono.just(ClusterFeature.SCHEMA_REGISTRY));
  39. }
  40. features.add(topicDeletionEnabled(cluster, clusterDescription.getController()));
  41. features.add(aclView(cluster));
  42. features.add(aclEdit(clusterDescription));
  43. return Flux.fromIterable(features).flatMap(m -> m).collectList();
  44. }
  45. private Mono<ClusterFeature> topicDeletionEnabled(KafkaCluster cluster, @Nullable Node controller) {
  46. if (controller == null) {
  47. return Mono.just(ClusterFeature.TOPIC_DELETION); // assuming it is enabled by default
  48. }
  49. return adminClientService.get(cluster)
  50. .flatMap(ac -> ac.loadBrokersConfig(List.of(controller.id())))
  51. .map(config ->
  52. config.values().stream()
  53. .flatMap(Collection::stream)
  54. .filter(e -> e.name().equals(DELETE_TOPIC_ENABLED_SERVER_PROPERTY))
  55. .map(e -> Boolean.parseBoolean(e.value()))
  56. .findFirst()
  57. .orElse(true))
  58. .flatMap(enabled -> enabled
  59. ? Mono.just(ClusterFeature.TOPIC_DELETION)
  60. : Mono.empty());
  61. }
  62. private Mono<ClusterFeature> aclEdit(ClusterDescription clusterDescription) {
  63. var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of());
  64. boolean canEdit = authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER);
  65. return canEdit
  66. ? Mono.just(ClusterFeature.KAFKA_ACL_EDIT)
  67. : Mono.empty();
  68. }
  69. private Mono<ClusterFeature> aclView(KafkaCluster cluster) {
  70. return adminClientService.get(cluster).flatMap(
  71. ac -> ac.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED)
  72. ? Mono.just(ClusterFeature.KAFKA_ACL_VIEW)
  73. : Mono.empty()
  74. );
  75. }
  76. }