diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java index 6c58470e59..638af22775 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java @@ -22,7 +22,6 @@ public class ClustersStorage { private final ClustersProperties clusterProperties; private final ClusterMapper clusterMapper = Mappers.getMapper(ClusterMapper.class); - private final FeatureService featureService; @PostConstruct public void init() { @@ -36,7 +35,6 @@ public class ClustersStorage { clusterProperties.getName(), cluster.toBuilder() .topics(new HashMap<>()) - .features(featureService.getAvailableFeatures(cluster)) .build() ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java index 7abe4870ee..58610259da 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java @@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service; import com.provectus.kafka.ui.model.Feature; import com.provectus.kafka.ui.model.KafkaCluster; -import java.util.List; +import reactor.core.publisher.Flux; public interface FeatureService { /** @@ -11,5 +11,5 @@ public interface FeatureService { * @param cluster - cluster * @return List of Feature */ - List getAvailableFeatures(KafkaCluster cluster); + Flux getAvailableFeatures(KafkaCluster cluster); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureServiceImpl.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureServiceImpl.java index 919b19d30c..413c237d9c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureServiceImpl.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureServiceImpl.java @@ -12,6 +12,8 @@ import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.apache.kafka.common.Node; import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; @Service @RequiredArgsConstructor @@ -21,31 +23,32 @@ public class FeatureServiceImpl implements FeatureService { private final BrokerService brokerService; @Override - public List getAvailableFeatures(KafkaCluster cluster) { - List features = new ArrayList<>(); + public Flux getAvailableFeatures(KafkaCluster cluster) { + List> features = new ArrayList<>(); if (Optional.ofNullable(cluster.getKafkaConnect()) .filter(Predicate.not(List::isEmpty)) .isPresent()) { - features.add(Feature.KAFKA_CONNECT); + features.add(Mono.just(Feature.KAFKA_CONNECT)); } if (cluster.getKsqldbServer() != null) { - features.add(Feature.KSQL_DB); + features.add(Mono.just(Feature.KSQL_DB)); } if (cluster.getSchemaRegistry() != null) { - features.add(Feature.SCHEMA_REGISTRY); + features.add(Mono.just(Feature.SCHEMA_REGISTRY)); } - if (topicDeletionCheck(cluster)) { - features.add(Feature.TOPIC_DELETION); - } + features.add( + topicDeletionCheck(cluster) + .flatMap(r -> r ? Mono.just(Feature.TOPIC_DELETION) : Mono.empty()) + ); - return features; + return Flux.fromIterable(features).flatMap(m -> m); } - private boolean topicDeletionCheck(KafkaCluster cluster) { + private Mono topicDeletionCheck(KafkaCluster cluster) { return brokerService.getController(cluster) .map(Node::id) .flatMap(broker -> brokerService.getBrokerConfigMap(cluster, broker)) @@ -54,6 +57,6 @@ public class FeatureServiceImpl implements FeatureService { return Boolean.parseBoolean(config.get(DELETE_TOPIC_ENABLE).getValue()); } return false; - }).blockOptional().orElse(false); + }); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java index e94596b454..d888a388a0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java @@ -1,9 +1,7 @@ package com.provectus.kafka.ui.service; -import com.provectus.kafka.ui.exception.IllegalEntityStateException; import com.provectus.kafka.ui.exception.InvalidRequestApiException; import com.provectus.kafka.ui.exception.LogDirNotFoundApiException; -import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.TopicMetadataException; import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException; import com.provectus.kafka.ui.exception.ValidationException; @@ -11,7 +9,6 @@ import com.provectus.kafka.ui.model.BrokerLogdirUpdate; import com.provectus.kafka.ui.model.CleanupPolicy; import com.provectus.kafka.ui.model.CreateTopicMessage; import com.provectus.kafka.ui.model.ExtendedAdminClient; -import com.provectus.kafka.ui.model.InternalBrokerConfig; import com.provectus.kafka.ui.model.InternalBrokerDiskUsage; import com.provectus.kafka.ui.model.InternalBrokerMetrics; import com.provectus.kafka.ui.model.InternalClusterMetrics; @@ -104,6 +101,8 @@ public class KafkaService { private final ClustersStorage clustersStorage; private final DeserializationService deserializationService; private final AdminClientService adminClientService; + private final FeatureService featureService; + public KafkaCluster getUpdatedCluster(KafkaCluster cluster, InternalTopic updatedTopic) { final Map topics = @@ -142,6 +141,9 @@ public class KafkaService { ).map(segmentSizeDto -> buildFromData(cluster, version, segmentSizeDto)) ) ) + ).flatMap( + nc -> featureService.getAvailableFeatures(cluster).collectList() + .map(f -> nc.toBuilder().features(f).build()) ).doOnError(e -> log.error("Failed to collect cluster {} info", cluster.getName(), e) ).onErrorResume( diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java index 0130d2698a..779bda4d3e 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java @@ -52,8 +52,10 @@ public class OffsetsResetServiceTest extends AbstractBaseTest { @BeforeEach void init() { AdminClientServiceImpl adminClientService = new AdminClientServiceImpl(); + BrokerService brokerService = new BrokerServiceImpl(adminClientService); + FeatureService featureService = new FeatureServiceImpl(brokerService); adminClientService.setClientTimeout(5_000); - kafkaService = new KafkaService(null, null, null, null, adminClientService); + kafkaService = new KafkaService(null, null, null, null, adminClientService, featureService); offsetsResetService = new OffsetsResetService(kafkaService); createTopic(new NewTopic(topic, PARTITIONS, (short) 1));