ksqlDb.ts 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. import {
  2. Configuration,
  3. ExecuteKsqlCommandRequest,
  4. KsqlApi,
  5. Table as KsqlTable,
  6. } from 'generated-sources';
  7. import {
  8. PromiseThunkResult,
  9. ClusterName,
  10. FailurePayload,
  11. } from 'redux/interfaces';
  12. import { BASE_PARAMS } from 'lib/constants';
  13. import * as actions from 'redux/actions/actions';
  14. import { getResponse } from 'lib/errorHandling';
  15. const apiClientConf = new Configuration(BASE_PARAMS);
  16. export const ksqlDbApiClient = new KsqlApi(apiClientConf);
  17. export const transformKsqlResponse = (
  18. rawTable: Required<KsqlTable>
  19. ): Dictionary<string>[] =>
  20. rawTable.rows.map((row) =>
  21. row.reduce(
  22. (res, acc, index) => ({
  23. ...res,
  24. [rawTable.headers[index]]: acc,
  25. }),
  26. {} as Dictionary<string>
  27. )
  28. );
  29. const getTables = (clusterName: ClusterName) =>
  30. ksqlDbApiClient.executeKsqlCommand({
  31. clusterName,
  32. ksqlCommand: { ksql: 'SHOW TABLES;' },
  33. });
  34. const getStreams = (clusterName: ClusterName) =>
  35. ksqlDbApiClient.executeKsqlCommand({
  36. clusterName,
  37. ksqlCommand: { ksql: 'SHOW STREAMS;' },
  38. });
  39. export const fetchKsqlDbTables =
  40. (clusterName: ClusterName): PromiseThunkResult =>
  41. async (dispatch) => {
  42. dispatch(actions.fetchKsqlDbTablesAction.request());
  43. try {
  44. const tables = await getTables(clusterName);
  45. const streams = await getStreams(clusterName);
  46. dispatch(
  47. actions.fetchKsqlDbTablesAction.success({
  48. tables: tables.data ? transformKsqlResponse(tables.data) : [],
  49. streams: streams.data ? transformKsqlResponse(streams.data) : [],
  50. })
  51. );
  52. } catch (error) {
  53. const response = await getResponse(error);
  54. const alert: FailurePayload = {
  55. subject: 'ksqlDb',
  56. title: `Failed to fetch tables and streams`,
  57. response,
  58. };
  59. dispatch(actions.fetchKsqlDbTablesAction.failure({ alert }));
  60. }
  61. };
  62. export const executeKsql =
  63. (params: ExecuteKsqlCommandRequest): PromiseThunkResult =>
  64. async (dispatch) => {
  65. dispatch(actions.executeKsqlAction.request());
  66. try {
  67. const response = await ksqlDbApiClient.executeKsqlCommand(params);
  68. dispatch(actions.executeKsqlAction.success(response));
  69. } catch (error) {
  70. const response = await getResponse(error);
  71. const alert: FailurePayload = {
  72. subject: 'ksql execution',
  73. title: `Failed to execute command ${params.ksqlCommand?.ksql}`,
  74. response,
  75. };
  76. dispatch(actions.executeKsqlAction.failure({ alert }));
  77. }
  78. };