Browse Source

[Kafka Connect] Fix getting list of connector's topics (#504)

* [Kafka Connect] Fix getting list of connector's topics

* Fix code style in KafkaConnectService.java

* Fix code style
Ildar Almakaev 4 years ago
parent
commit
721cf0e750

+ 17 - 30
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java

@@ -11,11 +11,8 @@ import com.provectus.kafka.ui.model.ConnectorTaskStatus;
 import com.provectus.kafka.ui.model.FullConnectorInfo;
 import com.provectus.kafka.ui.model.Task;
 import com.provectus.kafka.ui.model.TaskStatus;
-import java.util.Arrays;
+import com.provectus.kafka.ui.model.connect.InternalConnectInfo;
 import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import org.apache.commons.lang3.tuple.Triple;
 import org.mapstruct.Mapper;
 
 @Mapper(componentModel = "spring")
@@ -36,32 +33,22 @@ public interface KafkaConnectMapper {
       com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse
           connectorPluginConfigValidationResponse);
 
-  default FullConnectorInfo fullConnectorInfoFromTuple(Triple<Connector, Map<String, Object>,
-      List<Task>> triple) {
-    Function<Map<String, Object>, List<String>> getTopicsFromConfig = config -> {
-      var topic = config.get("topic");
-      if (topic != null) {
-        return List.of((String) topic);
-      }
-      return Arrays.asList(((String) config.get("topics")).split(","));
-    };
-
+  default FullConnectorInfo fullConnectorInfoFromTuple(InternalConnectInfo connectInfo) {
+    Connector connector = connectInfo.getConnector();
+    List<Task> tasks = connectInfo.getTasks();
+    int failedTasksCount = (int) tasks.stream()
+        .map(Task::getStatus)
+        .map(TaskStatus::getState)
+        .filter(ConnectorTaskStatus.FAILED::equals)
+        .count();
     return new FullConnectorInfo()
-        .connect(triple.getLeft().getConnect())
-        .name(triple.getLeft().getName())
-        .connectorClass((String) triple.getMiddle().get("connector.class"))
-        .type(triple.getLeft().getType())
-        .topics(getTopicsFromConfig.apply(triple.getMiddle()))
-        .status(
-            triple.getLeft().getStatus()
-        )
-        .tasksCount(triple.getRight().size())
-        .failedTasksCount((int) triple.getRight().stream()
-            .map(Task::getStatus)
-            .map(TaskStatus::getState)
-            .filter(ConnectorTaskStatus.FAILED::equals)
-            .count());
+        .connect(connector.getConnect())
+        .name(connector.getName())
+        .connectorClass((String) connectInfo.getConfig().get("connector.class"))
+        .type(connector.getType())
+        .topics(connectInfo.getTopics())
+        .status(connector.getStatus())
+        .tasksCount(tasks.size())
+        .failedTasksCount(failedTasksCount);
   }
-
-  ;
 }

+ 17 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/connect/InternalConnectInfo.java

@@ -0,0 +1,17 @@
+package com.provectus.kafka.ui.model.connect;
+
+import com.provectus.kafka.ui.model.Connector;
+import com.provectus.kafka.ui.model.Task;
+import java.util.List;
+import java.util.Map;
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder(toBuilder = true)
+public class InternalConnectInfo {
+  private final Connector connector;
+  private final Map<String, Object> config;
+  private final List<Task> tasks;
+  private final List<String> topics;
+}

+ 39 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java

@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.service;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.provectus.kafka.ui.client.KafkaConnectClients;
+import com.provectus.kafka.ui.connect.model.ConnectorTopics;
 import com.provectus.kafka.ui.exception.ClusterNotFoundException;
 import com.provectus.kafka.ui.exception.ConnectNotFoundException;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
@@ -17,6 +18,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KafkaConnectCluster;
 import com.provectus.kafka.ui.model.NewConnector;
 import com.provectus.kafka.ui.model.Task;
+import com.provectus.kafka.ui.model.connect.InternalConnectInfo;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -26,7 +28,6 @@ import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.extern.log4j.Log4j2;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.commons.lang3.tuple.Triple;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -58,16 +59,47 @@ public class KafkaConnectService {
         .flatMap(pair -> getConnector(clusterName, pair.getLeft(), pair.getRight()))
         .flatMap(connector ->
             getConnectorConfig(clusterName, connector.getConnect(), connector.getName())
-                .map(config -> Pair.of(connector, config))
-        )
-        .flatMap(pair ->
-            getConnectorTasks(clusterName, pair.getLeft().getConnect(), pair.getLeft().getName())
-                .collectList()
-                .map(tasks -> Triple.of(pair.getLeft(), pair.getRight(), tasks))
+                .map(config -> InternalConnectInfo.builder()
+                    .connector(connector)
+                    .config(config)
+                    .build()
+                )
         )
+        .flatMap(connectInfo -> {
+          Connector connector = connectInfo.getConnector();
+          return getConnectorTasks(clusterName, connector.getConnect(), connector.getName())
+              .collectList()
+              .map(tasks -> InternalConnectInfo.builder()
+                  .connector(connector)
+                  .config(connectInfo.getConfig())
+                  .tasks(tasks)
+                  .build()
+              );
+        })
+        .flatMap(connectInfo -> {
+          Connector connector = connectInfo.getConnector();
+          return getConnectorTopics(clusterName, connector.getConnect(), connector.getName())
+              .map(ct -> InternalConnectInfo.builder()
+                  .connector(connector)
+                  .config(connectInfo.getConfig())
+                  .tasks(connectInfo.getTasks())
+                  .topics(ct.getTopics())
+                  .build()
+              );
+        })
         .map(kafkaConnectMapper::fullConnectorInfoFromTuple);
   }
 
+  private Mono<ConnectorTopics> getConnectorTopics(String clusterName, String connectClusterName,
+                                                   String connectorName) {
+    return getConnectAddress(clusterName, connectClusterName)
+        .flatMap(connectUrl -> KafkaConnectClients
+            .withBaseUrl(connectUrl)
+            .getConnectorTopics(connectorName)
+            .map(result -> result.get(connectorName))
+        );
+  }
+
   private Flux<Pair<String, String>> getConnectorNames(String clusterName, Connect connect) {
     return getConnectors(clusterName, connect.getName())
         .collectList().map(e -> e.get(0))

+ 30 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml

@@ -231,6 +231,28 @@ paths:
                 items:
                   $ref: '#/components/schemas/ConnectorTask'
 
+  /connectors/{connectorName}/topics:
+    get:
+      tags:
+        - KafkaConnectClient
+      summary: The set of topic names the connector has been using since its creation or since the last time its set of active topics was reset
+      operationId: getConnectorTopics
+      parameters:
+        - name: connectorName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: object
+                additionalProperties:
+                  $ref: '#/components/schemas/ConnectorTopics'
+
   /connectors/{connectorName}/tasks/{taskId}/status:
     get:
       tags:
@@ -499,3 +521,11 @@ components:
           items:
             $ref: '#/components/schemas/ConnectorPluginConfig'
 
+    ConnectorTopics:
+      type: object
+      properties:
+        topics:
+          type: array
+          items:
+            type: string
+