diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java index 109ed6d356..1a5dcbcaf3 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.provectus.kafka.ui.client.KafkaConnectClients; import com.provectus.kafka.ui.connect.model.ConnectorTopics; +import com.provectus.kafka.ui.connect.model.TaskStatus; import com.provectus.kafka.ui.exception.ClusterNotFoundException; import com.provectus.kafka.ui.exception.ConnectNotFoundException; import com.provectus.kafka.ui.mapper.ClusterMapper; @@ -13,6 +14,7 @@ import com.provectus.kafka.ui.model.Connector; import com.provectus.kafka.ui.model.ConnectorAction; import com.provectus.kafka.ui.model.ConnectorPlugin; import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponse; +import com.provectus.kafka.ui.model.ConnectorState; import com.provectus.kafka.ui.model.FullConnectorInfo; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KafkaConnectCluster; @@ -161,23 +163,32 @@ public class KafkaConnectService { public Mono getConnector(String clusterName, String connectName, String connectorName) { return getConnectAddress(clusterName, connectName) - .flatMap(connect -> - KafkaConnectClients.withBaseUrl(connect).getConnector(connectorName) - .map(kafkaConnectMapper::fromClient) - .flatMap(connector -> - KafkaConnectClients.withBaseUrl(connect).getConnectorStatus(connector.getName()) - .map(connectorStatus -> { - var status = connectorStatus.getConnector(); - connector.status(kafkaConnectMapper.fromClient(status)); - return (Connector) new Connector() - .connect(connectName) - .status(kafkaConnectMapper.fromClient(status)) - .type(connector.getType()) - .tasks(connector.getTasks()) - .name(connector.getName()) - .config(connector.getConfig()); - }) - ) + .flatMap(connect -> KafkaConnectClients.withBaseUrl(connect).getConnector(connectorName) + .map(kafkaConnectMapper::fromClient) + .flatMap(connector -> + KafkaConnectClients.withBaseUrl(connect).getConnectorStatus(connector.getName()) + .map(connectorStatus -> { + var status = connectorStatus.getConnector(); + Connector result = (Connector) new Connector() + .connect(connectName) + .status(kafkaConnectMapper.fromClient(status)) + .type(connector.getType()) + .tasks(connector.getTasks()) + .name(connector.getName()) + .config(connector.getConfig()); + + if (connectorStatus.getTasks() != null) { + boolean isAnyTaskFailed = connectorStatus.getTasks().stream() + .map(TaskStatus::getState) + .anyMatch(TaskStatus.StateEnum.FAILED::equals); + + if (isAnyTaskFailed) { + result.getStatus().state(ConnectorState.TASK_FAILED); + } + } + return result; + }) + ) ); } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java index acc82745f3..2f8a2ca27a 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java @@ -8,8 +8,8 @@ import com.provectus.kafka.ui.model.ConnectorPlugin; import com.provectus.kafka.ui.model.ConnectorPluginConfig; import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponse; import com.provectus.kafka.ui.model.ConnectorPluginConfigValue; +import com.provectus.kafka.ui.model.ConnectorState; import com.provectus.kafka.ui.model.ConnectorStatus; -import com.provectus.kafka.ui.model.ConnectorTaskStatus; import com.provectus.kafka.ui.model.ConnectorType; import com.provectus.kafka.ui.model.NewConnector; import com.provectus.kafka.ui.model.TaskId; @@ -171,7 +171,7 @@ public class KafkaConnectServiceTests extends AbstractBaseTest { Connector expected = (Connector) new Connector() .connect(connectName) .status(new ConnectorStatus() - .state(ConnectorTaskStatus.RUNNING) + .state(ConnectorState.RUNNING) .workerId("kafka-connect:8083")) .tasks(List.of(new TaskId() .connector(connectorName) diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 278e79df31..07b0dca0ea 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -2285,7 +2285,7 @@ components: type: object properties: state: - $ref: '#/components/schemas/ConnectorTaskStatus' + $ref: '#/components/schemas/ConnectorState' worker_id: type: string required: @@ -2299,6 +2299,15 @@ components: - PAUSED - UNASSIGNED + ConnectorState: + type: string + enum: + - RUNNING + - FAILED + - PAUSED + - UNASSIGNED + - TASK_FAILED + ConnectorAction: type: string enum: diff --git a/kafka-ui-react-app/src/components/Connect/ConnectorStatusTag.tsx b/kafka-ui-react-app/src/components/Connect/ConnectorStatusTag.tsx new file mode 100644 index 0000000000..4617eae0f9 --- /dev/null +++ b/kafka-ui-react-app/src/components/Connect/ConnectorStatusTag.tsx @@ -0,0 +1,21 @@ +import cx from 'classnames'; +import { ConnectorState } from 'generated-sources'; +import React from 'react'; + +export interface StatusTagProps { + status: ConnectorState; +} + +const ConnectorStatusTag: React.FC = ({ status }) => { + const classNames = cx('tag', { + 'is-success': status === ConnectorState.RUNNING, + 'is-light': status === ConnectorState.PAUSED, + 'is-warning': status === ConnectorState.UNASSIGNED, + 'is-danger': + status === ConnectorState.FAILED || status === ConnectorState.TASK_FAILED, + }); + + return {status}; +}; + +export default ConnectorStatusTag; diff --git a/kafka-ui-react-app/src/components/Connect/Details/Actions/Actions.tsx b/kafka-ui-react-app/src/components/Connect/Details/Actions/Actions.tsx index 625707e56e..8faed37bfe 100644 --- a/kafka-ui-react-app/src/components/Connect/Details/Actions/Actions.tsx +++ b/kafka-ui-react-app/src/components/Connect/Details/Actions/Actions.tsx @@ -1,6 +1,6 @@ import React from 'react'; import { Link, useHistory, useParams } from 'react-router-dom'; -import { ConnectorTaskStatus } from 'generated-sources'; +import { ConnectorState } from 'generated-sources'; import { ClusterName, ConnectName, ConnectorName } from 'redux/interfaces'; import { clusterConnectConnectorEditPath, @@ -21,7 +21,7 @@ export interface ActionsProps { connectorName: ConnectorName ): Promise; isConnectorDeleting: boolean; - connectorStatus?: ConnectorTaskStatus; + connectorStatus?: ConnectorState; restartConnector( clusterName: ClusterName, connectName: ConnectName, @@ -79,7 +79,7 @@ const Actions: React.FC = ({ return (
- {connectorStatus === ConnectorTaskStatus.RUNNING && ( + {connectorStatus === ConnectorState.RUNNING && ( )} - {connectorStatus === ConnectorTaskStatus.PAUSED && ( + {connectorStatus === ConnectorState.PAUSED && (
- {status && } + {status && } {runningTasks && ( - RUNNING - + ({ @@ -44,7 +44,7 @@ const pausedConnectorState = { ...connector, status: { ...connector.status, - state: ConnectorTaskStatus.PAUSED, + state: ConnectorState.PAUSED, }, }, tasks: tasks.map((task) => ({ diff --git a/kafka-ui-react-app/src/redux/reducers/connect/reducer.ts b/kafka-ui-react-app/src/redux/reducers/connect/reducer.ts index 1db3ba1622..eee7a46962 100644 --- a/kafka-ui-react-app/src/redux/reducers/connect/reducer.ts +++ b/kafka-ui-react-app/src/redux/reducers/connect/reducer.ts @@ -2,7 +2,7 @@ import { getType } from 'typesafe-actions'; import * as actions from 'redux/actions'; import { ConnectState } from 'redux/interfaces/connect'; import { Action } from 'redux/interfaces'; -import { ConnectorTaskStatus } from 'generated-sources'; +import { ConnectorState, ConnectorTaskStatus } from 'generated-sources'; export const initialState: ConnectState = { connects: [], @@ -53,7 +53,7 @@ const reducer = (state = initialState, action: Action): ConnectState => { ...state.currentConnector.connector, status: { ...state.currentConnector.connector?.status, - state: ConnectorTaskStatus.PAUSED, + state: ConnectorState.PAUSED, }, } : null, @@ -76,7 +76,7 @@ const reducer = (state = initialState, action: Action): ConnectState => { ...state.currentConnector.connector, status: { ...state.currentConnector.connector?.status, - state: ConnectorTaskStatus.RUNNING, + state: ConnectorState.RUNNING, }, } : null,