diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java index 06136192a4..c6f4b61b7e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java @@ -6,6 +6,7 @@ import com.provectus.kafka.ui.model.Connector; import com.provectus.kafka.ui.model.ConnectorAction; import com.provectus.kafka.ui.model.ConnectorPlugin; import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponse; +import com.provectus.kafka.ui.model.FullConnectorInfo; import com.provectus.kafka.ui.model.NewConnector; import com.provectus.kafka.ui.model.Task; import com.provectus.kafka.ui.service.KafkaConnectService; @@ -62,6 +63,14 @@ public class KafkaConnectController implements KafkaConnectApi { .map(ResponseEntity::ok); } + @Override + public Mono>> getAllConnectors( + String clusterName, + ServerWebExchange exchange + ) { + return Mono.just(ResponseEntity.ok(kafkaConnectService.getAllConnectors(clusterName))); + } + @Override public Mono>> getConnectorConfig(String clusterName, String connectName, diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java index 8c20b3866d..5d18463d28 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/KafkaConnectMapper.java @@ -7,8 +7,15 @@ import com.provectus.kafka.ui.model.Connector; import com.provectus.kafka.ui.model.ConnectorPlugin; import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponse; import com.provectus.kafka.ui.model.ConnectorStatus; +import com.provectus.kafka.ui.model.ConnectorTaskStatus; +import com.provectus.kafka.ui.model.FullConnectorInfo; import com.provectus.kafka.ui.model.Task; import com.provectus.kafka.ui.model.TaskStatus; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import org.apache.commons.lang3.tuple.Triple; import org.mapstruct.Mapper; @Mapper(componentModel = "spring") @@ -28,4 +35,33 @@ public interface KafkaConnectMapper { ConnectorPluginConfigValidationResponse fromClient( com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse connectorPluginConfigValidationResponse); + + default FullConnectorInfo fullConnectorInfoFromTuple(Triple, + List> triple) { + Function, List> getTopicsFromConfig = config -> { + var topic = config.get("topic"); + if (topic != null) { + return List.of((String) topic); + } + return Arrays.asList(((String) config.get("topics")).split(",")); + }; + + return new FullConnectorInfo() + .connect(triple.getLeft().getConnect()) + .name(triple.getLeft().getName()) + .connectorClass((String) triple.getMiddle().get("connector.class")) + .type(triple.getLeft().getType()) + .topics(getTopicsFromConfig.apply(triple.getMiddle())) + .status( + triple.getLeft().getStatus().getState() + ) + .tasksCount(triple.getRight().size()) + .failedTasksCount((int) triple.getRight().stream() + .map(Task::getStatus) + .map(TaskStatus::getState) + .filter(ConnectorTaskStatus.FAILED::equals) + .count()); + } + + ; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java index 85e163f174..cb381edb46 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java @@ -1,5 +1,7 @@ package com.provectus.kafka.ui.service; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.provectus.kafka.ui.client.KafkaConnectClients; import com.provectus.kafka.ui.exception.ClusterNotFoundException; import com.provectus.kafka.ui.exception.ConnectNotFoundException; @@ -10,16 +12,21 @@ import com.provectus.kafka.ui.model.Connector; import com.provectus.kafka.ui.model.ConnectorAction; import com.provectus.kafka.ui.model.ConnectorPlugin; import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponse; +import com.provectus.kafka.ui.model.FullConnectorInfo; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KafkaConnectCluster; import com.provectus.kafka.ui.model.NewConnector; import com.provectus.kafka.ui.model.Task; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.extern.log4j.Log4j2; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -31,6 +38,7 @@ public class KafkaConnectService { private final ClustersStorage clustersStorage; private final ClusterMapper clusterMapper; private final KafkaConnectMapper kafkaConnectMapper; + private final ObjectMapper objectMapper; public Mono> getConnects(String clusterName) { return Mono.just( @@ -43,6 +51,38 @@ public class KafkaConnectService { ); } + public Flux getAllConnectors(String clusterName) { + return getConnects(clusterName) + .flatMapMany(Function.identity()) + .flatMap(connect -> getConnectorNames(clusterName, connect)) + .flatMap(pair -> getConnector(clusterName, pair.getLeft(), pair.getRight())) + .flatMap(connector -> + getConnectorConfig(clusterName, connector.getConnect(), connector.getName()) + .map(config -> Pair.of(connector, config)) + ) + .flatMap(pair -> + getConnectorTasks(clusterName, pair.getLeft().getConnect(), pair.getLeft().getName()) + .collectList() + .map(tasks -> Triple.of(pair.getLeft(), pair.getRight(), tasks)) + ) + .map(kafkaConnectMapper::fullConnectorInfoFromTuple); + } + + private Flux> getConnectorNames(String clusterName, Connect connect) { + return getConnectors(clusterName, connect.getName()) + .collectList().map(e -> e.get(0)) + // for some reason `getConnectors` method returns the response as a single string + .map(this::parseToList) + .flatMapMany(Flux::fromIterable) + .map(connector -> Pair.of(connect.getName(), connector)); + } + + @SneakyThrows + private List parseToList(String json) { + return objectMapper.readValue(json, new TypeReference<>() { + }); + } + public Flux getConnectors(String clusterName, String connectName) { return getConnectAddress(clusterName, connectName) .flatMapMany(connect -> @@ -76,6 +116,7 @@ public class KafkaConnectService { var status = connectorStatus.getConnector(); connector.status(kafkaConnectMapper.fromClient(status)); return (Connector) new Connector() + .connect(connectName) .status(kafkaConnectMapper.fromClient(status)) .type(connector.getType()) .tasks(connector.getTasks()) diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java index 1dd47f6c07..444ca10c7c 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java @@ -9,6 +9,8 @@ import com.provectus.kafka.ui.model.ConnectorPluginConfig; import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponse; import com.provectus.kafka.ui.model.ConnectorPluginConfigValue; import com.provectus.kafka.ui.model.ConnectorStatus; +import com.provectus.kafka.ui.model.ConnectorTaskStatus; +import com.provectus.kafka.ui.model.ConnectorType; import com.provectus.kafka.ui.model.NewConnector; import com.provectus.kafka.ui.model.TaskId; import java.util.List; @@ -100,13 +102,14 @@ public class KafkaConnectServiceTests extends AbstractBaseTest { @Test public void shouldRetrieveConnector() { Connector expected = (Connector) new Connector() + .connect(connectName) .status(new ConnectorStatus() - .state(ConnectorStatus.StateEnum.RUNNING) + .state(ConnectorTaskStatus.RUNNING) .workerId("kafka-connect:8083")) .tasks(List.of(new TaskId() .connector(connectorName) .task(0))) - .type(Connector.TypeEnum.SINK) + .type(ConnectorType.SINK) .name(connectorName) .config(config); webTestClient.get() diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 86bd3cd0ec..a89bbbc55a 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -745,11 +745,33 @@ paths: items: $ref: '#/components/schemas/Connect' + /api/clusters/{clusterName}/connectors: + get: + tags: + - Kafka Connect + summary: get all kafka connectors + operationId: getAllConnectors + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/FullConnectorInfo' + /api/clusters/{clusterName}/connects/{connectName}/connectors: get: tags: - Kafka Connect - summary: get all connectors from Kafka Connect service + summary: get connectors for provided kafka connect instance operationId: getConnectors parameters: - name: clusterName @@ -1616,13 +1638,17 @@ components: items: $ref: '#/components/schemas/TaskId' type: - type: string - enum: - - source - - sink + $ref: '#/components/schemas/ConnectorType' status: $ref: '#/components/schemas/ConnectorStatus' + connect: + type: string + ConnectorType: + type: string + enum: + - SOURCE + - SINK TaskStatus: type: object @@ -1630,12 +1656,7 @@ components: id: type: integer state: - type: string - enum: - - RUNNING - - FAILED - - PAUSED - - UNASSIGNED + $ref: '#/components/schemas/ConnectorTaskStatus' worker_id: type: string trace: @@ -1645,15 +1666,18 @@ components: type: object properties: state: - type: string - enum: - - RUNNING - - FAILED - - PAUSED - - UNASSIGNED + $ref: '#/components/schemas/ConnectorTaskStatus' worker_id: type: string + ConnectorTaskStatus: + type: string + enum: + - RUNNING + - FAILED + - PAUSED + - UNASSIGNED + ConnectorAction: type: string enum: @@ -1760,3 +1784,25 @@ components: type: array items: $ref: '#/components/schemas/ConnectorPluginConfig' + + FullConnectorInfo: + type: object + properties: + connect: + type: string + name: + type: string + connector_class: + type: string + type: + $ref: '#/components/schemas/ConnectorType' + topics: + type: array + items: + type: string + status: + $ref: '#/components/schemas/ConnectorTaskStatus' + tasks_count: + type: integer + failed_tasks_count: + type: integer