浏览代码

Unavailable connects suppressing (#4061)

1. suppressing connect errors when returning list of all connectors
2. minor refactoring of KafkaConnectService.getAllConnectors
Ilya Kuramshin 1 年之前
父节点
当前提交
895d27a306

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java

@@ -34,7 +34,7 @@ public interface KafkaConnectMapper {
       com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse
           connectorPluginConfigValidationResponse);
 
-  default FullConnectorInfoDTO fullConnectorInfoFromTuple(InternalConnectInfo connectInfo) {
+  default FullConnectorInfoDTO fullConnectorInfo(InternalConnectInfo connectInfo) {
     ConnectorDTO connector = connectInfo.getConnector();
     List<TaskDTO> tasks = connectInfo.getTasks();
     int failedTasksCount = (int) tasks.stream()

+ 21 - 35
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java

@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
@@ -39,7 +38,6 @@ import org.springframework.stereotype.Service;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuples;
 
 @Service
 @Slf4j
@@ -61,39 +59,22 @@ public class KafkaConnectService {
   public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
                                                      @Nullable final String search) {
     return getConnects(cluster)
-        .flatMap(connect -> getConnectorNames(cluster, connect.getName()).map(cn -> Tuples.of(connect.getName(), cn)))
-        .flatMap(pair -> getConnector(cluster, pair.getT1(), pair.getT2()))
-        .flatMap(connector ->
-            getConnectorConfig(cluster, connector.getConnect(), connector.getName())
-                .map(config -> InternalConnectInfo.builder()
-                    .connector(connector)
-                    .config(config)
-                    .build()
-                )
-        )
-        .flatMap(connectInfo -> {
-          ConnectorDTO connector = connectInfo.getConnector();
-          return getConnectorTasks(cluster, connector.getConnect(), connector.getName())
-              .collectList()
-              .map(tasks -> InternalConnectInfo.builder()
-                  .connector(connector)
-                  .config(connectInfo.getConfig())
-                  .tasks(tasks)
-                  .build()
-              );
-        })
-        .flatMap(connectInfo -> {
-          ConnectorDTO connector = connectInfo.getConnector();
-          return getConnectorTopics(cluster, connector.getConnect(), connector.getName())
-              .map(ct -> InternalConnectInfo.builder()
-                  .connector(connector)
-                  .config(connectInfo.getConfig())
-                  .tasks(connectInfo.getTasks())
-                  .topics(ct.getTopics())
-                  .build()
-              );
-        })
-        .map(kafkaConnectMapper::fullConnectorInfoFromTuple)
+        .flatMap(connect ->
+            getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
+                .flatMap(connectorName ->
+                    Mono.zip(
+                        getConnector(cluster, connect.getName(), connectorName),
+                        getConnectorConfig(cluster, connect.getName(), connectorName),
+                        getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
+                        getConnectorTopics(cluster, connect.getName(), connectorName)
+                    ).map(tuple ->
+                        InternalConnectInfo.builder()
+                            .connector(tuple.getT1())
+                            .config(tuple.getT2())
+                            .tasks(tuple.getT3())
+                            .topics(tuple.getT4().getTopics())
+                            .build())))
+        .map(kafkaConnectMapper::fullConnectorInfo)
         .filter(matchesSearchTerm(search));
   }
 
@@ -132,6 +113,11 @@ public class KafkaConnectService {
         .flatMapMany(Flux::fromIterable);
   }
 
+  // returns empty flux if there was an error communicating with Connect
+  public Flux<String> getConnectorNamesWithErrorsSuppress(KafkaCluster cluster, String connectName) {
+    return getConnectorNames(cluster, connectName).onErrorComplete();
+  }
+
   @SneakyThrows
   private List<String> parseConnectorsNamesStringToList(String json) {
     return objectMapper.readValue(json, new TypeReference<>() {

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporter.java

@@ -25,7 +25,7 @@ class ConnectorsExporter {
 
   Flux<DataEntityList> export(KafkaCluster cluster) {
     return kafkaConnectService.getConnects(cluster)
-        .flatMap(connect -> kafkaConnectService.getConnectorNames(cluster, connect.getName())
+        .flatMap(connect -> kafkaConnectService.getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
             .flatMap(connectorName -> kafkaConnectService.getConnector(cluster, connect.getName(), connectorName))
             .flatMap(connectorDTO ->
                 kafkaConnectService.getConnectorTopics(cluster, connect.getName(), connectorDTO.getName())

+ 2 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java

@@ -77,6 +77,8 @@ public abstract class AbstractIntegrationTest {
       System.setProperty("kafka.clusters.0.kafkaConnect.0.userName", "kafka-connect");
       System.setProperty("kafka.clusters.0.kafkaConnect.0.password", "kafka-connect");
       System.setProperty("kafka.clusters.0.kafkaConnect.0.address", kafkaConnect.getTarget());
+      System.setProperty("kafka.clusters.0.kafkaConnect.1.name", "notavailable");
+      System.setProperty("kafka.clusters.0.kafkaConnect.1.address", "http://notavailable:6666");
       System.setProperty("kafka.clusters.0.masking.0.type", "REPLACE");
       System.setProperty("kafka.clusters.0.masking.0.replacement", "***");
       System.setProperty("kafka.clusters.0.masking.0.topicValuesPattern", "masking-test-.*");

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporterTest.java

@@ -61,7 +61,7 @@ class ConnectorsExporterTest {
     when(kafkaConnectService.getConnects(CLUSTER))
         .thenReturn(Flux.just(connect));
 
-    when(kafkaConnectService.getConnectorNames(CLUSTER, connect.getName()))
+    when(kafkaConnectService.getConnectorNamesWithErrorsSuppress(CLUSTER, connect.getName()))
         .thenReturn(Flux.just(sinkConnector.getName(), sourceConnector.getName()));
 
     when(kafkaConnectService.getConnector(CLUSTER, connect.getName(), sinkConnector.getName()))