FeatureService.java 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.model.ClusterFeature;
  3. import com.provectus.kafka.ui.model.KafkaCluster;
  4. import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.Optional;
  9. import java.util.Set;
  10. import java.util.function.Predicate;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.apache.kafka.common.acl.AclOperation;
  13. import org.springframework.stereotype.Service;
  14. import reactor.core.publisher.Flux;
  15. import reactor.core.publisher.Mono;
  16. @Service
  17. @Slf4j
  18. public class FeatureService {
  19. public Mono<List<ClusterFeature>> getAvailableFeatures(ReactiveAdminClient adminClient,
  20. KafkaCluster cluster,
  21. ClusterDescription clusterDescription) {
  22. List<Mono<ClusterFeature>> features = new ArrayList<>();
  23. if (Optional.ofNullable(cluster.getConnectsClients())
  24. .filter(Predicate.not(Map::isEmpty))
  25. .isPresent()) {
  26. features.add(Mono.just(ClusterFeature.KAFKA_CONNECT));
  27. }
  28. if (cluster.getKsqlClient() != null) {
  29. features.add(Mono.just(ClusterFeature.KSQL_DB));
  30. }
  31. if (cluster.getSchemaRegistryClient() != null) {
  32. features.add(Mono.just(ClusterFeature.SCHEMA_REGISTRY));
  33. }
  34. features.add(topicDeletionEnabled(adminClient));
  35. features.add(aclView(adminClient));
  36. features.add(aclEdit(adminClient, clusterDescription));
  37. return Flux.fromIterable(features).flatMap(m -> m).collectList();
  38. }
  39. private Mono<ClusterFeature> topicDeletionEnabled(ReactiveAdminClient adminClient) {
  40. return adminClient.isTopicDeletionEnabled()
  41. ? Mono.just(ClusterFeature.TOPIC_DELETION)
  42. : Mono.empty();
  43. }
  44. private Mono<ClusterFeature> aclEdit(ReactiveAdminClient adminClient, ClusterDescription clusterDescription) {
  45. var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of());
  46. boolean canEdit = aclViewEnabled(adminClient)
  47. && (authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER));
  48. return canEdit
  49. ? Mono.just(ClusterFeature.KAFKA_ACL_EDIT)
  50. : Mono.empty();
  51. }
  52. private Mono<ClusterFeature> aclView(ReactiveAdminClient adminClient) {
  53. return aclViewEnabled(adminClient)
  54. ? Mono.just(ClusterFeature.KAFKA_ACL_VIEW)
  55. : Mono.empty();
  56. }
  57. private boolean aclViewEnabled(ReactiveAdminClient adminClient) {
  58. return adminClient.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED);
  59. }
  60. }