diff --git a/kafka-ui-react-app/src/components/Connect/Details/Actions/Actions.tsx b/kafka-ui-react-app/src/components/Connect/Details/Actions/Actions.tsx index ab19d773ae..9fe477ca13 100644 --- a/kafka-ui-react-app/src/components/Connect/Details/Actions/Actions.tsx +++ b/kafka-ui-react-app/src/components/Connect/Details/Actions/Actions.tsx @@ -1,6 +1,6 @@ import React from 'react'; import { useHistory, useParams } from 'react-router-dom'; -import { ConnectorState } from 'generated-sources'; +import { ConnectorState, ConnectorAction } from 'generated-sources'; import { ClusterName, ConnectName, ConnectorName } from 'redux/interfaces'; import { clusterConnectConnectorEditPath, @@ -34,6 +34,12 @@ export interface ActionsProps { connectName: ConnectName, connectorName: ConnectorName ): void; + restartTasks( + clusterName: ClusterName, + connectName: ConnectName, + connectorName: ConnectorName, + action: ConnectorAction + ): void; pauseConnector( clusterName: ClusterName, connectName: ConnectName, @@ -52,11 +58,13 @@ const Actions: React.FC = ({ isConnectorDeleting, connectorStatus, restartConnector, + restartTasks, pauseConnector, resumeConnector, isConnectorActionRunning, }) => { const { clusterName, connectName, connectorName } = useParams(); + const history = useHistory(); const [ isDeleteConnectorConfirmationVisible, @@ -76,6 +84,13 @@ const Actions: React.FC = ({ restartConnector(clusterName, connectName, connectorName); }, [restartConnector, clusterName, connectName, connectorName]); + const restartTasksHandler = React.useCallback( + (actionType) => { + restartTasks(clusterName, connectName, connectorName, actionType); + }, + [restartTasks, clusterName, connectName, connectorName] + ); + const pauseConnectorHandler = React.useCallback(() => { pauseConnector(clusterName, connectName, connectorName); }, [pauseConnector, clusterName, connectName, connectorName]); @@ -128,6 +143,32 @@ const Actions: React.FC = ({ Restart Connector + + + + + + + + + + + + + + { actions.restartConnectorAction.failure({ alert: { subject: 'local-first-hdfs-source-connector', - title: 'Kafka Connect Connector Tasks Restart', + title: 'Kafka Connect Connector Restart', response: { status: 404, statusText: 'Not Found', diff --git a/kafka-ui-react-app/src/redux/actions/actions.ts b/kafka-ui-react-app/src/redux/actions/actions.ts index 464d918520..dd72f87a8a 100644 --- a/kafka-ui-react-app/src/redux/actions/actions.ts +++ b/kafka-ui-react-app/src/redux/actions/actions.ts @@ -106,6 +106,12 @@ export const restartConnectorAction = createAsyncAction( 'RESTART_CONNECTOR__FAILURE' )(); +export const restartTasksAction = createAsyncAction( + 'RESTART_TASKS__REQUEST', + 'RESTART_TASKS__SUCCESS', + 'RESTART_TASKS__FAILURE' +)(); + export const pauseConnectorAction = createAsyncAction( 'PAUSE_CONNECTOR__REQUEST', 'PAUSE_CONNECTOR__SUCCESS', diff --git a/kafka-ui-react-app/src/redux/actions/thunks/connectors.ts b/kafka-ui-react-app/src/redux/actions/thunks/connectors.ts index 57514b03f6..35124e7ceb 100644 --- a/kafka-ui-react-app/src/redux/actions/thunks/connectors.ts +++ b/kafka-ui-react-app/src/redux/actions/thunks/connectors.ts @@ -18,6 +18,7 @@ import { } from 'redux/interfaces'; import * as actions from 'redux/actions'; import { getResponse } from 'lib/errorHandling'; +import { batch } from 'react-redux'; const apiClientConf = new Configuration(BASE_PARAMS); export const kafkaConnectApiClient = new KafkaConnectApi(apiClientConf); @@ -196,13 +197,46 @@ export const restartConnector = const response = await getResponse(error); const alert: FailurePayload = { subject: [clusterName, connectName, connectorName].join('-'), - title: `Kafka Connect Connector Tasks Restart`, + title: `Kafka Connect Connector Restart`, response, }; dispatch(actions.restartConnectorAction.failure({ alert })); } }; +export const restartTasks = + ( + clusterName: ClusterName, + connectName: ConnectName, + connectorName: ConnectorName, + action: ConnectorAction + ): PromiseThunkResult => + async (dispatch) => { + dispatch(actions.restartTasksAction.request()); + try { + await kafkaConnectApiClient.updateConnectorState({ + clusterName, + connectName, + connectorName, + action, + }); + batch(() => { + dispatch(actions.restartTasksAction.success()); + dispatch( + fetchConnectorTasks(clusterName, connectName, connectorName, true) + ); + }); + } catch (error) { + const response = await getResponse(error); + const alert: FailurePayload = { + subject: [clusterName, connectName, connectorName].join('-'), + title: `Kafka Connect Connector Tasks Restart`, + response, + }; + dispatch(actions.restartTasksAction.failure({ alert })); + } + }; + export const pauseConnector = ( clusterName: ClusterName,