Browse Source

ISSUE-3977: suppressing connect errors when returning list of all connectors

iliax 2 years ago
parent
commit
e69cadfc56

+ 11 - 21
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java

@@ -61,37 +61,22 @@ public class KafkaConnectService {
   public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
                                                      @Nullable final String search) {
     return getConnects(cluster)
-        .flatMap(connect -> getConnectorNames(cluster, connect.getName()).map(cn -> Tuples.of(connect.getName(), cn)))
-        .flatMap(pair -> getConnector(cluster, pair.getT1(), pair.getT2()))
+        .flatMap(connect ->
+            getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
+                .flatMap(connectorName -> getConnector(cluster, connect.getName(), connectorName)))
         .flatMap(connector ->
             getConnectorConfig(cluster, connector.getConnect(), connector.getName())
-                .map(config -> InternalConnectInfo.builder()
-                    .connector(connector)
-                    .config(config)
-                    .build()
-                )
-        )
+                .map(config -> InternalConnectInfo.builder().connector(connector).config(config).build()))
         .flatMap(connectInfo -> {
           ConnectorDTO connector = connectInfo.getConnector();
           return getConnectorTasks(cluster, connector.getConnect(), connector.getName())
               .collectList()
-              .map(tasks -> InternalConnectInfo.builder()
-                  .connector(connector)
-                  .config(connectInfo.getConfig())
-                  .tasks(tasks)
-                  .build()
-              );
+              .map(tasks -> connectInfo.toBuilder().tasks(tasks).build());
         })
         .flatMap(connectInfo -> {
           ConnectorDTO connector = connectInfo.getConnector();
           return getConnectorTopics(cluster, connector.getConnect(), connector.getName())
-              .map(ct -> InternalConnectInfo.builder()
-                  .connector(connector)
-                  .config(connectInfo.getConfig())
-                  .tasks(connectInfo.getTasks())
-                  .topics(ct.getTopics())
-                  .build()
-              );
+              .map(ct -> connectInfo.toBuilder().topics(ct.getTopics()).build());
         })
         .map(kafkaConnectMapper::fullConnectorInfoFromTuple)
         .filter(matchesSearchTerm(search));
@@ -132,6 +117,11 @@ public class KafkaConnectService {
         .flatMapMany(Flux::fromIterable);
   }
 
+  // returns empty flux if there was an error communicating with Connect
+  public Flux<String> getConnectorNamesWithErrorsSuppress(KafkaCluster cluster, String connectName) {
+    return getConnectorNames(cluster, connectName).onErrorComplete();
+  }
+
   @SneakyThrows
   private List<String> parseConnectorsNamesStringToList(String json) {
     return objectMapper.readValue(json, new TypeReference<>() {

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporter.java

@@ -25,7 +25,7 @@ class ConnectorsExporter {
 
   Flux<DataEntityList> export(KafkaCluster cluster) {
     return kafkaConnectService.getConnects(cluster)
-        .flatMap(connect -> kafkaConnectService.getConnectorNames(cluster, connect.getName())
+        .flatMap(connect -> kafkaConnectService.getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
             .flatMap(connectorName -> kafkaConnectService.getConnector(cluster, connect.getName(), connectorName))
             .flatMap(connectorDTO ->
                 kafkaConnectService.getConnectorTopics(cluster, connect.getName(), connectorDTO.getName())

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/ConnectorsExporterTest.java

@@ -61,7 +61,7 @@ class ConnectorsExporterTest {
     when(kafkaConnectService.getConnects(CLUSTER))
         .thenReturn(Flux.just(connect));
 
-    when(kafkaConnectService.getConnectorNames(CLUSTER, connect.getName()))
+    when(kafkaConnectService.getConnectorNamesWithErrorsSuppress(CLUSTER, connect.getName()))
         .thenReturn(Flux.just(sinkConnector.getName(), sourceConnector.getName()));
 
     when(kafkaConnectService.getConnector(CLUSTER, connect.getName(), sinkConnector.getName()))