topics.ts 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. import { v4 } from 'uuid';
  2. import {
  3. TopicsApi,
  4. MessagesApi,
  5. Configuration,
  6. Topic,
  7. TopicCreation,
  8. TopicUpdate,
  9. TopicConfig,
  10. ConsumerGroupsApi,
  11. GetTopicsRequest,
  12. } from 'generated-sources';
  13. import {
  14. PromiseThunkResult,
  15. ClusterName,
  16. TopicName,
  17. TopicFormFormattedParams,
  18. TopicFormDataRaw,
  19. TopicsState,
  20. FailurePayload,
  21. TopicFormData,
  22. AppDispatch,
  23. } from 'redux/interfaces';
  24. import { clearTopicMessages } from 'redux/reducers/topicMessages/topicMessagesSlice';
  25. import { BASE_PARAMS } from 'lib/constants';
  26. import * as actions from 'redux/actions/actions';
  27. import { getResponse } from 'lib/errorHandling';
  28. import { showSuccessAlert } from 'redux/reducers/alerts/alertsSlice';
  // Single Configuration instance, built from BASE_PARAMS, that every
  // generated API client created in this module shares.
  const apiClientConf = new Configuration(BASE_PARAMS);

  /** Generated client used by the topic thunks in this module. */
  export const topicsApiClient = new TopicsApi(apiClientConf);

  /** Generated client used here to look up a topic's message schema. */
  export const messagesApiClient = new MessagesApi(apiClientConf);

  /** Generated client used here to list the consumer groups of a topic. */
  export const topicConsumerGroupsApiClient = new ConsumerGroupsApi(
    apiClientConf
  );
  /**
   * Thunk that loads a page of topics and publishes the refreshed topics state.
   *
   * Dispatches `fetchTopicsListAction.request()` first, then calls
   * `topicsApiClient.getTopics(params)`. On success it dispatches
   * `fetchTopicsListAction.success(newState)` where `newState` is the current
   * topics state with:
   *  - `allNames` replaced by the names of the returned topics, in response
   *    order;
   *  - `totalPages` set to the returned `pageCount`, or 1 when it is falsy;
   *  - every returned topic merged over its existing `byName` entry and given
   *    a freshly generated `v4()` id.
   * `byName` entries for topics that are not part of the response are kept.
   *
   * Anything thrown inside the `try` block results in
   * `fetchTopicsListAction.failure()` being dispatched; the error itself is not
   * rethrown.
   *
   * @param params - request object forwarded unchanged to `getTopics`.
   */
  export const fetchTopicsList =
    (params: GetTopicsRequest): PromiseThunkResult =>
    async (dispatch, getState) => {
      dispatch(actions.fetchTopicsListAction.request());
      try {
        const { topics, pageCount } = await topicsApiClient.getTopics(params);
        const prevState = getState().topics;
        // Copy the state and its `byName` table exactly once. Filling the
        // copies in place keeps the store's own objects untouched while
        // avoiding a full re-spread of both objects for every single topic.
        const newState: TopicsState = {
          ...prevState,
          byName: { ...prevState.byName },
          allNames: [],
          totalPages: pageCount || 1,
        };
        for (const topic of topics || []) {
          newState.byName[topic.name] = {
            ...newState.byName[topic.name],
            ...topic,
            id: v4(),
          };
          newState.allNames.push(topic.name);
        }
        dispatch(actions.fetchTopicsListAction.success(newState));
      } catch (e) {
        dispatch(actions.fetchTopicsListAction.failure());
      }
    };
  /**
   * Thunk that dispatches `clearTopicMessages` once per topic name, in the
   * order the names are given.
   *
   * @param clusterName - cluster passed along with every dispatched payload.
   * @param topicsName - names of the topics whose messages should be cleared.
   */
  export const clearTopicsMessages =
    (clusterName: ClusterName, topicsName: TopicName[]): PromiseThunkResult =>
    async (dispatch) => {
      const clearOne = (topicName: TopicName) => {
        dispatch(clearTopicMessages({ clusterName, topicName }));
      };
      topicsName.forEach(clearOne);
    };
  /**
   * Thunk that fetches the details of one topic and merges them into the
   * topics state.
   *
   * Dispatches `fetchTopicDetailsAction.request()`, calls
   * `topicsApiClient.getTopicDetails`, then dispatches `success` with a copy of
   * the current topics state in which `byName[topicName]` is the existing entry
   * overlaid with the fetched details. Anything thrown inside the `try` block
   * leads to `failure()` being dispatched instead; the error is not rethrown.
   *
   * @param clusterName - cluster that owns the topic.
   * @param topicName - topic to load; also the `byName` key that is updated.
   */
  export const fetchTopicDetails =
    (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
    async (dispatch, getState) => {
      dispatch(actions.fetchTopicDetailsAction.request());
      try {
        const request = { clusterName, topicName };
        const details = await topicsApiClient.getTopicDetails(request);
        // Read the state only after the response arrived, so the merge is
        // applied on top of whatever the store holds at that moment.
        const current = getState().topics;
        const mergedTopic = { ...current.byName[topicName], ...details };
        const byName = { ...current.byName, [topicName]: mergedTopic };
        dispatch(
          actions.fetchTopicDetailsAction.success({ ...current, byName })
        );
      } catch (e) {
        dispatch(actions.fetchTopicDetailsAction.failure());
      }
    };
  /**
   * Thunk that fetches the config entries of one topic and stores them under
   * `byName[topicName].config`.
   *
   * Dispatches `fetchTopicConfigAction.request()`, calls
   * `topicsApiClient.getTopicConfigs`, then dispatches `success` with a copy of
   * the current topics state whose entry for `topicName` carries a shallow copy
   * of every returned config item. Anything thrown inside the `try` block leads
   * to `failure()` being dispatched instead; the error is not rethrown.
   *
   * @param clusterName - cluster that owns the topic.
   * @param topicName - topic whose config is loaded.
   */
  export const fetchTopicConfig =
    (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
    async (dispatch, getState) => {
      dispatch(actions.fetchTopicConfigAction.request());
      try {
        const request = { clusterName, topicName };
        const fetched = await topicsApiClient.getTopicConfigs(request);
        const current = getState().topics;
        // Each item is shallow-copied so the stored objects are distinct from
        // the ones handed back by the API client.
        const config = fetched.map((item) => ({ ...item }));
        const topicEntry = { ...current.byName[topicName], config };
        const byName = { ...current.byName, [topicName]: topicEntry };
        dispatch(actions.fetchTopicConfigAction.success({ ...current, byName }));
      } catch (e) {
        dispatch(actions.fetchTopicConfigAction.failure());
      }
    };
  /**
   * Reducer callback that folds one custom parameter into an accumulated
   * config map.
   *
   * Returns a new object containing everything from `result` plus
   * `customParam.value` stored under the key `customParam.name`; an existing
   * key of the same name is overwritten. `result` itself is not modified.
   */
  const topicReducer = (
    result: TopicFormFormattedParams,
    customParam: TopicConfig
  ) => {
    const { name: key, value } = customParam;
    return { ...result, [key]: value };
  };
  /**
   * Converts topic form data into a `TopicCreation` request body.
   *
   * `name`, `partitions` and `replicationFactor` are copied as-is. The
   * `configs` map holds the five built-in settings, with the retention, message
   * size and in-sync-replica values converted via `toString()`, followed by the
   * custom parameters folded in by `topicReducer`. Because the custom
   * parameters are spread last, one that reuses a built-in key replaces that
   * built-in value.
   *
   * @param form - form values; a missing `customParams` is treated as empty.
   */
  export const formatTopicCreation = (form: TopicFormData): TopicCreation => {
    const configs = {
      'cleanup.policy': form.cleanupPolicy,
      'retention.ms': form.retentionMs.toString(),
      'retention.bytes': form.retentionBytes.toString(),
      'max.message.bytes': form.maxMessageBytes.toString(),
      'min.insync.replicas': form.minInsyncReplicas.toString(),
      ...Object.values(form.customParams || {}).reduce(topicReducer, {}),
    };
    return {
      name: form.name,
      partitions: form.partitions,
      replicationFactor: form.replicationFactor,
      configs,
    };
  };
  /**
   * Converts raw topic form data into a `TopicUpdate` request body.
   *
   * The result only carries a `configs` map: the five built-in settings are
   * passed through without conversion, followed by the custom parameters folded
   * in by `topicReducer`. Because the custom parameters are spread last, one
   * that reuses a built-in key replaces that built-in value.
   *
   * @param form - raw form values; a missing `customParams` is treated as
   *   empty.
   */
  const formatTopicUpdate = (form: TopicFormDataRaw): TopicUpdate => {
    const configs = {
      'cleanup.policy': form.cleanupPolicy,
      'retention.ms': form.retentionMs,
      'retention.bytes': form.retentionBytes,
      'max.message.bytes': form.maxMessageBytes,
      'min.insync.replicas': form.minInsyncReplicas,
      ...Object.values(form.customParams || {}).reduce(topicReducer, {}),
    };
    return { configs };
  };
  /**
   * Thunk that submits edited topic settings and merges the returned topic into
   * the topics state.
   *
   * Dispatches `updateTopicAction.request()`, sends the form (converted with
   * `formatTopicUpdate`) through `topicsApiClient.updateTopic`, then dispatches
   * `success` with a copy of the current topics state in which the entry keyed
   * by the returned topic's name is overlaid with the returned topic. Anything
   * thrown inside the `try` block leads to `failure()` being dispatched
   * instead; the error is not rethrown.
   *
   * @param clusterName - cluster that owns the topic.
   * @param topicName - topic to update.
   * @param form - raw form values to convert and send.
   */
  export const updateTopic =
    (
      clusterName: ClusterName,
      topicName: TopicName,
      form: TopicFormDataRaw
    ): PromiseThunkResult =>
    async (dispatch, getState) => {
      dispatch(actions.updateTopicAction.request());
      try {
        const updated: Topic = await topicsApiClient.updateTopic({
          clusterName,
          topicName,
          topicUpdate: formatTopicUpdate(form),
        });
        const current = getState().topics;
        // The state is keyed by the name reported in the response, not by the
        // `topicName` argument.
        const mergedTopic = { ...current.byName[updated.name], ...updated };
        const byName = { ...current.byName, [updated.name]: mergedTopic };
        dispatch(actions.updateTopicAction.success({ ...current, byName }));
      } catch (e) {
        dispatch(actions.updateTopicAction.failure());
      }
    };
  /**
   * Thunk that deletes a single topic.
   *
   * Dispatches `deleteTopicAction.request()`, calls
   * `topicsApiClient.deleteTopic`, then dispatches `success(topicName)`.
   * Anything thrown inside the `try` block leads to `failure()` being
   * dispatched instead; the error is not rethrown.
   *
   * @param clusterName - cluster that owns the topic.
   * @param topicName - topic to delete; also the success payload.
   */
  export const deleteTopic =
    (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
    async (dispatch) => {
      dispatch(actions.deleteTopicAction.request());
      try {
        const target = { clusterName, topicName };
        await topicsApiClient.deleteTopic(target);
        dispatch(actions.deleteTopicAction.success(topicName));
      } catch (e) {
        dispatch(actions.deleteTopicAction.failure());
      }
    };
  /**
   * Thunk that recreates a topic and raises a success alert.
   *
   * Dispatches `recreateTopicAction.request()`, calls
   * `topicsApiClient.recreateTopic`, dispatches `success` with the returned
   * topic and then dispatches `showSuccessAlert` using the topic name as the
   * alert id. Anything thrown inside the `try` block, including by the alert
   * dispatch, leads to `failure()` being dispatched; the error is not rethrown.
   *
   * @param clusterName - cluster that owns the topic.
   * @param topicName - topic to recreate.
   */
  export const recreateTopic =
    (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
    async (dispatch) => {
      dispatch(actions.recreateTopicAction.request());
      try {
        const target = { clusterName, topicName };
        const recreated = await topicsApiClient.recreateTopic(target);
        dispatch(actions.recreateTopicAction.success(recreated));

        // The alert action is dispatched through the AppDispatch signature.
        const dispatchAlert = dispatch as AppDispatch;
        dispatchAlert(
          showSuccessAlert({
            id: topicName,
            message: 'Topic successfully recreated!',
          })
        );
      } catch (e) {
        dispatch(actions.recreateTopicAction.failure());
      }
    };
  /**
   * Thunk that deletes several topics by dispatching `deleteTopic` for each
   * name.
   *
   * All deletions are started immediately, in the order the names are given,
   * and the returned promise settles only after every dispatched `deleteTopic`
   * has finished. Previously the per-topic promises were dropped, so a caller
   * awaiting this thunk continued before any deletion had completed.
   * `deleteTopic` reports API failures by dispatching its own failure action.
   *
   * @param clusterName - cluster that owns the topics.
   * @param topicsName - names of the topics to delete; an empty array resolves
   *   without dispatching anything.
   */
  export const deleteTopics =
    (clusterName: ClusterName, topicsName: TopicName[]): PromiseThunkResult =>
    async (dispatch) => {
      await Promise.all(
        topicsName.map((topicName) =>
          dispatch(deleteTopic(clusterName, topicName))
        )
      );
    };
  /**
   * Thunk that fetches the consumer groups of one topic and stores them under
   * `byName[topicName].consumerGroups`.
   *
   * Dispatches `fetchTopicConsumerGroupsAction.request()`, calls
   * `topicConsumerGroupsApiClient.getTopicConsumerGroups`, then dispatches
   * `success` with a copy of the current topics state whose entry for
   * `topicName` carries the fetched groups. Anything thrown inside the `try`
   * block leads to `failure()` being dispatched instead; the error is not
   * rethrown.
   *
   * @param clusterName - cluster that owns the topic.
   * @param topicName - topic whose consumer groups are loaded.
   */
  export const fetchTopicConsumerGroups =
    (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
    async (dispatch, getState) => {
      dispatch(actions.fetchTopicConsumerGroupsAction.request());
      try {
        const request = { clusterName, topicName };
        const consumerGroups =
          await topicConsumerGroupsApiClient.getTopicConsumerGroups(request);
        const current = getState().topics;
        const topicEntry = { ...current.byName[topicName], consumerGroups };
        const byName = { ...current.byName, [topicName]: topicEntry };
        dispatch(
          actions.fetchTopicConsumerGroupsAction.success({ ...current, byName })
        );
      } catch (e) {
        dispatch(actions.fetchTopicConsumerGroupsAction.failure());
      }
    };
  /**
   * Thunk that fetches the message schema of one topic.
   *
   * Dispatches `fetchTopicMessageSchemaAction.request()`, calls
   * `messagesApiClient.getTopicSchema`, then dispatches `success` with
   * `{ topicName, schema }`. If anything inside the `try` block throws, the
   * error is passed to `getResponse` and `failure` is dispatched with an alert
   * built from the topic name and that response.
   *
   * @param clusterName - cluster that owns the topic.
   * @param topicName - topic whose schema is loaded.
   */
  export const fetchTopicMessageSchema =
    (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
    async (dispatch) => {
      dispatch(actions.fetchTopicMessageSchemaAction.request());
      try {
        const request = { clusterName, topicName };
        const schema = await messagesApiClient.getTopicSchema(request);
        const payload = { topicName, schema };
        dispatch(actions.fetchTopicMessageSchemaAction.success(payload));
      } catch (e) {
        const response = await getResponse(e);
        const subject = ['topic', topicName].join('-');
        const alert: FailurePayload = {
          subject,
          title: `Topic Schema ${topicName}`,
          response,
        };
        dispatch(actions.fetchTopicMessageSchemaAction.failure({ alert }));
      }
    };
  /**
   * Thunk that asks the API to raise a topic's partition count.
   *
   * Dispatches `updateTopicPartitionsCountAction.request()`, calls
   * `topicsApiClient.increaseTopicPartitions` with `partitions` as the
   * `totalPartitionsCount`, then dispatches `success()`. If anything inside the
   * `try` block throws, the error is passed to `getResponse` and `failure` is
   * dispatched with an alert built from the topic name and that response.
   *
   * @param clusterName - cluster that owns the topic.
   * @param topicName - topic to change.
   * @param partitions - value sent as `totalPartitionsCount`.
   */
  export const updateTopicPartitionsCount =
    (
      clusterName: ClusterName,
      topicName: TopicName,
      partitions: number
    ): PromiseThunkResult =>
    async (dispatch) => {
      dispatch(actions.updateTopicPartitionsCountAction.request());
      try {
        const partitionsIncrease = { totalPartitionsCount: partitions };
        await topicsApiClient.increaseTopicPartitions({
          clusterName,
          topicName,
          partitionsIncrease,
        });
        dispatch(actions.updateTopicPartitionsCountAction.success());
      } catch (error) {
        const response = await getResponse(error);
        const subject = ['topic-partitions', topicName].join('-');
        const alert: FailurePayload = {
          subject,
          title: `Topic ${topicName} partitions count increase failed`,
          response,
        };
        dispatch(actions.updateTopicPartitionsCountAction.failure({ alert }));
      }
    };
  /**
   * Thunk that asks the API to change a topic's replication factor.
   *
   * Dispatches `updateTopicReplicationFactorAction.request()`, calls
   * `topicsApiClient.changeReplicationFactor` with `replicationFactor` as the
   * `totalReplicationFactor`, then dispatches `success()`. If anything inside
   * the `try` block throws, the error is passed to `getResponse` and `failure`
   * is dispatched with an alert built from the topic name and that response.
   *
   * @param clusterName - cluster that owns the topic.
   * @param topicName - topic to change.
   * @param replicationFactor - value sent as `totalReplicationFactor`.
   */
  export const updateTopicReplicationFactor =
    (
      clusterName: ClusterName,
      topicName: TopicName,
      replicationFactor: number
    ): PromiseThunkResult =>
    async (dispatch) => {
      dispatch(actions.updateTopicReplicationFactorAction.request());
      try {
        const replicationFactorChange = {
          totalReplicationFactor: replicationFactor,
        };
        await topicsApiClient.changeReplicationFactor({
          clusterName,
          topicName,
          replicationFactorChange,
        });
        dispatch(actions.updateTopicReplicationFactorAction.success());
      } catch (error) {
        const response = await getResponse(error);
        const subject = ['topic-replication-factor', topicName].join('-');
        const alert: FailurePayload = {
          subject,
          title: `Topic ${topicName} replication factor change failed`,
          response,
        };
        dispatch(actions.updateTopicReplicationFactorAction.failure({ alert }));
      }
    };