123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478 |
- import { createAsyncThunk, createSlice } from '@reduxjs/toolkit';
- import {
- Connect,
- Connector,
- ConnectorAction,
- ConnectorState,
- ConnectorTaskStatus,
- FullConnectorInfo,
- NewConnector,
- Task,
- TaskId,
- } from 'generated-sources';
- import { kafkaConnectApiClient } from 'lib/api';
- import { getResponse } from 'lib/errorHandling';
- import {
- ClusterName,
- ConnectName,
- ConnectorConfig,
- ConnectorName,
- ConnectorSearch,
- ConnectState,
- } from 'redux/interfaces';
- import { showSuccessAlert } from 'redux/reducers/alerts/alertsSlice';
/**
 * Async thunk (`connect/fetchConnects`) that loads the connects of a cluster.
 *
 * The thunk argument is the cluster name, which is forwarded to
 * `kafkaConnectApiClient.getConnects`. On success it resolves with
 * `{ connects }`.
 *
 * Anything thrown by the request is treated as a `Response`, run through
 * `getResponse`, and handed to `rejectWithValue`.
 */
export const fetchConnects = createAsyncThunk<
  { connects: Connect[] },
  ClusterName
>('connect/fetchConnects', async (clusterName, thunkApi) => {
  const { rejectWithValue } = thunkApi;
  try {
    return {
      connects: await kafkaConnectApiClient.getConnects({ clusterName }),
    };
  } catch (error) {
    const failure = await getResponse(error as Response);
    return rejectWithValue(failure);
  }
});
/**
 * Async thunk (`connect/fetchConnectors`) that loads the connectors of a
 * cluster, optionally narrowed by a search string.
 *
 * Calls `kafkaConnectApiClient.getAllConnectors` with `clusterName` and
 * `search`, and resolves with `{ connectors }`.
 *
 * Anything thrown by the request is treated as a `Response`, run through
 * `getResponse`, and handed to `rejectWithValue`.
 */
export const fetchConnectors = createAsyncThunk<
  { connectors: FullConnectorInfo[] },
  { clusterName: ClusterName; search?: string }
>('connect/fetchConnectors', async (query, thunkApi) => {
  const { rejectWithValue } = thunkApi;
  // An omitted (undefined) search term is sent as an empty string.
  const { clusterName, search = '' } = query;
  try {
    return {
      connectors: await kafkaConnectApiClient.getAllConnectors({
        clusterName,
        search,
      }),
    };
  } catch (error) {
    const failure = await getResponse(error as Response);
    return rejectWithValue(failure);
  }
});
/**
 * Async thunk (`connect/fetchConnector`) that loads a single connector.
 *
 * The connector is addressed by `clusterName`, `connectName` and
 * `connectorName`, which are passed to `kafkaConnectApiClient.getConnector`.
 * Resolves with `{ connector }`.
 *
 * Anything thrown by the request is treated as a `Response`, run through
 * `getResponse`, and handed to `rejectWithValue`.
 */
export const fetchConnector = createAsyncThunk<
  { connector: Connector },
  {
    clusterName: ClusterName;
    connectName: ConnectName;
    connectorName: ConnectorName;
  }
>('connect/fetchConnector', async (target, thunkApi) => {
  const { rejectWithValue } = thunkApi;
  const { clusterName, connectName, connectorName } = target;
  try {
    const request = { clusterName, connectName, connectorName };
    return { connector: await kafkaConnectApiClient.getConnector(request) };
  } catch (error) {
    const failure = await getResponse(error as Response);
    return rejectWithValue(failure);
  }
});
/**
 * Async thunk (`connect/createConnector`) that creates a connector.
 *
 * Passes `clusterName`, `connectName` and the `newConnector` definition to
 * `kafkaConnectApiClient.createConnector` and resolves with the returned
 * `{ connector }`.
 *
 * Anything thrown by the request is treated as a `Response`, run through
 * `getResponse`, and handed to `rejectWithValue`.
 */
