浏览代码

#356 added endpoint to fetch all cluster connectors (#361)

* added endpoint to fetch all cluster connectors

* - refactoring code
- refactoring schema
- added connector class property
Ramazan Yapparov 4 年之前
父节点
当前提交
eef45fc6ab

+ 9 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java

@@ -6,6 +6,7 @@ import com.provectus.kafka.ui.model.Connector;
 import com.provectus.kafka.ui.model.ConnectorAction;
 import com.provectus.kafka.ui.model.ConnectorPlugin;
 import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponse;
+import com.provectus.kafka.ui.model.FullConnectorInfo;
 import com.provectus.kafka.ui.model.NewConnector;
 import com.provectus.kafka.ui.model.Task;
 import com.provectus.kafka.ui.service.KafkaConnectService;
@@ -62,6 +63,14 @@ public class KafkaConnectController implements KafkaConnectApi {
         .map(ResponseEntity::ok);
   }
 
+  @Override
+  public Mono<ResponseEntity<Flux<FullConnectorInfo>>> getAllConnectors(
+      String clusterName,
+      ServerWebExchange exchange
+  ) {
+    return Mono.just(ResponseEntity.ok(kafkaConnectService.getAllConnectors(clusterName)));
+  }
+
   @Override
   public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clusterName,
                                                                       String connectName,

+ 36 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java

@@ -7,8 +7,15 @@ import com.provectus.kafka.ui.model.Connector;
 import com.provectus.kafka.ui.model.ConnectorPlugin;
 import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponse;
 import com.provectus.kafka.ui.model.ConnectorStatus;
+import com.provectus.kafka.ui.model.ConnectorTaskStatus;
+import com.provectus.kafka.ui.model.FullConnectorInfo;
 import com.provectus.kafka.ui.model.Task;
 import com.provectus.kafka.ui.model.TaskStatus;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.commons.lang3.tuple.Triple;
 import org.mapstruct.Mapper;
 
 @Mapper(componentModel = "spring")
@@ -28,4 +35,33 @@ public interface KafkaConnectMapper {
   ConnectorPluginConfigValidationResponse fromClient(
       com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse
           connectorPluginConfigValidationResponse);
+
+  default FullConnectorInfo fullConnectorInfoFromTuple(Triple<Connector, Map<String, Object>,
+      List<Task>> triple) {
+    Function<Map<String, Object>, List<String>> getTopicsFromConfig = config -> {
+      var topic = config.get("topic");
+      if (topic != null) {
+        return List.of((String) topic);
+      }
+      return Arrays.asList(((String) config.get("topics")).split(","));
+    };
+
+    return new FullConnectorInfo()
+        .connect(triple.getLeft().getConnect())
+        .name(triple.getLeft().getName())
+        .connectorClass((String) triple.getMiddle().get("connector.class"))
+        .type(triple.getLeft().getType())
+        .topics(getTopicsFromConfig.apply(triple.getMiddle()))
+        .status(
+            triple.getLeft().getStatus().getState()
+        )
+        .tasksCount(triple.getRight().size())
+        .failedTasksCount((int) triple.getRight().stream()
+            .map(Task::getStatus)
+            .map(TaskStatus::getState)
+            .filter(ConnectorTaskStatus.FAILED::equals)
+            .count());
+  }
+
+  ;
 }

+ 41 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java

@@ -1,5 +1,7 @@
 package com.provectus.kafka.ui.service;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.provectus.kafka.ui.client.KafkaConnectClients;
 import com.provectus.kafka.ui.exception.ClusterNotFoundException;
 import com.provectus.kafka.ui.exception.ConnectNotFoundException;
@@ -10,16 +12,21 @@ import com.provectus.kafka.ui.model.Connector;
 import com.provectus.kafka.ui.model.ConnectorAction;
 import com.provectus.kafka.ui.model.ConnectorPlugin;
 import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponse;
