// topics.ts
  1. import { v4 } from 'uuid';
  2. import {
  3. TopicsApi,
  4. MessagesApi,
  5. Configuration,
  6. Topic,
  7. TopicCreation,
  8. TopicUpdate,
  9. TopicConfig,
  10. } from 'generated-sources';
  11. import {
  12. PromiseThunkResult,
  13. ClusterName,
  14. TopicName,
  15. TopicMessageQueryParams,
  16. TopicFormFormattedParams,
  17. TopicFormDataRaw,
  18. TopicsState,
  19. FailurePayload,
  20. } from 'redux/interfaces';
  21. import { BASE_PARAMS } from 'lib/constants';
  22. import * as actions from 'redux/actions/actions';
  23. import { getResponse } from 'lib/errorHandling';
  // One configuration object, built from BASE_PARAMS, shared by both clients below.
  const apiClientConf = new Configuration(BASE_PARAMS);

  /** Topics API client used by the topic thunks in this module. */
  export const topicsApiClient = new TopicsApi(apiClientConf);

  /** Messages API client used by `fetchTopicMessages`. */
  export const messagesApiClient = new MessagesApi(apiClientConf);
  /**
   * Arguments of `fetchTopicsList`; the object is passed unchanged to
   * `topicsApiClient.getTopics`.
   */
  export interface FetchTopicsListParams {
    /** Cluster whose topics are requested. */
    clusterName: ClusterName;
    /** Optional page to request. */
    page?: number;
    /** Optional number of topics per page. */
    perPage?: number;
  }
  /**
   * Thunk that fetches a list of topics and rebuilds the topics slice from it.
   *
   * Dispatches `request` first, then `success` with the new slice, or `failure`
   * (without payload) if anything inside the `try` block throws.
   *
   * @param params - Passed as-is to `topicsApiClient.getTopics`.
   */
  export const fetchTopicsList = (
    params: FetchTopicsListParams
  ): PromiseThunkResult => async (dispatch, getState) => {
    dispatch(actions.fetchTopicsListAction.request());

    try {
      const response = await topicsApiClient.getTopics(params);
      const fetchedTopics = response.topics || [];

      const newState = fetchedTopics.reduce(
        (acc: TopicsState, topic) => {
          // Keep whatever is already stored under this name and overlay the
          // fetched fields; every entry gets a freshly generated id.
          const previousEntry = acc.byName[topic.name];
          const nextEntry = { ...previousEntry, ...topic, id: v4() };

          return {
            ...acc,
            byName: { ...acc.byName, [topic.name]: nextEntry },
            allNames: acc.allNames.concat(topic.name),
          };
        },
        {
          // Seed: the current slice with the name list emptied, so allNames
          // ends up holding only the names returned by this request.
          ...getState().topics,
          allNames: [],
          totalPages: response.pageCount || 1,
        }
      );

      dispatch(actions.fetchTopicsListAction.success(newState));
    } catch (e) {
      dispatch(actions.fetchTopicsListAction.failure());
    }
  };
  /**
   * Thunk that fetches messages of a topic.
   *
   * Dispatches `request`, then `success` with the fetched messages, or
   * `failure` (without payload) if the call throws.
   *
   * @param clusterName - Cluster the topic belongs to.
   * @param topicName - Topic whose messages are requested.
   * @param queryParams - Extra query parameters; spread after the two names,
   *   so a same-named key in it would take precedence.
   */
  export const fetchTopicMessages = (
    clusterName: ClusterName,
    topicName: TopicName,
    queryParams: Partial<TopicMessageQueryParams>
  ): PromiseThunkResult => async (dispatch) => {
    dispatch(actions.fetchTopicMessagesAction.request());

    try {
      const requestParams = { clusterName, topicName, ...queryParams };
      const messages = await messagesApiClient.getTopicMessages(requestParams);
      dispatch(actions.fetchTopicMessagesAction.success(messages));
    } catch (e) {
      dispatch(actions.fetchTopicMessagesAction.failure());
    }
  };
  /**
   * Thunk that fetches the details of one topic and merges them into the
   * entry stored under `topicName` in the topics slice.
   *
   * Dispatches `request`, then `success` with the updated slice, or `failure`
   * (without payload) if anything inside the `try` block throws.
   *
   * @param clusterName - Cluster the topic belongs to.
   * @param topicName - Topic to load details for.
   */
  export const fetchTopicDetails = (
    clusterName: ClusterName,
    topicName: TopicName
  ): PromiseThunkResult => async (dispatch, getState) => {
    dispatch(actions.fetchTopicDetailsAction.request());

    try {
      const topicDetails = await topicsApiClient.getTopicDetails({
        clusterName,
        topicName,
      });

      // State is read after the request resolves, so the merge uses the slice
      // as it is at that moment.
      const topicsState = getState().topics;
      const mergedTopic = { ...topicsState.byName[topicName], ...topicDetails };
      const newState = {
        ...topicsState,
        byName: { ...topicsState.byName, [topicName]: mergedTopic },
      };

      dispatch(actions.fetchTopicDetailsAction.success(newState));
    } catch (e) {
      dispatch(actions.fetchTopicDetailsAction.failure());
    }
  };
  /**
   * Thunk that fetches the config entries of one topic and stores a shallow
   * copy of them under `config` on the topic's entry in the topics slice.
   *
   * Dispatches `request`, then `success` with the updated slice, or `failure`
   * (without payload) if anything inside the `try` block throws.
   *
   * @param clusterName - Cluster the topic belongs to.
   * @param topicName - Topic whose config is requested.
   */
  export const fetchTopicConfig = (
    clusterName: ClusterName,
    topicName: TopicName
  ): PromiseThunkResult => async (dispatch, getState) => {
    dispatch(actions.fetchTopicConfigAction.request());

    try {
      const fetchedConfig = await topicsApiClient.getTopicConfigs({
        clusterName,
        topicName,
      });

      const topicsState = getState().topics;
      // Each entry is shallow-copied instead of storing the response objects.
      const configCopy = fetchedConfig.map((entry) => ({ ...entry }));
      const updatedTopic = { ...topicsState.byName[topicName], config: configCopy };
      const newState = {
        ...topicsState,
        byName: { ...topicsState.byName, [topicName]: updatedTopic },
      };

      dispatch(actions.fetchTopicConfigAction.success(newState));
    } catch (e) {
      dispatch(actions.fetchTopicConfigAction.failure());
    }
  };
  /**
   * Builds the `configs` map shared by the topic creation and update payloads.
   *
   * The five dedicated form fields are written under their dotted config keys
   * first; custom parameters are spread afterwards, so a custom parameter with
   * the same name as one of those keys overrides it.
   *
   * @param form - Raw form values; a missing `customParams` is treated as empty.
   * @returns Object mapping config names to the values taken from the form.
   */
  const formatTopicConfigs = (form: TopicFormDataRaw) => {
    const {
      cleanupPolicy,
      retentionBytes,
      retentionMs,
      maxMessageBytes,
      minInSyncReplicas,
      customParams,
    } = form;

    return {
      'cleanup.policy': cleanupPolicy,
      'retention.ms': retentionMs,
      'retention.bytes': retentionBytes,
      'max.message.bytes': maxMessageBytes,
      'min.insync.replicas': minInSyncReplicas,
      // Fold the custom parameter entries into a flat name -> value object.
      ...Object.values(customParams || {}).reduce(
        (result: TopicFormFormattedParams, customParam: TopicConfig) => {
          return {
            ...result,
            [customParam.name]: customParam.value,
          };
        },
        {}
      ),
    };
  };

  /**
   * Converts raw form values into the payload used to create a topic.
   *
   * @param form - Raw form values.
   * @returns `name`, `partitions` and `replicationFactor` copied from the form,
   *   plus the `configs` map built by `formatTopicConfigs`.
   */
  const formatTopicCreation = (form: TopicFormDataRaw): TopicCreation => {
    const { name, partitions, replicationFactor } = form;

    return {
      name,
      partitions,
      replicationFactor,
      configs: formatTopicConfigs(form),
    };
  };

  /**
   * Converts raw form values into the payload used to update a topic.
   *
   * @param form - Raw form values.
   * @returns An object holding only the `configs` map built by
   *   `formatTopicConfigs`.
   */
  const formatTopicUpdate = (form: TopicFormDataRaw): TopicUpdate => {
    return {
      configs: formatTopicConfigs(form),
    };
  };
  /**
   * Thunk that creates a topic from raw form values and adds it to the topics
   * slice.
   *
   * Dispatches `request`, then `success` with the updated slice. If anything
   * inside the `try` block throws, the error is converted with `getResponse`
   * and `failure` is dispatched with an alert describing the failed topic.
   *
   * @param clusterName - Cluster in which the topic is created.
   * @param form - Raw form values; converted with `formatTopicCreation`.
   */
  export const createTopic = (
    clusterName: ClusterName,
    form: TopicFormDataRaw
  ): PromiseThunkResult => async (dispatch, getState) => {
    dispatch(actions.createTopicAction.request());
    try {
      const topic: Topic = await topicsApiClient.createTopic({
        clusterName,
        topicCreation: formatTopicCreation(form),
      });

      const state = getState().topics;
      const newState = {
        ...state,
        byName: {
          ...state.byName,
          [topic.name]: {
            ...topic,
          },
        },
        allNames: [...state.allNames, topic.name],
      };

      dispatch(actions.createTopicAction.success(newState));
    } catch (error) {
      const response = await getResponse(error);
      const alert: FailurePayload = {
        subjectId: form.name,
        // NOTE(review): 'schema' looks copied from the schema thunks; kept
        // as-is because consumers of `subject` are not visible from this
        // file. Confirm whether it should be 'topic'.
        subject: 'schema',
        // The failed entity is a topic, so the title must say so.
        title: `Topic ${form.name}`,
        response,
      };
      dispatch(actions.createTopicAction.failure({ alert }));
    }
  };
  /**
   * Thunk that updates a topic from raw form values and merges the returned
   * topic into the topics slice.
   *
   * Dispatches `request`, then `success` with the updated slice, or `failure`
   * (without payload) if anything inside the `try` block throws.
   *
   * @param clusterName - Cluster the topic belongs to.
   * @param topicName - Topic to update.
   * @param form - Raw form values; converted with `formatTopicUpdate`.
   */
  export const updateTopic = (
    clusterName: ClusterName,
    topicName: TopicName,
    form: TopicFormDataRaw
  ): PromiseThunkResult => async (dispatch, getState) => {
    dispatch(actions.updateTopicAction.request());

    try {
      const topicUpdate = formatTopicUpdate(form);
      const topic: Topic = await topicsApiClient.updateTopic({
        clusterName,
        topicName,
        topicUpdate,
      });

      const topicsState = getState().topics;
      // The entry is keyed by the name of the returned topic, not by the
      // `topicName` argument.
      const mergedTopic = { ...topicsState.byName[topic.name], ...topic };
      const newState = {
        ...topicsState,
        byName: { ...topicsState.byName, [topic.name]: mergedTopic },
      };

      dispatch(actions.updateTopicAction.success(newState));
    } catch (e) {
      dispatch(actions.updateTopicAction.failure());
    }
  };
  /**
   * Thunk that deletes a topic.
   *
   * Dispatches `request`, then `success` with the deleted topic's name, or
   * `failure` (without payload) if the call throws.
   *
   * @param clusterName - Cluster the topic belongs to.
   * @param topicName - Topic to delete.
   */
  export const deleteTopic = (
    clusterName: ClusterName,
    topicName: TopicName
  ): PromiseThunkResult => async (dispatch) => {
    dispatch(actions.deleteTopicAction.request());

    try {
      const target = { clusterName, topicName };
      await topicsApiClient.deleteTopic(target);
      dispatch(actions.deleteTopicAction.success(topicName));
    } catch (e) {
      dispatch(actions.deleteTopicAction.failure());
    }
  };