topics.ts 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. import { v4 } from 'uuid';
  2. import {
  3. TopicsApi,
  4. MessagesApi,
  5. Configuration,
  6. Topic,
  7. TopicCreation,
  8. TopicUpdate,
  9. TopicConfig,
  10. } from 'generated-sources';
  11. import {
  12. PromiseThunkResult,
  13. ClusterName,
  14. TopicName,
  15. TopicMessageQueryParams,
  16. TopicFormFormattedParams,
  17. TopicFormDataRaw,
  18. TopicsState,
  19. } from 'redux/interfaces';
  20. import { BASE_PARAMS } from 'lib/constants';
  21. import * as actions from '../actions';
  22. const apiClientConf = new Configuration(BASE_PARAMS);
  23. export const topicsApiClient = new TopicsApi(apiClientConf);
  24. export const messagesApiClient = new MessagesApi(apiClientConf);
  25. export interface FetchTopicsListParams {
  26. clusterName: ClusterName;
  27. page?: number;
  28. perPage?: number;
  29. }
  30. export const fetchTopicsList = (
  31. params: FetchTopicsListParams
  32. ): PromiseThunkResult => async (dispatch, getState) => {
  33. dispatch(actions.fetchTopicsListAction.request());
  34. try {
  35. const { topics, pageCount } = await topicsApiClient.getTopics(params);
  36. const newState = (topics || []).reduce(
  37. (memo: TopicsState, topic) => ({
  38. ...memo,
  39. byName: {
  40. ...memo.byName,
  41. [topic.name]: {
  42. ...memo.byName[topic.name],
  43. ...topic,
  44. id: v4(),
  45. },
  46. },
  47. allNames: [...memo.allNames, topic.name],
  48. }),
  49. {
  50. ...getState().topics,
  51. allNames: [],
  52. totalPages: pageCount || 1,
  53. }
  54. );
  55. dispatch(actions.fetchTopicsListAction.success(newState));
  56. } catch (e) {
  57. dispatch(actions.fetchTopicsListAction.failure());
  58. }
  59. };
  60. export const fetchTopicMessages = (
  61. clusterName: ClusterName,
  62. topicName: TopicName,
  63. queryParams: Partial<TopicMessageQueryParams>
  64. ): PromiseThunkResult => async (dispatch) => {
  65. dispatch(actions.fetchTopicMessagesAction.request());
  66. try {
  67. const messages = await messagesApiClient.getTopicMessages({
  68. clusterName,
  69. topicName,
  70. ...queryParams,
  71. });
  72. dispatch(actions.fetchTopicMessagesAction.success(messages));
  73. } catch (e) {
  74. dispatch(actions.fetchTopicMessagesAction.failure());
  75. }
  76. };
  77. export const fetchTopicDetails = (
  78. clusterName: ClusterName,
  79. topicName: TopicName
  80. ): PromiseThunkResult => async (dispatch, getState) => {
  81. dispatch(actions.fetchTopicDetailsAction.request());
  82. try {
  83. const topicDetails = await topicsApiClient.getTopicDetails({
  84. clusterName,
  85. topicName,
  86. });
  87. const state = getState().topics;
  88. const newState = {
  89. ...state,
  90. byName: {
  91. ...state.byName,
  92. [topicName]: {
  93. ...state.byName[topicName],
  94. ...topicDetails,
  95. },
  96. },
  97. };
  98. dispatch(actions.fetchTopicDetailsAction.success(newState));
  99. } catch (e) {
  100. dispatch(actions.fetchTopicDetailsAction.failure());
  101. }
  102. };
  103. export const fetchTopicConfig = (
  104. clusterName: ClusterName,
  105. topicName: TopicName
  106. ): PromiseThunkResult => async (dispatch, getState) => {
  107. dispatch(actions.fetchTopicConfigAction.request());
  108. try {
  109. const config = await topicsApiClient.getTopicConfigs({
  110. clusterName,
  111. topicName,
  112. });
  113. const state = getState().topics;
  114. const newState = {
  115. ...state,
  116. byName: {
  117. ...state.byName,
  118. [topicName]: {
  119. ...state.byName[topicName],
  120. config: config.map((inputConfig) => ({
  121. ...inputConfig,
  122. })),
  123. },
  124. },
  125. };
  126. dispatch(actions.fetchTopicConfigAction.success(newState));
  127. } catch (e) {
  128. dispatch(actions.fetchTopicConfigAction.failure());
  129. }
  130. };
  131. const formatTopicCreation = (form: TopicFormDataRaw): TopicCreation => {
  132. const {
  133. name,
  134. partitions,
  135. replicationFactor,
  136. cleanupPolicy,
  137. retentionBytes,
  138. retentionMs,
  139. maxMessageBytes,
  140. minInSyncReplicas,
  141. customParams,
  142. } = form;
  143. return {
  144. name,
  145. partitions,
  146. replicationFactor,
  147. configs: {
  148. 'cleanup.policy': cleanupPolicy,
  149. 'retention.ms': retentionMs,
  150. 'retention.bytes': retentionBytes,
  151. 'max.message.bytes': maxMessageBytes,
  152. 'min.insync.replicas': minInSyncReplicas,
  153. ...Object.values(customParams || {}).reduce(
  154. (result: TopicFormFormattedParams, customParam: TopicConfig) => {
  155. return {
  156. ...result,
  157. [customParam.name]: customParam.value,
  158. };
  159. },
  160. {}
  161. ),
  162. },
  163. };
  164. };
  165. const formatTopicUpdate = (form: TopicFormDataRaw): TopicUpdate => {
  166. const {
  167. cleanupPolicy,
  168. retentionBytes,
  169. retentionMs,
  170. maxMessageBytes,
  171. minInSyncReplicas,
  172. customParams,
  173. } = form;
  174. return {
  175. configs: {
  176. 'cleanup.policy': cleanupPolicy,
  177. 'retention.ms': retentionMs,
  178. 'retention.bytes': retentionBytes,
  179. 'max.message.bytes': maxMessageBytes,
  180. 'min.insync.replicas': minInSyncReplicas,
  181. ...Object.values(customParams || {}).reduce(
  182. (result: TopicFormFormattedParams, customParam: TopicConfig) => {
  183. return {
  184. ...result,
  185. [customParam.name]: customParam.value,
  186. };
  187. },
  188. {}
  189. ),
  190. },
  191. };
  192. };
  193. export const createTopic = (
  194. clusterName: ClusterName,
  195. form: TopicFormDataRaw
  196. ): PromiseThunkResult => async (dispatch, getState) => {
  197. dispatch(actions.createTopicAction.request());
  198. try {
  199. const topic: Topic = await topicsApiClient.createTopic({
  200. clusterName,
  201. topicCreation: formatTopicCreation(form),
  202. });
  203. const state = getState().topics;
  204. const newState = {
  205. ...state,
  206. byName: {
  207. ...state.byName,
  208. [topic.name]: {
  209. ...topic,
  210. },
  211. },
  212. allNames: [...state.allNames, topic.name],
  213. };
  214. dispatch(actions.createTopicAction.success(newState));
  215. } catch (e) {
  216. dispatch(actions.createTopicAction.failure());
  217. }
  218. };
  219. export const updateTopic = (
  220. clusterName: ClusterName,
  221. topicName: TopicName,
  222. form: TopicFormDataRaw
  223. ): PromiseThunkResult => async (dispatch, getState) => {
  224. dispatch(actions.updateTopicAction.request());
  225. try {
  226. const topic: Topic = await topicsApiClient.updateTopic({
  227. clusterName,
  228. topicName,
  229. topicUpdate: formatTopicUpdate(form),
  230. });
  231. const state = getState().topics;
  232. const newState = {
  233. ...state,
  234. byName: {
  235. ...state.byName,
  236. [topic.name]: {
  237. ...state.byName[topic.name],
  238. ...topic,
  239. },
  240. },
  241. };
  242. dispatch(actions.updateTopicAction.success(newState));
  243. } catch (e) {
  244. dispatch(actions.updateTopicAction.failure());
  245. }
  246. };
  247. export const deleteTopic = (
  248. clusterName: ClusterName,
  249. topicName: TopicName
  250. ): PromiseThunkResult => async (dispatch) => {
  251. dispatch(actions.deleteTopicAction.request());
  252. try {
  253. await topicsApiClient.deleteTopic({
  254. clusterName,
  255. topicName,
  256. });
  257. dispatch(actions.deleteTopicAction.success(topicName));
  258. } catch (e) {
  259. dispatch(actions.deleteTopicAction.failure());
  260. }
  261. };