FeatureService.java 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.model.Feature;
  3. import com.provectus.kafka.ui.model.KafkaCluster;
  4. import java.util.ArrayList;
  5. import java.util.Collection;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.Optional;
  9. import java.util.function.Predicate;
  10. import javax.annotation.Nullable;
  11. import lombok.RequiredArgsConstructor;
  12. import lombok.extern.slf4j.Slf4j;
  13. import org.apache.kafka.common.Node;
  14. import org.springframework.stereotype.Service;
  15. import reactor.core.publisher.Flux;
  16. import reactor.core.publisher.Mono;
  17. @Service
  18. @RequiredArgsConstructor
  19. @Slf4j
  20. public class FeatureService {
  21. private static final String DELETE_TOPIC_ENABLED_SERVER_PROPERTY = "delete.topic.enable";
  22. private final AdminClientService adminClientService;
  23. public Mono<List<Feature>> getAvailableFeatures(KafkaCluster cluster, @Nullable Node controller) {
  24. List<Mono<Feature>> features = new ArrayList<>();
  25. if (Optional.ofNullable(cluster.getConnectsClients())
  26. .filter(Predicate.not(Map::isEmpty))
  27. .isPresent()) {
  28. features.add(Mono.just(Feature.KAFKA_CONNECT));
  29. }
  30. if (cluster.getKsqlClient() != null) {
  31. features.add(Mono.just(Feature.KSQL_DB));
  32. }
  33. if (cluster.getSchemaRegistryClient() != null) {
  34. features.add(Mono.just(Feature.SCHEMA_REGISTRY));
  35. }
  36. if (controller != null) {
  37. features.add(
  38. isTopicDeletionEnabled(cluster, controller)
  39. .flatMap(r -> Boolean.TRUE.equals(r) ? Mono.just(Feature.TOPIC_DELETION) : Mono.empty())
  40. );
  41. }
  42. return Flux.fromIterable(features).flatMap(m -> m).collectList();
  43. }
  44. private Mono<Boolean> isTopicDeletionEnabled(KafkaCluster cluster, Node controller) {
  45. return adminClientService.get(cluster)
  46. .flatMap(ac -> ac.loadBrokersConfig(List.of(controller.id())))
  47. .map(config ->
  48. config.values().stream()
  49. .flatMap(Collection::stream)
  50. .filter(e -> e.name().equals(DELETE_TOPIC_ENABLED_SERVER_PROPERTY))
  51. .map(e -> Boolean.parseBoolean(e.value()))
  52. .findFirst()
  53. .orElse(true));
  54. }
  55. }