export const createConnector = createAsyncThunk<
  { connector: Connector },
  {
    clusterName: ClusterName;
    connectName: ConnectName;
    newConnector: NewConnector;
  }
>('connect/createConnector', async (args, thunkApi) => {
  const { rejectWithValue } = thunkApi;
  const { clusterName, connectName, newConnector } = args;
  try {
    const request = { clusterName, connectName, newConnector };
    return { connector: await kafkaConnectApiClient.createConnector(request) };
  } catch (error) {
    const failure = await getResponse(error as Response);
    return rejectWithValue(failure);
  }
});
/**
 * Async thunk (`connect/deleteConnector`) that deletes a connector.
 *
 * After `kafkaConnectApiClient.deleteConnector` succeeds, a
 * `fetchConnectors` refresh (with an empty search) is dispatched for the same
 * cluster, and the thunk resolves with `{ connectorName }`.
 *
 * Anything thrown inside the `try` is treated as a `Response`, run through
 * `getResponse`, and handed to `rejectWithValue`.
 */
export const deleteConnector = createAsyncThunk<
  { connectorName: string },
  {
    clusterName: ClusterName;
    connectName: ConnectName;
    connectorName: ConnectorName;
  }
>('connect/deleteConnector', async (target, thunkApi) => {
  const { rejectWithValue, dispatch } = thunkApi;
  const { clusterName, connectName, connectorName } = target;
  try {
    const request = { clusterName, connectName, connectorName };
    await kafkaConnectApiClient.deleteConnector(request);

    // The refresh is dispatched but not awaited; this thunk settles without
    // waiting for the list to reload.
    dispatch(fetchConnectors({ clusterName, search: '' }));

    return { connectorName };
  } catch (error) {
    const failure = await getResponse(error as Response);
    return rejectWithValue(failure);
  }
});
/**
 * Async thunk (`connect/fetchConnectorTasks`) that loads the tasks of a
 * connector.
 *
 * Passes `clusterName`, `connectName` and `connectorName` to
 * `kafkaConnectApiClient.getConnectorTasks` and resolves with `{ tasks }`.
 *
 * Anything thrown by the request is treated as a `Response`, run through
 * `getResponse`, and handed to `rejectWithValue`.
 */
export const fetchConnectorTasks = createAsyncThunk<
  { tasks: Task[] },
  {
    clusterName: ClusterName;
    connectName: ConnectName;
    connectorName: ConnectorName;
  }
>('connect/fetchConnectorTasks', async (target, thunkApi) => {
  const { rejectWithValue } = thunkApi;
  const { clusterName, connectName, connectorName } = target;
  try {
    const request = { clusterName, connectName, connectorName };
    return { tasks: await kafkaConnectApiClient.getConnectorTasks(request) };
  } catch (error) {
    const failure = await getResponse(error as Response);
    return rejectWithValue(failure);
  }
});
/**
 * Async thunk (`connect/restartConnector`) that restarts a connector.
 *
 * Sends `ConnectorAction.RESTART` through
 * `kafkaConnectApiClient.updateConnectorState`, then dispatches
 * `fetchConnectorTasks` for the same connector. Resolves with `undefined`.
 *
 * Anything thrown inside the `try` is treated as a `Response`, run through
 * `getResponse`, and handed to `rejectWithValue`.
 */
export const restartConnector = createAsyncThunk<
  undefined,
  {
    clusterName: ClusterName;
    connectName: ConnectName;
    connectorName: ConnectorName;
  }
>('connect/restartConnector', async (args, thunkApi) => {
  const { rejectWithValue, dispatch } = thunkApi;
  const { clusterName, connectName, connectorName } = args;
  try {
    const target = { clusterName, connectName, connectorName };
    await kafkaConnectApiClient.updateConnectorState({
      ...target,
      action: ConnectorAction.RESTART,
    });

    // Task reload is dispatched without awaiting its completion.
    dispatch(fetchConnectorTasks(target));

    return undefined;
  } catch (error) {
    const failure = await getResponse(error as Response);
    return rejectWithValue(failure);
  }
});
/**
 * Async thunk (`connect/restartTasks`) that applies a caller-supplied
 * `ConnectorAction` to a connector.
 *
 * The `action` argument is forwarded unchanged to
 * `kafkaConnectApiClient.updateConnectorState`; afterwards
 * `fetchConnectorTasks` is dispatched for the same connector. Resolves with
 * `undefined`.
 *
 * Anything thrown inside the `try` is treated as a `Response`, run through
 * `getResponse`, and handed to `rejectWithValue`.
 */
