topics.ts 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. import {
  2. topicsApiClient as api,
  3. messagesApiClient as messagesApi,
  4. consumerGroupsApiClient,
  5. messagesApiClient,
  6. } from 'lib/api';
  7. import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query';
  8. import {
  9. ClusterName,
  10. TopicFormData,
  11. TopicFormDataRaw,
  12. TopicFormFormattedParams,
  13. } from 'redux/interfaces';
  14. import {
  15. CreateTopicMessage,
  16. GetTopicDetailsRequest,
  17. GetTopicsRequest,
  18. Topic,
  19. TopicConfig,
  20. TopicCreation,
  21. TopicUpdate,
  22. } from 'generated-sources';
  23. import { showServerError, showSuccessAlert } from 'lib/errorHandling';
  24. export const topicKeys = {
  25. all: (clusterName: ClusterName) =>
  26. ['clusters', clusterName, 'topics'] as const,
  27. list: (
  28. clusterName: ClusterName,
  29. filters: Omit<GetTopicsRequest, 'clusterName'>
  30. ) => [...topicKeys.all(clusterName), filters] as const,
  31. details: ({ clusterName, topicName }: GetTopicDetailsRequest) =>
  32. [...topicKeys.all(clusterName), topicName] as const,
  33. config: (props: GetTopicDetailsRequest) =>
  34. [...topicKeys.details(props), 'config'] as const,
  35. schema: (props: GetTopicDetailsRequest) =>
  36. [...topicKeys.details(props), 'schema'] as const,
  37. consumerGroups: (props: GetTopicDetailsRequest) =>
  38. [...topicKeys.details(props), 'consumerGroups'] as const,
  39. statistics: (props: GetTopicDetailsRequest) =>
  40. [...topicKeys.details(props), 'statistics'] as const,
  41. };
  42. export function useTopics(props: GetTopicsRequest) {
  43. const { clusterName, ...filters } = props;
  44. return useQuery(
  45. topicKeys.list(clusterName, filters),
  46. () => api.getTopics(props),
  47. { keepPreviousData: true }
  48. );
  49. }
  50. export function useTopicDetails(props: GetTopicDetailsRequest) {
  51. return useQuery(topicKeys.details(props), () => api.getTopicDetails(props));
  52. }
  53. export function useTopicConfig(props: GetTopicDetailsRequest) {
  54. return useQuery(topicKeys.config(props), () => api.getTopicConfigs(props));
  55. }
  56. export function useTopicConsumerGroups(props: GetTopicDetailsRequest) {
  57. return useQuery(topicKeys.consumerGroups(props), () =>
  58. consumerGroupsApiClient.getTopicConsumerGroups(props)
  59. );
  60. }
  61. const topicReducer = (
  62. result: TopicFormFormattedParams,
  63. customParam: TopicConfig
  64. ) => {
  65. return {
  66. ...result,
  67. [customParam.name]: customParam.value,
  68. };
  69. };
  70. const formatTopicCreation = (form: TopicFormData): TopicCreation => {
  71. const {
  72. name,
  73. partitions,
  74. replicationFactor,
  75. cleanupPolicy,
  76. retentionMs,
  77. maxMessageBytes,
  78. minInSyncReplicas,
  79. customParams,
  80. } = form;
  81. const configs = {
  82. 'cleanup.policy': cleanupPolicy,
  83. 'retention.ms': retentionMs.toString(),
  84. 'max.message.bytes': maxMessageBytes.toString(),
  85. 'min.insync.replicas': minInSyncReplicas.toString(),
  86. ...Object.values(customParams || {}).reduce(topicReducer, {}),
  87. };
  88. const cleanConfigs = () => {
  89. return Object.fromEntries(
  90. Object.entries(configs).filter(([, val]) => val !== '')
  91. );
  92. };
  93. const topicsvalue = {
  94. name,
  95. partitions,
  96. configs: cleanConfigs(),
  97. };
  98. return replicationFactor.toString() !== ''
  99. ? {
  100. ...topicsvalue,
  101. replicationFactor,
  102. }
  103. : topicsvalue;
  104. };
  105. export function useCreateTopicMutation(clusterName: ClusterName) {
  106. const client = useQueryClient();
  107. return useMutation(
  108. (data: TopicFormData) =>
  109. api.createTopic({
  110. clusterName,
  111. topicCreation: formatTopicCreation(data),
  112. }),
  113. {
  114. onSuccess: () => {
  115. client.invalidateQueries(topicKeys.all(clusterName));
  116. },
  117. }
  118. );
  119. }
  120. // this will change later when we validate the request before
  121. export function useCreateTopic(clusterName: ClusterName) {
  122. const mutate = useCreateTopicMutation(clusterName);
  123. return {
  124. createResource: async (param: TopicFormData) => {
  125. return mutate.mutateAsync(param);
  126. },
  127. ...mutate,
  128. };
  129. }
  130. const formatTopicUpdate = (form: TopicFormDataRaw): TopicUpdate => {
  131. const {
  132. cleanupPolicy,
  133. retentionBytes,
  134. retentionMs,
  135. maxMessageBytes,
  136. minInSyncReplicas,
  137. customParams,
  138. } = form;
  139. return {
  140. configs: {
  141. ...Object.values(customParams || {}).reduce(topicReducer, {}),
  142. 'cleanup.policy': cleanupPolicy,
  143. 'retention.ms': retentionMs,
  144. 'retention.bytes': retentionBytes,
  145. 'max.message.bytes': maxMessageBytes,
  146. 'min.insync.replicas': minInSyncReplicas,
  147. },
  148. };
  149. };
  150. export function useUpdateTopic(props: GetTopicDetailsRequest) {
  151. const client = useQueryClient();
  152. return useMutation(
  153. (data: TopicFormDataRaw) => {
  154. return api.updateTopic({
  155. ...props,
  156. topicUpdate: formatTopicUpdate(data),
  157. });
  158. },
  159. {
  160. onSuccess: () => {
  161. showSuccessAlert({
  162. message: `Topic successfully updated.`,
  163. });
  164. client.invalidateQueries(topicKeys.all(props.clusterName));
  165. },
  166. }
  167. );
  168. }
  169. export function useIncreaseTopicPartitionsCount(props: GetTopicDetailsRequest) {
  170. const client = useQueryClient();
  171. return useMutation(
  172. (totalPartitionsCount: number) =>
  173. api.increaseTopicPartitions({
  174. ...props,
  175. partitionsIncrease: { totalPartitionsCount },
  176. }),
  177. {
  178. onSuccess: () => {
  179. showSuccessAlert({
  180. message: `Number of partitions successfully increased`,
  181. });
  182. client.invalidateQueries(topicKeys.all(props.clusterName));
  183. },
  184. }
  185. );
  186. }
  187. export function useUpdateTopicReplicationFactor(props: GetTopicDetailsRequest) {
  188. const client = useQueryClient();
  189. return useMutation(
  190. (totalReplicationFactor: number) =>
  191. api.changeReplicationFactor({
  192. ...props,
  193. replicationFactorChange: { totalReplicationFactor },
  194. }),
  195. {
  196. onSuccess: () => {
  197. showSuccessAlert({
  198. message: `Replication factor successfully updated`,
  199. });
  200. client.invalidateQueries(topicKeys.all(props.clusterName));
  201. },
  202. }
  203. );
  204. }
  205. export function useDeleteTopic(clusterName: ClusterName) {
  206. const client = useQueryClient();
  207. return useMutation(
  208. (topicName: Topic['name']) => api.deleteTopic({ clusterName, topicName }),
  209. {
  210. onSuccess: (_, topicName) => {
  211. showSuccessAlert({
  212. message: `Topic ${topicName} successfully deleted!`,
  213. });
  214. client.invalidateQueries(topicKeys.all(clusterName));
  215. },
  216. }
  217. );
  218. }
  219. export function useClearTopicMessages(
  220. clusterName: ClusterName,
  221. partitions?: number[]
  222. ) {
  223. const client = useQueryClient();
  224. return useMutation(
  225. async (topicName: Topic['name']) => {
  226. await messagesApiClient.deleteTopicMessages({
  227. clusterName,
  228. partitions,
  229. topicName,
  230. });
  231. return topicName;
  232. },
  233. {
  234. onSuccess: (topicName) => {
  235. showSuccessAlert({
  236. id: `message-${topicName}-${clusterName}-${partitions}`,
  237. message: `${topicName} messages have been successfully cleared!`,
  238. });
  239. client.invalidateQueries(topicKeys.all(clusterName));
  240. },
  241. }
  242. );
  243. }
  244. export function useRecreateTopic(props: GetTopicDetailsRequest) {
  245. const client = useQueryClient();
  246. return useMutation(() => api.recreateTopic(props), {
  247. onSuccess: () => {
  248. showSuccessAlert({
  249. message: `Topic ${props.topicName} successfully recreated!`,
  250. });
  251. client.invalidateQueries(topicKeys.all(props.clusterName));
  252. },
  253. });
  254. }
  255. export function useSendMessage(props: GetTopicDetailsRequest) {
  256. const client = useQueryClient();
  257. return useMutation(
  258. (message: CreateTopicMessage) =>
  259. messagesApi.sendTopicMessages({ ...props, createTopicMessage: message }),
  260. {
  261. onSuccess: () => {
  262. showSuccessAlert({
  263. message: `Message successfully sent`,
  264. });
  265. client.invalidateQueries(topicKeys.all(props.clusterName));
  266. },
  267. onError: (e) => {
  268. showServerError(e as Response);
  269. },
  270. }
  271. );
  272. }
  273. // Statistics
  274. export function useTopicAnalysis(
  275. props: GetTopicDetailsRequest,
  276. enabled = true
  277. ) {
  278. return useQuery(
  279. topicKeys.statistics(props),
  280. () => api.getTopicAnalysis(props),
  281. {
  282. enabled,
  283. refetchInterval: 1000,
  284. useErrorBoundary: true,
  285. retry: false,
  286. suspense: false,
  287. onError: (error: Response) => {
  288. if (error.status !== 404) {
  289. showServerError(error as Response);
  290. }
  291. },
  292. }
  293. );
  294. }
  295. export function useAnalyzeTopic(props: GetTopicDetailsRequest) {
  296. const client = useQueryClient();
  297. return useMutation(() => api.analyzeTopic(props), {
  298. onSuccess: () => {
  299. client.invalidateQueries(topicKeys.statistics(props));
  300. },
  301. });
  302. }
  303. export function useCancelTopicAnalysis(props: GetTopicDetailsRequest) {
  304. const client = useQueryClient();
  305. return useMutation(() => api.cancelTopicAnalysis(props), {
  306. onSuccess: () => {
  307. showSuccessAlert({
  308. message: `Topic analysis canceled`,
  309. });
  310. client.invalidateQueries(topicKeys.statistics(props));
  311. },
  312. });
  313. }