topics.ts 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. import { v4 } from 'uuid';
  2. import {
  3. TopicsApi,
  4. MessagesApi,
  5. Configuration,
  6. Topic,
  7. TopicCreation,
  8. TopicUpdate,
  9. TopicConfig,
  10. ConsumerGroupsApi,
  11. CreateTopicMessage,
  12. GetTopicsRequest,
  13. } from 'generated-sources';
  14. import {
  15. PromiseThunkResult,
  16. ClusterName,
  17. TopicName,
  18. TopicFormFormattedParams,
  19. TopicFormDataRaw,
  20. TopicsState,
  21. FailurePayload,
  22. } from 'redux/interfaces';
  23. import { BASE_PARAMS } from 'lib/constants';
  24. import * as actions from 'redux/actions/actions';
  25. import { getResponse } from 'lib/errorHandling';
  /** Configuration shared by every API client below, built from `BASE_PARAMS`. */
  const apiClientConf = new Configuration(BASE_PARAMS);

  /** `TopicsApi` instance used by the topic thunks in this module. */
  export const topicsApiClient = new TopicsApi(apiClientConf);

  /** `MessagesApi` instance used by the message-related thunks in this module. */
  export const messagesApiClient = new MessagesApi(apiClientConf);

  /** `ConsumerGroupsApi` instance used to look up a topic's consumer groups. */
  export const topicConsumerGroupsApiClient = new ConsumerGroupsApi(apiClientConf);
  /**
   * Thunk that loads a page of topics and rebuilds the topic list in state.
   *
   * Dispatches `fetchTopicsListAction.request()` first, then either
   * `success(newState)` or `failure()`.
   *
   * Each returned topic is merged over the entry already stored under the same
   * name and receives a freshly generated `id`. `allNames` is rebuilt from the
   * response only, and `totalPages` falls back to 1 when `pageCount` is falsy.
   *
   * @param params - Request parameters passed unchanged to `getTopics`.
   * @returns A thunk that resolves after the terminal action was dispatched.
   */
  export const fetchTopicsList =
    (params: GetTopicsRequest): PromiseThunkResult =>
    async (dispatch, getState) => {
      dispatch(actions.fetchTopicsListAction.request());
      try {
        const { topics, pageCount } = await topicsApiClient.getTopics(params);
        // Read the state only after the response arrived, so the merge uses
        // the latest stored topics.
        const state = getState().topics;
        // Copy the lookup once and fill it in place: re-spreading the whole
        // accumulator for every topic made this quadratic in the page size.
        const byName = { ...state.byName };
        const allNames: TopicName[] = [];
        (topics || []).forEach((topic) => {
          byName[topic.name] = {
            ...byName[topic.name],
            ...topic,
            id: v4(),
          };
          allNames.push(topic.name);
        });
        const newState: TopicsState = {
          ...state,
          byName,
          allNames,
          totalPages: pageCount || 1,
        };
        dispatch(actions.fetchTopicsListAction.success(newState));
      } catch (e) {
        // The failure action is dispatched without a payload, so the error
        // details are not propagated from here.
        dispatch(actions.fetchTopicsListAction.failure());
      }
    };
  /**
   * Thunk that calls `deleteTopicMessages` for a topic.
   *
   * Dispatches `clearMessagesTopicAction.request()`, then `success()` once the
   * call resolves. When the call throws, the error is converted with
   * `getResponse` and dispatched as `failure({ alert })`.
   *
   * @param clusterName - Cluster the topic belongs to.
   * @param topicName - Topic whose messages are cleared.
   * @param partitions - Optional partition numbers, forwarded as-is to the API.
   */
  export const clearTopicMessages =
    (
      clusterName: ClusterName,
      topicName: TopicName,
      partitions?: number[]
    ): PromiseThunkResult =>
    async (dispatch) => {
      dispatch(actions.clearMessagesTopicAction.request());
      try {
        const deletion = { clusterName, topicName, partitions };
        await messagesApiClient.deleteTopicMessages(deletion);
        dispatch(actions.clearMessagesTopicAction.success());
      } catch (err) {
        const response = await getResponse(err);
        // The alert subject is the three arguments joined with dashes.
        const subject = [clusterName, topicName, partitions].join('-');
        const alert: FailurePayload = {
          subject,
          title: 'Clear Topic Messages',
          response,
        };
        dispatch(actions.clearMessagesTopicAction.failure({ alert }));
      }
    };
  /**
   * Thunk that fetches the details of one topic and merges them into state.
   *
   * Dispatches `fetchTopicDetailsAction.request()`, then `success(nextState)`
   * where the entry stored under `topicName` is overlaid with the fetched
   * details, or `failure()` when anything in the `try` block throws.
   *
   * @param clusterName - Cluster the topic belongs to.
   * @param topicName - Topic to fetch; also the key updated in `byName`.
   */
  export const fetchTopicDetails =
    (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
    async (dispatch, getState) => {
      dispatch(actions.fetchTopicDetailsAction.request());
      try {
        const request = { clusterName, topicName };
        const details = await topicsApiClient.getTopicDetails(request);
        // State is read after the await so the merge uses the latest entry.
        const current = getState().topics;
        const mergedTopic = { ...current.byName[topicName], ...details };
        const nextState = {
          ...current,
          byName: { ...current.byName, [topicName]: mergedTopic },
        };
        dispatch(actions.fetchTopicDetailsAction.success(nextState));
      } catch (err) {
        dispatch(actions.fetchTopicDetailsAction.failure());
      }
    };
  /**
   * Thunk that fetches the config entries of one topic and stores them in state.
   *
   * Dispatches `fetchTopicConfigAction.request()`, then `success(nextState)`
   * where the topic's `config` field holds shallow copies of the returned
   * entries, or `failure()` when anything in the `try` block throws.
   *
   * @param clusterName - Cluster the topic belongs to.
   * @param topicName - Topic to fetch; also the key updated in `byName`.
   */
  export const fetchTopicConfig =
    (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
    async (dispatch, getState) => {
      dispatch(actions.fetchTopicConfigAction.request());
      try {
        const request = { clusterName, topicName };
        const fetchedConfig = await topicsApiClient.getTopicConfigs(request);
        // State is read after the await so the merge uses the latest entry.
        const current = getState().topics;
        const topicWithConfig = {
          ...current.byName[topicName],
          config: fetchedConfig.map((entry) => ({ ...entry })),
        };
        const nextState = {
          ...current,
          byName: { ...current.byName, [topicName]: topicWithConfig },
        };
        dispatch(actions.fetchTopicConfigAction.success(nextState));
      } catch (err) {
        dispatch(actions.fetchTopicConfigAction.failure());
      }
    };
  /**
   * Reducer step that adds one custom parameter to the accumulated config map.
   *
   * @param result - Config map built so far; it is copied, not mutated.
   * @param customParam - Entry whose `name` becomes the key and `value` the value.
   * @returns A new map containing `result` plus the custom parameter.
   */
  const topicReducer = (
    result: TopicFormFormattedParams,
    customParam: TopicConfig
  ) => {
    return {
      ...result,
      [customParam.name]: customParam.value,
    };
  };

  /**
   * Builds the `configs` map shared by the create and update payloads.
   *
   * Kept in one place so both payloads always map the form fields to the same
   * config keys. Custom parameters are spread last, so a custom parameter whose
   * name equals one of the fixed keys overrides the form field.
   *
   * @param form - Raw form values; `customParams` may be missing.
   * @returns The config map keyed by config name.
   */
  const formatTopicConfigs = (form: TopicFormDataRaw) => {
    const {
      cleanupPolicy,
      retentionBytes,
      retentionMs,
      maxMessageBytes,
      minInSyncReplicas,
      customParams,
    } = form;

    return {
      'cleanup.policy': cleanupPolicy,
      'retention.ms': retentionMs,
      'retention.bytes': retentionBytes,
      'max.message.bytes': maxMessageBytes,
      'min.insync.replicas': minInSyncReplicas,
      ...Object.values(customParams || {}).reduce(topicReducer, {}),
    };
  };

  /**
   * Converts raw form values into the payload used to create a topic.
   *
   * @param form - Raw form values.
   * @returns `name`, `partitions`, `replicationFactor` and the `configs` map.
   */
  const formatTopicCreation = (form: TopicFormDataRaw): TopicCreation => {
    const { name, partitions, replicationFactor } = form;

    return {
      name,
      partitions,
      replicationFactor,
      configs: formatTopicConfigs(form),
    };
  };

  /**
   * Converts raw form values into the payload used to update a topic.
   *
   * @param form - Raw form values.
   * @returns An object holding only the `configs` map.
   */
  const formatTopicUpdate = (form: TopicFormDataRaw): TopicUpdate => {
    return {
      configs: formatTopicConfigs(form),
    };
  };
  /**
   * Thunk that creates a topic from raw form values and adds it to state.
   *
   * Dispatches `createTopicAction.request()`, then `success(newState)` where the
   * created topic is stored under its name and appended to `allNames`.
   * When the call throws, the error is converted with `getResponse` and
   * dispatched as `failure({ alert })`.
   *
   * @param clusterName - Cluster in which the topic is created.
   * @param form - Raw form values, converted with `formatTopicCreation`.
   */
  export const createTopic =
    (clusterName: ClusterName, form: TopicFormDataRaw): PromiseThunkResult =>
    async (dispatch, getState) => {
      dispatch(actions.createTopicAction.request());
      try {
        const topic: Topic = await topicsApiClient.createTopic({
          clusterName,
          topicCreation: formatTopicCreation(form),
        });
        // State is read after the await so concurrent updates are kept.
        const state = getState().topics;
        const newState = {
          ...state,
          byName: {
            ...state.byName,
            [topic.name]: {
              ...topic,
            },
          },
          allNames: [...state.allNames, topic.name],
        };
        dispatch(actions.createTopicAction.success(newState));
      } catch (error) {
        const response = await getResponse(error);
        // This alert reports a topic failure, so it is labelled as a topic
        // (it previously said "Schema") and uses the same `topic-<name>`
        // subject form as the other topic thunks in this module.
        const alert: FailurePayload = {
          subject: ['topic', form.name].join('-'),
          title: `Topic ${form.name}`,
          response,
        };
        dispatch(actions.createTopicAction.failure({ alert }));
      }
    };
  /**
   * Thunk that updates a topic's configs and merges the result into state.
   *
   * Dispatches `updateTopicAction.request()`, then `success(nextState)` where
   * the returned topic is overlaid on the entry stored under the returned
   * topic's name, or `failure()` when anything in the `try` block throws.
   *
   * @param clusterName - Cluster the topic belongs to.
   * @param topicName - Topic to update.
   * @param form - Raw form values, converted with `formatTopicUpdate`.
   */
  export const updateTopic =
    (
      clusterName: ClusterName,
      topicName: TopicName,
      form: TopicFormDataRaw
    ): PromiseThunkResult =>
    async (dispatch, getState) => {
      dispatch(actions.updateTopicAction.request());
      try {
        const topicUpdate = formatTopicUpdate(form);
        const updated: Topic = await topicsApiClient.updateTopic({
          clusterName,
          topicName,
          topicUpdate,
        });
        // State is read after the await so the merge uses the latest entry.
        const current = getState().topics;
        const previous = current.byName[updated.name];
        const nextState = {
          ...current,
          byName: {
            ...current.byName,
            [updated.name]: { ...previous, ...updated },
          },
        };
        dispatch(actions.updateTopicAction.success(nextState));
      } catch (err) {
        dispatch(actions.updateTopicAction.failure());
      }
    };
  /**
   * Thunk that deletes a topic.
   *
   * Dispatches `deleteTopicAction.request()`, then `success(topicName)` once
   * the API call resolves, or `failure()` when it throws.
   *
   * @param clusterName - Cluster the topic belongs to.
   * @param topicName - Topic to delete; also the payload of the success action.
   */
  export const deleteTopic =
    (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
    async (dispatch) => {
      dispatch(actions.deleteTopicAction.request());
      try {
        const target = { clusterName, topicName };
        await topicsApiClient.deleteTopic(target);
        dispatch(actions.deleteTopicAction.success(topicName));
      } catch (err) {
        dispatch(actions.deleteTopicAction.failure());
      }
    };
  /**
   * Thunk that fetches the consumer groups of a topic and stores them in state.
   *
   * Dispatches `fetchTopicConsumerGroupsAction.request()`, then
   * `success(nextState)` where the topic's `consumerGroups` field is replaced
   * with the response, or `failure()` when anything in the `try` block throws.
   *
   * @param clusterName - Cluster the topic belongs to.
   * @param topicName - Topic to query; also the key updated in `byName`.
   */
  export const fetchTopicConsumerGroups =
    (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
    async (dispatch, getState) => {
      dispatch(actions.fetchTopicConsumerGroupsAction.request());
      try {
        const request = { clusterName, topicName };
        const consumerGroups =
          await topicConsumerGroupsApiClient.getTopicConsumerGroups(request);
        // State is read after the await so the merge uses the latest entry.
        const current = getState().topics;
        const topicWithGroups = {
          ...current.byName[topicName],
          consumerGroups,
        };
        const nextState = {
          ...current,
          byName: { ...current.byName, [topicName]: topicWithGroups },
        };
        dispatch(actions.fetchTopicConsumerGroupsAction.success(nextState));
      } catch (err) {
        dispatch(actions.fetchTopicConsumerGroupsAction.failure());
      }
    };
  /**
   * Thunk that fetches the message schema of a topic.
   *
   * Dispatches `fetchTopicMessageSchemaAction.request()`, then
   * `success({ topicName, schema })`. When the call throws, the error is
   * converted with `getResponse` and dispatched as `failure({ alert })`.
   *
   * @param clusterName - Cluster the topic belongs to.
   * @param topicName - Topic whose schema is requested.
   */
  export const fetchTopicMessageSchema =
    (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
    async (dispatch) => {
      dispatch(actions.fetchTopicMessageSchemaAction.request());
      try {
        const request = { clusterName, topicName };
        const schema = await messagesApiClient.getTopicSchema(request);
        const loaded = { topicName, schema };
        dispatch(actions.fetchTopicMessageSchemaAction.success(loaded));
      } catch (err) {
        const response = await getResponse(err);
        const subject = ['topic', topicName].join('-');
        const alert: FailurePayload = {
          subject,
          title: `Topic Schema ${topicName}`,
          response,
        };
        dispatch(actions.fetchTopicMessageSchemaAction.failure({ alert }));
      }
    };
  /**
   * Thunk that sends one message to a topic.
   *
   * Dispatches `sendTopicMessageAction.request()`, then `success()` once the
   * API call resolves. When the call throws, the error is converted with
   * `getResponse` and dispatched as `failure({ alert })`.
   *
   * @param clusterName - Cluster the topic belongs to.
   * @param topicName - Topic that receives the message.
   * @param payload - Message to send; only `key`, `content`, `headers` and
   *   `partition` are forwarded.
   */
  export const sendTopicMessage =
    (
      clusterName: ClusterName,
      topicName: TopicName,
      payload: CreateTopicMessage
    ): PromiseThunkResult =>
    async (dispatch) => {
      dispatch(actions.sendTopicMessageAction.request());
      try {
        // Fields are picked inside the `try` so a bad payload still ends in
        // the failure action.
        const { key, content, headers, partition } = payload;
        await messagesApiClient.sendTopicMessages({
          clusterName,
          topicName,
          createTopicMessage: { key, content, headers, partition },
        });
        dispatch(actions.sendTopicMessageAction.success());
      } catch (err) {
        const response = await getResponse(err);
        const subject = ['topic', topicName].join('-');
        const alert: FailurePayload = {
          subject,
          title: `Topic Message ${topicName}`,
          response,
        };
        dispatch(actions.sendTopicMessageAction.failure({ alert }));
      }
    };
  /**
   * Thunk that requests a new total partition count for a topic.
   *
   * Dispatches `updateTopicPartitionsCountAction.request()`, then `success()`
   * once `increaseTopicPartitions` resolves. When the call throws, the error is
   * converted with `getResponse` and dispatched as `failure({ alert })`.
   *
   * @param clusterName - Cluster the topic belongs to.
   * @param topicName - Topic to change.
   * @param partitions - Value sent as `totalPartitionsCount`.
   */
  export const updateTopicPartitionsCount =
    (
      clusterName: ClusterName,
      topicName: TopicName,
      partitions: number
    ): PromiseThunkResult =>
    async (dispatch) => {
      dispatch(actions.updateTopicPartitionsCountAction.request());
      try {
        const partitionsIncrease = { totalPartitionsCount: partitions };
        await topicsApiClient.increaseTopicPartitions({
          clusterName,
          topicName,
          partitionsIncrease,
        });
        dispatch(actions.updateTopicPartitionsCountAction.success());
      } catch (err) {
        const response = await getResponse(err);
        const subject = ['topic-partitions', topicName].join('-');
        const alert: FailurePayload = {
          subject,
          title: `Topic ${topicName} partitions count increase failed`,
          response,
        };
        dispatch(actions.updateTopicPartitionsCountAction.failure({ alert }));
      }
    };
  /**
   * Thunk that requests a new replication factor for a topic.
   *
   * Dispatches `updateTopicReplicationFactorAction.request()`, then `success()`
   * once `changeReplicationFactor` resolves. When the call throws, the error is
   * converted with `getResponse` and dispatched as `failure({ alert })`.
   *
   * @param clusterName - Cluster the topic belongs to.
   * @param topicName - Topic to change.
   * @param replicationFactor - Value sent as `totalReplicationFactor`.
   */
  export const updateTopicReplicationFactor =
    (
      clusterName: ClusterName,
      topicName: TopicName,
      replicationFactor: number
    ): PromiseThunkResult =>
    async (dispatch) => {
      dispatch(actions.updateTopicReplicationFactorAction.request());
      try {
        const replicationFactorChange = {
          totalReplicationFactor: replicationFactor,
        };
        await topicsApiClient.changeReplicationFactor({
          clusterName,
          topicName,
          replicationFactorChange,
        });
        dispatch(actions.updateTopicReplicationFactorAction.success());
      } catch (err) {
        const response = await getResponse(err);
        const subject = ['topic-replication-factor', topicName].join('-');
        const alert: FailurePayload = {
          subject,
          title: `Topic ${topicName} replication factor change failed`,
          response,
        };
        dispatch(actions.updateTopicReplicationFactorAction.failure({ alert }));
      }
    };