export const restartTasks = createAsyncThunk<
  undefined,
  {
    clusterName: ClusterName;
    connectName: ConnectName;
    connectorName: ConnectorName;
    action: ConnectorAction;
  }
>('connect/restartTasks', async (args, thunkApi) => {
  const { rejectWithValue, dispatch } = thunkApi;
  const { clusterName, connectName, connectorName, action } = args;
  try {
    const target = { clusterName, connectName, connectorName };
    await kafkaConnectApiClient.updateConnectorState({ ...target, action });

    // Task reload is dispatched without awaiting its completion.
    dispatch(fetchConnectorTasks(target));

    return undefined;
  } catch (error) {
    const failure = await getResponse(error as Response);
    return rejectWithValue(failure);
  }
});
/**
 * Async thunk (`connect/restartConnectorTask`) that restarts one task of a
 * connector.
 *
 * Steps, in order:
 * 1. call `kafkaConnectApiClient.restartConnectorTask` with `taskId`
 *    converted via `Number(...)`;
 * 2. dispatch `fetchConnectorTasks` and wait for it to settle;
 * 3. dispatch `showSuccessAlert`.
 *
 * Resolves with `undefined`. Anything thrown inside the `try` is treated as a
 * `Response`, run through `getResponse`, and handed to `rejectWithValue`.
 */
export const restartConnectorTask = createAsyncThunk<
  undefined,
  {
    clusterName: ClusterName;
    connectName: ConnectName;
    connectorName: ConnectorName;
    taskId: TaskId['task'];
  }
>('connect/restartConnectorTask', async (args, thunkApi) => {
  const { rejectWithValue, dispatch } = thunkApi;
  const { clusterName, connectName, connectorName, taskId } = args;
  try {
    const target = { clusterName, connectName, connectorName };
    await kafkaConnectApiClient.restartConnectorTask({
      ...target,
      taskId: Number(taskId),
    });

    // Unlike the connector-level restart thunks, the reload is awaited here so
    // the alert is only raised after the task fetch has settled.
    await dispatch(fetchConnectorTasks(target));

    const alert = {
      id: `connect-${connectName}-${clusterName}`,
      message: 'Tasks successfully restarted.',
    };
    dispatch(showSuccessAlert(alert));

    return undefined;
  } catch (error) {
    const failure = await getResponse(error as Response);
    return rejectWithValue(failure);
  }
});
/**
 * Async thunk (`connect/fetchConnectorConfig`) that loads the configuration
 * of a connector.
 *
 * Passes `clusterName`, `connectName` and `connectorName` to
 * `kafkaConnectApiClient.getConnectorConfig` and resolves with `{ config }`.
 *
 * Anything thrown by the request is treated as a `Response`, run through
 * `getResponse`, and handed to `rejectWithValue`.
 */
export const fetchConnectorConfig = createAsyncThunk<
  { config: { [key: string]: unknown } },
  {
    clusterName: ClusterName;
    connectName: ConnectName;
    connectorName: ConnectorName;
  }
