瀏覽代碼

ISSUE-832 Collect features not on startup (#834)

German Osin 3 年之前
父節點
當前提交
1b2b22f18a

+ 0 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClustersStorage.java

@@ -22,7 +22,6 @@ public class ClustersStorage {
   private final ClustersProperties clusterProperties;
 
   private final ClusterMapper clusterMapper = Mappers.getMapper(ClusterMapper.class);
-  private final FeatureService featureService;
 
   @PostConstruct
   public void init() {
@@ -36,7 +35,6 @@ public class ClustersStorage {
           clusterProperties.getName(),
           cluster.toBuilder()
               .topics(new HashMap<>())
-              .features(featureService.getAvailableFeatures(cluster))
               .build()
       );
     }

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java

@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service;
 
 import com.provectus.kafka.ui.model.Feature;
 import com.provectus.kafka.ui.model.KafkaCluster;
-import java.util.List;
+import reactor.core.publisher.Flux;
 
 public interface FeatureService {
   /**
@@ -11,5 +11,5 @@ public interface FeatureService {
    * @param cluster - cluster
    * @return List of Feature
    */
-  List<Feature> getAvailableFeatures(KafkaCluster cluster);
+  Flux<Feature> getAvailableFeatures(KafkaCluster cluster);
 }

+ 14 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureServiceImpl.java

@@ -12,6 +12,8 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 import org.apache.kafka.common.Node;
 import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 @Service
 @RequiredArgsConstructor
@@ -21,31 +23,32 @@ public class FeatureServiceImpl implements FeatureService {
   private final BrokerService brokerService;
 
   @Override
-  public List<Feature> getAvailableFeatures(KafkaCluster cluster) {
-    List<Feature> features = new ArrayList<>();
+  public Flux<Feature> getAvailableFeatures(KafkaCluster cluster) {
+    List<Mono<Feature>> features = new ArrayList<>();
 
     if (Optional.ofNullable(cluster.getKafkaConnect())
         .filter(Predicate.not(List::isEmpty))
         .isPresent()) {
-      features.add(Feature.KAFKA_CONNECT);
+      features.add(Mono.just(Feature.KAFKA_CONNECT));
     }
 
     if (cluster.getKsqldbServer() != null) {
-      features.add(Feature.KSQL_DB);
+      features.add(Mono.just(Feature.KSQL_DB));
     }
 
     if (cluster.getSchemaRegistry() != null) {
-      features.add(Feature.SCHEMA_REGISTRY);
+      features.add(Mono.just(Feature.SCHEMA_REGISTRY));
     }
 
-    if (topicDeletionCheck(cluster)) {
-      features.add(Feature.TOPIC_DELETION);
-    }
+    features.add(
+        topicDeletionCheck(cluster)
+            .flatMap(r -> r ? Mono.just(Feature.TOPIC_DELETION) : Mono.empty())
+    );
 
-    return features;
+    return Flux.fromIterable(features).flatMap(m -> m);
   }
 
-  private boolean topicDeletionCheck(KafkaCluster cluster) {
+  private Mono<Boolean> topicDeletionCheck(KafkaCluster cluster) {
     return brokerService.getController(cluster)
         .map(Node::id)
         .flatMap(broker -> brokerService.getBrokerConfigMap(cluster, broker))
@@ -54,6 +57,6 @@ public class FeatureServiceImpl implements FeatureService {
             return Boolean.parseBoolean(config.get(DELETE_TOPIC_ENABLE).getValue());
           }
           return false;
-        }).blockOptional().orElse(false);
+        });
   }
 }

+ 5 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java

@@ -1,9 +1,7 @@
 package com.provectus.kafka.ui.service;
 
-import com.provectus.kafka.ui.exception.IllegalEntityStateException;
 import com.provectus.kafka.ui.exception.InvalidRequestApiException;
 import com.provectus.kafka.ui.exception.LogDirNotFoundApiException;
-import com.provectus.kafka.ui.exception.NotFoundException;
 import com.provectus.kafka.ui.exception.TopicMetadataException;
 import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException;
 import com.provectus.kafka.ui.exception.ValidationException;
@@ -11,7 +9,6 @@ import com.provectus.kafka.ui.model.BrokerLogdirUpdate;
 import com.provectus.kafka.ui.model.CleanupPolicy;
 import com.provectus.kafka.ui.model.CreateTopicMessage;
 import com.provectus.kafka.ui.model.ExtendedAdminClient;
-import com.provectus.kafka.ui.model.InternalBrokerConfig;
 import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
 import com.provectus.kafka.ui.model.InternalBrokerMetrics;
 import com.provectus.kafka.ui.model.InternalClusterMetrics;
@@ -104,6 +101,8 @@ public class KafkaService {
   private final ClustersStorage clustersStorage;
   private final DeserializationService deserializationService;
   private final AdminClientService adminClientService;
+  private final FeatureService featureService;
+
 
   public KafkaCluster getUpdatedCluster(KafkaCluster cluster, InternalTopic updatedTopic) {
     final Map<String, InternalTopic> topics =
@@ -142,6 +141,9 @@ public class KafkaService {
                             ).map(segmentSizeDto -> buildFromData(cluster, version, segmentSizeDto))
                         )
             )
+        ).flatMap(
+            nc ->  featureService.getAvailableFeatures(cluster).collectList()
+                .map(f -> nc.toBuilder().features(f).build())
         ).doOnError(e ->
             log.error("Failed to collect cluster {} info", cluster.getName(), e)
         ).onErrorResume(

+ 3 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/OffsetsResetServiceTest.java

@@ -52,8 +52,10 @@ public class OffsetsResetServiceTest extends AbstractBaseTest {
   @BeforeEach
   void init() {
     AdminClientServiceImpl adminClientService = new AdminClientServiceImpl();
+    BrokerService brokerService = new BrokerServiceImpl(adminClientService);
+    FeatureService featureService = new FeatureServiceImpl(brokerService);
     adminClientService.setClientTimeout(5_000);
-    kafkaService = new KafkaService(null, null, null, null, adminClientService);
+    kafkaService = new KafkaService(null, null, null, null, adminClientService, featureService);
     offsetsResetService = new OffsetsResetService(kafkaService);
 
     createTopic(new NewTopic(topic, PARTITIONS, (short) 1));