// connectSlice.ts 11 KB


import {
  createAsyncThunk,
  createSlice,
  PayloadAction,
} from '@reduxjs/toolkit';
import {
  Connect,
  Connector,
  ConnectorAction,
  ConnectorState,
  ConnectorTaskStatus,
  FullConnectorInfo,
  NewConnector,
  Task,
  TaskId,
} from 'generated-sources';
import { kafkaConnectApiClient } from 'lib/api';
import { getResponse } from 'lib/errorHandling';
import {
  ClusterName,
  ConnectName,
  ConnectorConfig,
  ConnectorName,
  ConnectorSearch,
  ConnectState,
} from 'redux/interfaces';
import { showSuccessAlert } from 'redux/reducers/alerts/alertsSlice';
  24. export const fetchConnects = createAsyncThunk<
  25. { connects: Connect[] },
  26. ClusterName
  27. >('connect/fetchConnects', async (clusterName, { rejectWithValue }) => {
  28. try {
  29. const connects = await kafkaConnectApiClient.getConnects({ clusterName });
  30. return { connects };
  31. } catch (err) {
  32. return rejectWithValue(await getResponse(err as Response));
  33. }
  34. });
  35. export const fetchConnectors = createAsyncThunk<
  36. { connectors: FullConnectorInfo[] },
  37. { clusterName: ClusterName; search?: string }
  38. >(
  39. 'connect/fetchConnectors',
  40. async ({ clusterName, search = '' }, { rejectWithValue }) => {
  41. try {
  42. const connectors = await kafkaConnectApiClient.getAllConnectors({
  43. clusterName,
  44. search,
  45. });
  46. return { connectors };
  47. } catch (err) {
  48. return rejectWithValue(await getResponse(err as Response));
  49. }
  50. }
  51. );
  52. export const fetchConnector = createAsyncThunk<
  53. { connector: Connector },
  54. {
  55. clusterName: ClusterName;
  56. connectName: ConnectName;
  57. connectorName: ConnectorName;
  58. }
  59. >(
  60. 'connect/fetchConnector',
  61. async ({ clusterName, connectName, connectorName }, { rejectWithValue }) => {
  62. try {
  63. const connector = await kafkaConnectApiClient.getConnector({
  64. clusterName,
  65. connectName,
  66. connectorName,
  67. });
  68. return { connector };
  69. } catch (err) {
  70. return rejectWithValue(await getResponse(err as Response));
  71. }
  72. }
  73. );
  74. export const createConnector = createAsyncThunk<
  75. { connector: Connector },
  76. {
  77. clusterName: ClusterName;
  78. connectName: ConnectName;
  79. newConnector: NewConnector;
  80. }
  81. >(
  82. 'connect/createConnector',
  83. async ({ clusterName, connectName, newConnector }, { rejectWithValue }) => {
  84. try {
  85. const connector = await kafkaConnectApiClient.createConnector({
  86. clusterName,
  87. connectName,
  88. newConnector,
  89. });
  90. return { connector };
  91. } catch (err) {
  92. return rejectWithValue(await getResponse(err as Response));
  93. }
  94. }
  95. );
  96. export const deleteConnector = createAsyncThunk<
  97. { connectorName: string },
  98. {
  99. clusterName: ClusterName;
  100. connectName: ConnectName;
  101. connectorName: ConnectorName;
  102. }
  103. >(
  104. 'connect/deleteConnector',
  105. async (
  106. { clusterName, connectName, connectorName },
  107. { rejectWithValue, dispatch }
  108. ) => {
  109. try {
  110. await kafkaConnectApiClient.deleteConnector({
  111. clusterName,
  112. connectName,
  113. connectorName,
  114. });
  115. dispatch(fetchConnectors({ clusterName, search: '' }));
  116. return { connectorName };
  117. } catch (err) {
  118. return rejectWithValue(await getResponse(err as Response));
  119. }
  120. }
  121. );
  122. export const fetchConnectorTasks = createAsyncThunk<
  123. { tasks: Task[] },
  124. {
  125. clusterName: ClusterName;
  126. connectName: ConnectName;
  127. connectorName: ConnectorName;
  128. }
  129. >(
  130. 'connect/fetchConnectorTasks',
  131. async ({ clusterName, connectName, connectorName }, { rejectWithValue }) => {
  132. try {
  133. const tasks = await kafkaConnectApiClient.getConnectorTasks({
  134. clusterName,
  135. connectName,
  136. connectorName,
  137. });
  138. return { tasks };
  139. } catch (err) {
  140. return rejectWithValue(await getResponse(err as Response));
  141. }
  142. }
  143. );
  144. export const restartConnector = createAsyncThunk<
  145. undefined,
  146. {
  147. clusterName: ClusterName;
  148. connectName: ConnectName;
  149. connectorName: ConnectorName;
  150. }
  151. >(
  152. 'connect/restartConnector',
  153. async (
  154. { clusterName, connectName, connectorName },
  155. { rejectWithValue, dispatch }
  156. ) => {
  157. try {
  158. await kafkaConnectApiClient.updateConnectorState({
  159. clusterName,
  160. connectName,
  161. connectorName,
  162. action: ConnectorAction.RESTART,
  163. });
  164. dispatch(
  165. fetchConnectorTasks({
  166. clusterName,
  167. connectName,
  168. connectorName,
  169. })
  170. );
  171. return undefined;
  172. } catch (err) {
  173. return rejectWithValue(await getResponse(err as Response));
  174. }
  175. }
  176. );
  177. export const restartTasks = createAsyncThunk<
  178. undefined,
  179. {
  180. clusterName: ClusterName;
  181. connectName: ConnectName;
  182. connectorName: ConnectorName;
  183. action: ConnectorAction;
  184. }
  185. >(
  186. 'connect/restartTasks',
  187. async (
  188. { clusterName, connectName, connectorName, action },
  189. { rejectWithValue, dispatch }
  190. ) => {
  191. try {
  192. await kafkaConnectApiClient.updateConnectorState({
  193. clusterName,
  194. connectName,
  195. connectorName,
  196. action,
  197. });
  198. dispatch(
  199. fetchConnectorTasks({
  200. clusterName,
  201. connectName,
  202. connectorName,
  203. })
  204. );
  205. return undefined;
  206. } catch (err) {
  207. return rejectWithValue(await getResponse(err as Response));
  208. }
  209. }
  210. );
  211. export const restartConnectorTask = createAsyncThunk<
  212. undefined,
  213. {
  214. clusterName: ClusterName;
  215. connectName: ConnectName;
  216. connectorName: ConnectorName;
  217. taskId: TaskId['task'];
  218. }
  219. >(
  220. 'connect/restartConnectorTask',
  221. async (
  222. { clusterName, connectName, connectorName, taskId },
  223. { rejectWithValue, dispatch }
  224. ) => {
  225. try {
  226. await kafkaConnectApiClient.restartConnectorTask({
  227. clusterName,
  228. connectName,
  229. connectorName,
  230. taskId: Number(taskId),
  231. });
  232. await dispatch(
  233. fetchConnectorTasks({
  234. clusterName,
  235. connectName,
  236. connectorName,
  237. })
  238. );
  239. dispatch(
  240. showSuccessAlert({
  241. id: `connect-${connectName}-${clusterName}`,
  242. message: 'Tasks successfully restarted.',
  243. })
  244. );
  245. return undefined;
  246. } catch (err) {
  247. return rejectWithValue(await getResponse(err as Response));
  248. }
  249. }
  250. );
  251. export const fetchConnectorConfig = createAsyncThunk<
  252. { config: { [key: string]: unknown } },
  253. {
  254. clusterName: ClusterName;
  255. connectName: ConnectName;
  256. connectorName: ConnectorName;
  257. }
  258. >(
  259. 'connect/fetchConnectorConfig',
  260. async ({ clusterName, connectName, connectorName }, { rejectWithValue }) => {
  261. try {
  262. const config = await kafkaConnectApiClient.getConnectorConfig({
  263. clusterName,
  264. connectName,
  265. connectorName,
  266. });
  267. return { config };
  268. } catch (err) {
  269. return rejectWithValue(await getResponse(err as Response));
  270. }
  271. }
  272. );
  273. export const updateConnectorConfig = createAsyncThunk<
  274. { connector: Connector },
  275. {
  276. clusterName: ClusterName;
  277. connectName: ConnectName;
  278. connectorName: ConnectorName;
  279. connectorConfig: ConnectorConfig;
  280. }
  281. >(
  282. 'connect/updateConnectorConfig',
  283. async (
  284. { clusterName, connectName, connectorName, connectorConfig },
  285. { rejectWithValue, dispatch }
  286. ) => {
  287. try {
  288. const connector = await kafkaConnectApiClient.setConnectorConfig({
  289. clusterName,
  290. connectName,
  291. connectorName,
  292. requestBody: connectorConfig,
  293. });
  294. dispatch(fetchConnector({ clusterName, connectName, connectorName }));
  295. dispatch(
  296. showSuccessAlert({
  297. id: `connector-${connectorName}-${clusterName}`,
  298. message: 'Connector config updated.',
  299. })
  300. );
  301. return { connector };
  302. } catch (err) {
  303. return rejectWithValue(await getResponse(err as Response));
  304. }
  305. }
  306. );
  307. export const initialState: ConnectState = {
  308. connects: [],
  309. connectors: [],
  310. currentConnector: {
  311. connector: null,
  312. tasks: [],
  313. config: null,
  314. },
  315. search: '',
  316. };
  317. const connectSlice = createSlice({
  318. name: 'connect',
  319. initialState,
  320. reducers: {
  321. setConnectorStatusState: (state, { payload }) => {
  322. const { connector, tasks } = state.currentConnector;
  323. if (connector) {
  324. connector.status.state = payload.connectorState;
  325. }
  326. state.currentConnector.tasks = tasks.map((task) => ({
  327. ...task,
  328. status: {
  329. ...task.status,
  330. state: payload.taskState,
  331. },
  332. }));
  333. },
  334. },
  335. extraReducers: (builder) => {
  336. builder.addCase(fetchConnects.fulfilled, (state, { payload }) => {
  337. state.connects = payload.connects;
  338. });
  339. builder.addCase(fetchConnectors.fulfilled, (state, { payload }) => {
  340. state.connectors = payload.connectors;
  341. });
  342. builder.addCase(fetchConnector.fulfilled, (state, { payload }) => {
  343. state.currentConnector.connector = payload.connector;
  344. });
  345. builder.addCase(createConnector.fulfilled, (state, { payload }) => {
  346. state.currentConnector.connector = payload.connector;
  347. });
  348. builder.addCase(deleteConnector.fulfilled, (state, { payload }) => {
  349. state.connectors = state.connectors.filter(
  350. ({ name }) => name !== payload.connectorName
  351. );
  352. });
  353. builder.addCase(fetchConnectorTasks.fulfilled, (state, { payload }) => {
  354. state.currentConnector.tasks = payload.tasks;
  355. });
  356. builder.addCase(fetchConnectorConfig.fulfilled, (state, { payload }) => {
  357. state.currentConnector.config = payload.config;
  358. });
  359. builder.addCase(updateConnectorConfig.fulfilled, (state, { payload }) => {
  360. state.currentConnector.connector = payload.connector;
  361. state.currentConnector.config = payload.connector.config;
  362. });
  363. },
  364. });
  365. export const { setConnectorStatusState } = connectSlice.actions;
  366. export const pauseCurrentConnector = () =>
  367. setConnectorStatusState({
  368. connectorState: ConnectorState.PAUSED,
  369. taskState: ConnectorTaskStatus.PAUSED,
  370. });
  371. export const resumeCurrentConnector = () =>
  372. setConnectorStatusState({
  373. connectorState: ConnectorState.RUNNING,
  374. taskState: ConnectorTaskStatus.RUNNING,
  375. });
  376. export const pauseConnector = createAsyncThunk<
  377. undefined,
  378. {
  379. clusterName: ClusterName;
  380. connectName: ConnectName;
  381. connectorName: ConnectorName;
  382. }
  383. >(
  384. 'connect/pauseConnector',
  385. async (
  386. { clusterName, connectName, connectorName },
  387. { rejectWithValue, dispatch }
  388. ) => {
  389. try {
  390. await kafkaConnectApiClient.updateConnectorState({
  391. clusterName,
  392. connectName,
  393. connectorName,
  394. action: ConnectorAction.PAUSE,
  395. });
  396. dispatch(pauseCurrentConnector());
  397. return undefined;
  398. } catch (err) {
  399. return rejectWithValue(await getResponse(err as Response));
  400. }
  401. }
  402. );
  403. export const resumeConnector = createAsyncThunk<
  404. undefined,
  405. {
  406. clusterName: ClusterName;
  407. connectName: ConnectName;
  408. connectorName: ConnectorName;
  409. }
  410. >(
  411. 'connect/resumeConnector',
  412. async (
  413. { clusterName, connectName, connectorName },
  414. { rejectWithValue, dispatch }
  415. ) => {
  416. try {
  417. await kafkaConnectApiClient.updateConnectorState({
  418. clusterName,
  419. connectName,
  420. connectorName,
  421. action: ConnectorAction.RESUME,
  422. });
  423. dispatch(resumeCurrentConnector());
  424. return undefined;
  425. } catch (err) {
  426. return rejectWithValue(await getResponse(err as Response));
  427. }
  428. }
  429. );
  430. export const setConnectorSearch = (connectorSearch: ConnectorSearch) => {
  431. return fetchConnectors({
  432. clusterName: connectorSearch.clusterName,
  433. search: connectorSearch.search,
  434. });
  435. };
  436. export default connectSlice.reducer;