>('connect/fetchConnectorConfig', async (target, thunkApi) => {
  const { rejectWithValue } = thunkApi;
  const { clusterName, connectName, connectorName } = target;
  try {
    const request = { clusterName, connectName, connectorName };
    return { config: await kafkaConnectApiClient.getConnectorConfig(request) };
  } catch (error) {
    const failure = await getResponse(error as Response);
    return rejectWithValue(failure);
  }
});
/**
 * Async thunk (`connect/updateConnectorConfig`) that replaces the
 * configuration of a connector.
 *
 * Sends `connectorConfig` as the `requestBody` of
 * `kafkaConnectApiClient.setConnectorConfig`. Once that succeeds it
 * dispatches `fetchConnector` for the same connector and a
 * `showSuccessAlert`, then resolves with the `{ connector }` returned by the
 * API call.
 *
 * Anything thrown inside the `try` is treated as a `Response`, run through
 * `getResponse`, and handed to `rejectWithValue`.
 */
export const updateConnectorConfig = createAsyncThunk<
  { connector: Connector },
  {
    clusterName: ClusterName;
    connectName: ConnectName;
    connectorName: ConnectorName;
    connectorConfig: ConnectorConfig;
  }
>('connect/updateConnectorConfig', async (args, thunkApi) => {
  const { rejectWithValue, dispatch } = thunkApi;
  const { clusterName, connectName, connectorName, connectorConfig } = args;
  try {
    const target = { clusterName, connectName, connectorName };
    const connector = await kafkaConnectApiClient.setConnectorConfig({
      ...target,
      requestBody: connectorConfig,
    });

    // Both follow-up dispatches are fire-and-forget.
    dispatch(fetchConnector(target));
    const alert = {
      id: `connector-${connectorName}-${clusterName}`,
      message: 'Connector config updated.',
    };
    dispatch(showSuccessAlert(alert));

    return { connector };
  } catch (error) {
    const failure = await getResponse(error as Response);
    return rejectWithValue(failure);
  }
});
/**
 * Starting value of the `connect` slice: empty connect and connector lists,
 * no connector selected (no connector object, no tasks, no config), and an
 * empty search string.
 */
export const initialState: ConnectState = {
  connects: [],
  connectors: [],
  // Details of the connector currently being viewed; filled in by the
  // connector/task/config thunks' fulfilled handlers.
  currentConnector: { connector: null, tasks: [], config: null },
  search: '',
};
/**
 * Redux slice named `connect`.
 *
 * - `setConnectorStatusState` overwrites the status state of the current
 *   connector (when one is loaded) and of every one of its tasks, using
 *   `payload.connectorState` and `payload.taskState` respectively.
 * - The extra reducers copy the payloads of the fulfilled thunks into the
 *   matching part of the state.
 */
const connectSlice = createSlice({
  name: 'connect',
  initialState,
  reducers: {
    setConnectorStatusState: (state, action) => {
      const current = state.currentConnector;

      // Only touch the connector status when a connector is actually loaded.
      if (current.connector) {
        current.connector.status.state = action.payload.connectorState;
      }

      // Rebuild each task with a copied status carrying the new state.
      current.tasks = current.tasks.map((task) => {
        const status = { ...task.status, state: action.payload.taskState };
        return { ...task, status };
      });
    },
  },
  extraReducers: (builder) => {
    builder
      .addCase(fetchConnects.fulfilled, (state, action) => {
        state.connects = action.payload.connects;
      })
      .addCase(fetchConnectors.fulfilled, (state, action) => {
        state.connectors = action.payload.connectors;
      })
      .addCase(fetchConnector.fulfilled, (state, action) => {
        state.currentConnector.connector = action.payload.connector;
      })
      .addCase(createConnector.fulfilled, (state, action) => {
        state.currentConnector.connector = action.payload.connector;
      })
      .addCase(deleteConnector.fulfilled, (state, action) => {
        // Drop the deleted connector from the cached list by name.
        state.connectors = state.connectors.filter(
          (entry) => entry.name !== action.payload.connectorName
        );
      })
      .addCase(fetchConnectorTasks.fulfilled, (state, action) => {
        state.currentConnector.tasks = action.payload.tasks;
      })
      .addCase(fetchConnectorConfig.fulfilled, (state, action) => {
        state.currentConnector.config = action.payload.config;
      })
      .addCase(updateConnectorConfig.fulfilled, (state, action) => {
        // The updated connector carries its own config; keep both in sync.
        state.currentConnector.connector = action.payload.connector;
        state.currentConnector.config = action.payload.connector.config;
      });
  },
});
/** Action creator generated by the slice for its `setConnectorStatusState` reducer. */
export const setConnectorStatusState =
  connectSlice.actions.setConnectorStatusState;