+import com.provectus.kafka.ui.model.FullConnectorInfo;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KafkaConnectCluster;
 import com.provectus.kafka.ui.model.NewConnector;
 import com.provectus.kafka.ui.model.Task;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -31,6 +38,7 @@ public class KafkaConnectService {
   private final ClustersStorage clustersStorage;
   private final ClusterMapper clusterMapper;
   private final KafkaConnectMapper kafkaConnectMapper;
+  private final ObjectMapper objectMapper;
 
   public Mono<Flux<Connect>> getConnects(String clusterName) {
     return Mono.just(
@@ -43,6 +51,38 @@ public class KafkaConnectService {
     );
   }
 
+  public Flux<FullConnectorInfo> getAllConnectors(String clusterName) {
+    return getConnects(clusterName)
+        .flatMapMany(Function.identity())
+        .flatMap(connect -> getConnectorNames(clusterName, connect))
+        .flatMap(pair -> getConnector(clusterName, pair.getLeft(), pair.getRight()))
+        .flatMap(connector ->
+            getConnectorConfig(clusterName, connector.getConnect(), connector.getName())
+                .map(config -> Pair.of(connector, config))
+        )
+        .flatMap(pair ->
+            getConnectorTasks(clusterName, pair.getLeft().getConnect(), pair.getLeft().getName())
+                .collectList()
+                .map(tasks -> Triple.of(pair.getLeft(), pair.getRight(), tasks))
+        )
+        .map(kafkaConnectMapper::fullConnectorInfoFromTuple);
+  }
+
+  private Flux<Pair<String, String>> getConnectorNames(String clusterName, Connect connect) {
+    return getConnectors(clusterName, connect.getName())
+        .collectList().map(e -> e.get(0))
+        // for some reason `getConnectors` method returns the response as a single string
+        .map(this::parseToList)
+        .flatMapMany(Flux::fromIterable)
+        .map(connector -> Pair.of(connect.getName(), connector));
+  }
+
+  @SneakyThrows
+  private List<String> parseToList(String json) {
+    return objectMapper.readValue(json, new TypeReference<>() {
+    });
+  }
+
   public Flux<String> getConnectors(String clusterName, String connectName) {
     return getConnectAddress(clusterName, connectName)
         .flatMapMany(connect ->
@@ -76,6 +116,7 @@ public class KafkaConnectService {
                           var status = connectorStatus.getConnector();
                           connector.status(kafkaConnectMapper.fromClient(status));
                           return (Connector) new Connector()
+                              .connect(connectName)
                               .status(kafkaConnectMapper.fromClient(status))
                               .type(connector.getType())
                               .tasks(connector.getTasks())

+ 5 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java

@@ -9,6 +9,8 @@ import com.provectus.kafka.ui.model.ConnectorPluginConfig;
 import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponse;
 import com.provectus.kafka.ui.model.ConnectorPluginConfigValue;
 import com.provectus.kafka.ui.model.ConnectorStatus;
+import com.provectus.kafka.ui.model.ConnectorTaskStatus;
+import com.provectus.kafka.ui.model.ConnectorType;
 import com.provectus.kafka.ui.model.NewConnector;
 import com.provectus.kafka.ui.model.TaskId;
 import java.util.List;
@@ -100,13 +102,14 @@ public class KafkaConnectServiceTests extends AbstractBaseTest {
   @Test
   public void shouldRetrieveConnector() {
     Connector expected = (Connector) new Connector()
+        .connect(connectName)
         .status(new ConnectorStatus()
-            .state(ConnectorStatus.StateEnum.RUNNING)
+            .state(ConnectorTaskStatus.RUNNING)
             .workerId("kafka-connect:8083"))
         .tasks(List.of(new TaskId()
             .connector(connectorName)
             .task(0)))
-        .type(Connector.TypeEnum.SINK)
+        .type(ConnectorType.SINK)
         .name(connectorName)
         .config(config);
     webTestClient.get()

+ 63 - 17
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -745,11 +745,33 @@ paths:
                 items:
                   $ref: '#/components/schemas/Connect'
 
+  /api/clusters/{clusterName}/connectors:
+    get:
+      tags:
+        - Kafka Connect
+      summary: get all kafka connectors
+      operationId: getAllConnectors
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/FullConnectorInfo'
+
   /api/clusters/{clusterName}/connects/{connectName}/connectors:
     get:
       tags:
         - Kafka Connect
-      summary: get all connectors from Kafka Connect service
+      summary: get connectors for provided kafka connect instance
       operationId: getConnectors
       parameters:
         - name: clusterName
@@ -1616,13 +1638,17 @@ components:
               items:
                 $ref: '#/components/schemas/TaskId'
             type:
-              type: string
-              enum:
-                - source
-                - sink
+              $ref: '#/components/schemas/ConnectorType'
             status:
               $ref: '#/components/schemas/ConnectorStatus'
+            connect:
+              type: string
 
+    ConnectorType:
+      type: string
+      enum:
+        - SOURCE
+        - SINK
 
     TaskStatus:
       type: object
@@ -1630,12 +1656,7 @@ components:
         id:
           type: integer
         state:
-          type: string
-          enum:
-            - RUNNING
-            - FAILED
-            - PAUSED
-            - UNASSIGNED
+          $ref: '#/components/schemas/ConnectorTaskStatus'
         worker_id:
           type: string
         trace:
@@ -1645,15 +1666,18 @@ components:
       type: object
       properties:
         state:
-          type: string
-          enum:
-            - RUNNING
-            - FAILED
-            - PAUSED
-            - UNASSIGNED
+          $ref: '#/components/schemas/ConnectorTaskStatus'
         worker_id:
           type: string
 
+    ConnectorTaskStatus:
+      type: string
+      enum:
+        - RUNNING
+        - FAILED
+        - PAUSED
+        - UNASSIGNED
+
     ConnectorAction:
       type: string
       enum:
@@ -1760,3 +1784,25 @@ components:
           type: array
           items:
             $ref: '#/components/schemas/ConnectorPluginConfig'
+
+    FullConnectorInfo:
+      type: object
+      properties:
+        connect:
+          type: string
+        name:
+          type: string
+        connector_class:
+          type: string
+        type:
+          $ref: '#/components/schemas/ConnectorType'
+        topics:
+          type: array
+          items:
+            type: string
+        status:
+          $ref: '#/components/schemas/ConnectorTaskStatus'
+        tasks_count:
+          type: integer
+        failed_tasks_count:
+          type: integer