/**
 * Builds the `setConnectorStatusState` action that marks the current
 * connector and its tasks as paused.
 */
export const pauseCurrentConnector = () => {
  const pausedStatus = {
    connectorState: ConnectorState.PAUSED,
    taskState: ConnectorTaskStatus.PAUSED,
  };
  return setConnectorStatusState(pausedStatus);
};
/**
 * Builds the `setConnectorStatusState` action that marks the current
 * connector and its tasks as running.
 */
export const resumeCurrentConnector = () => {
  const runningStatus = {
    connectorState: ConnectorState.RUNNING,
    taskState: ConnectorTaskStatus.RUNNING,
  };
  return setConnectorStatusState(runningStatus);
};
/**
 * Async thunk (`connect/pauseConnector`) that pauses a connector.
 *
 * Sends `ConnectorAction.PAUSE` through
 * `kafkaConnectApiClient.updateConnectorState`; once that succeeds it
 * dispatches `pauseCurrentConnector()` so the stored connector and task
 * states are updated locally. Resolves with `undefined`.
 *
 * Anything thrown inside the `try` is treated as a `Response`, run through
 * `getResponse`, and handed to `rejectWithValue`.
 */
export const pauseConnector = createAsyncThunk<
  undefined,
  {
    clusterName: ClusterName;
    connectName: ConnectName;
    connectorName: ConnectorName;
  }
>('connect/pauseConnector', async (target, thunkApi) => {
  const { rejectWithValue, dispatch } = thunkApi;
  const { clusterName, connectName, connectorName } = target;
  try {
    const request = {
      clusterName,
      connectName,
      connectorName,
      action: ConnectorAction.PAUSE,
    };
    await kafkaConnectApiClient.updateConnectorState(request);

    dispatch(pauseCurrentConnector());

    return undefined;
  } catch (error) {
    const failure = await getResponse(error as Response);
    return rejectWithValue(failure);
  }
});
/**
 * Async thunk (`connect/resumeConnector`) that resumes a connector.
 *
 * Sends `ConnectorAction.RESUME` through
 * `kafkaConnectApiClient.updateConnectorState`; once that succeeds it
 * dispatches `resumeCurrentConnector()` so the stored connector and task
 * states are updated locally. Resolves with `undefined`.
 *
 * Anything thrown inside the `try` is treated as a `Response`, run through
 * `getResponse`, and handed to `rejectWithValue`.
 */
export const resumeConnector = createAsyncThunk<
  undefined,
  {
    clusterName: ClusterName;
    connectName: ConnectName;
    connectorName: ConnectorName;
  }
>('connect/resumeConnector', async (target, thunkApi) => {
  const { rejectWithValue, dispatch } = thunkApi;
  const { clusterName, connectName, connectorName } = target;
  try {
    const request = {
      clusterName,
      connectName,
      connectorName,
      action: ConnectorAction.RESUME,
    };
    await kafkaConnectApiClient.updateConnectorState(request);

    dispatch(resumeCurrentConnector());

    return undefined;
  } catch (error) {
    const failure = await getResponse(error as Response);
    return rejectWithValue(failure);
  }
});
/**
 * Creates a `fetchConnectors` thunk action for the cluster and search string
 * carried by `connectorSearch`.
 *
 * @param connectorSearch - Holds the `clusterName` and `search` values to
 *   forward.
 * @returns The result of calling `fetchConnectors` with those two values.
 */
export const setConnectorSearch = (connectorSearch: ConnectorSearch) => {
  const { clusterName, search } = connectorSearch;
  return fetchConnectors({ clusterName, search });
};
/** Reducer of the `connect` slice, exposed as the module's default export. */
const connectReducer = connectSlice.reducer;

export default connectReducer;
|