topics.ts 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. import {
  2. topicsApiClient as api,
  3. messagesApiClient as messagesApi,
  4. consumerGroupsApiClient,
  5. messagesApiClient,
  6. } from 'lib/api';
  7. import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query';
  8. import {
  9. ClusterName,
  10. TopicFormData,
  11. TopicFormDataRaw,
  12. TopicFormFormattedParams,
  13. } from 'redux/interfaces';
  14. import {
  15. CreateTopicMessage,
  16. GetTopicDetailsRequest,
  17. GetTopicsRequest,
  18. Topic,
  19. TopicConfig,
  20. TopicCreation,
  21. TopicUpdate,
  22. } from 'generated-sources';
  23. import { showServerError, showSuccessAlert } from 'lib/errorHandling';
  24. export const topicKeys = {
  25. all: (clusterName: ClusterName) =>
  26. ['clusters', clusterName, 'topics'] as const,
  27. list: (
  28. clusterName: ClusterName,
  29. filters: Omit<GetTopicsRequest, 'clusterName'>
  30. ) => [...topicKeys.all(clusterName), filters] as const,
  31. details: ({ clusterName, topicName }: GetTopicDetailsRequest) =>
  32. [...topicKeys.all(clusterName), topicName] as const,
  33. config: (props: GetTopicDetailsRequest) =>
  34. [...topicKeys.details(props), 'config'] as const,
  35. schema: (props: GetTopicDetailsRequest) =>
  36. [...topicKeys.details(props), 'schema'] as const,
  37. consumerGroups: (props: GetTopicDetailsRequest) =>
  38. [...topicKeys.details(props), 'consumerGroups'] as const,
  39. statistics: (props: GetTopicDetailsRequest) =>
  40. [...topicKeys.details(props), 'statistics'] as const,
  41. };
  42. export function useTopics(props: GetTopicsRequest) {
  43. const { clusterName, ...filters } = props;
  44. return useQuery(
  45. topicKeys.list(clusterName, filters),
  46. () => api.getTopics(props),
  47. { keepPreviousData: true }
  48. );
  49. }
  50. export function useTopicDetails(props: GetTopicDetailsRequest) {
  51. return useQuery(topicKeys.details(props), () => api.getTopicDetails(props));
  52. }
  53. export function useTopicConfig(props: GetTopicDetailsRequest) {
  54. return useQuery(topicKeys.config(props), () => api.getTopicConfigs(props));
  55. }
  56. export function useTopicConsumerGroups(props: GetTopicDetailsRequest) {
  57. return useQuery(topicKeys.consumerGroups(props), () =>
  58. consumerGroupsApiClient.getTopicConsumerGroups(props)
  59. );
  60. }
  61. const topicReducer = (
  62. result: TopicFormFormattedParams,
  63. customParam: TopicConfig
  64. ) => {
  65. return {
  66. ...result,
  67. [customParam.name]: customParam.value,
  68. };
  69. };
  70. const formatTopicCreation = (form: TopicFormData): TopicCreation => {
  71. const {
  72. name,
  73. partitions,
  74. replicationFactor,
  75. cleanupPolicy,
  76. retentionBytes,
  77. retentionMs,
  78. maxMessageBytes,
  79. minInSyncReplicas,
  80. customParams,
  81. } = form;
  82. const configs = {
  83. 'cleanup.policy': cleanupPolicy,
  84. 'retention.ms': retentionMs.toString(),
  85. 'retention.bytes': retentionBytes.toString(),
  86. 'max.message.bytes': maxMessageBytes.toString(),
  87. 'min.insync.replicas': minInSyncReplicas.toString(),
  88. ...Object.values(customParams || {}).reduce(topicReducer, {}),
  89. };
  90. const cleanConfigs = () => {
  91. return Object.fromEntries(
  92. Object.entries(configs).filter(([, val]) => val !== '')
  93. );
  94. };
  95. const topicsvalue = {
  96. name,
  97. partitions,
  98. configs: cleanConfigs(),
  99. };
  100. return replicationFactor.toString() !== ''
  101. ? {
  102. ...topicsvalue,
  103. replicationFactor,
  104. }
  105. : topicsvalue;
  106. };
  107. export function useCreateTopicMutation(clusterName: ClusterName) {
  108. const client = useQueryClient();
  109. return useMutation(
  110. (data: TopicFormData) =>
  111. api.createTopic({
  112. clusterName,
  113. topicCreation: formatTopicCreation(data),
  114. }),
  115. {
  116. onSuccess: () => {
  117. client.invalidateQueries(topicKeys.all(clusterName));
  118. },
  119. }
  120. );
  121. }
  122. // this will change later when we validate the request before
  123. export function useCreateTopic(clusterName: ClusterName) {
  124. const mutate = useCreateTopicMutation(clusterName);
  125. return {
  126. createResource: async (param: TopicFormData) => {
  127. return mutate.mutateAsync(param);
  128. },
  129. ...mutate,
  130. };
  131. }
  132. const formatTopicUpdate = (form: TopicFormDataRaw): TopicUpdate => {
  133. const {
  134. cleanupPolicy,
  135. retentionBytes,
  136. retentionMs,
  137. maxMessageBytes,
  138. minInSyncReplicas,
  139. customParams,
  140. } = form;
  141. return {
  142. configs: {
  143. ...Object.values(customParams || {}).reduce(topicReducer, {}),
  144. 'cleanup.policy': cleanupPolicy,
  145. 'retention.ms': retentionMs,
  146. 'retention.bytes': retentionBytes,
  147. 'max.message.bytes': maxMessageBytes,
  148. 'min.insync.replicas': minInSyncReplicas,
  149. },
  150. };
  151. };
  152. export function useUpdateTopic(props: GetTopicDetailsRequest) {
  153. const client = useQueryClient();
  154. return useMutation(
  155. (data: TopicFormDataRaw) => {
  156. return api.updateTopic({
  157. ...props,
  158. topicUpdate: formatTopicUpdate(data),
  159. });
  160. },
  161. {
  162. onSuccess: () => {
  163. showSuccessAlert({
  164. message: `Topic successfully updated.`,
  165. });
  166. client.invalidateQueries(topicKeys.all(props.clusterName));
  167. },
  168. }
  169. );
  170. }
  171. export function useIncreaseTopicPartitionsCount(props: GetTopicDetailsRequest) {
  172. const client = useQueryClient();
  173. return useMutation(
  174. (totalPartitionsCount: number) =>
  175. api.increaseTopicPartitions({
  176. ...props,
  177. partitionsIncrease: { totalPartitionsCount },
  178. }),
  179. {
  180. onSuccess: () => {
  181. showSuccessAlert({
  182. message: `Number of partitions successfully increased`,
  183. });
  184. client.invalidateQueries(topicKeys.all(props.clusterName));
  185. },
  186. }
  187. );
  188. }
  189. export function useUpdateTopicReplicationFactor(props: GetTopicDetailsRequest) {
  190. const client = useQueryClient();
  191. return useMutation(
  192. (totalReplicationFactor: number) =>
  193. api.changeReplicationFactor({
  194. ...props,
  195. replicationFactorChange: { totalReplicationFactor },
  196. }),
  197. {
  198. onSuccess: () => {
  199. showSuccessAlert({
  200. message: `Replication factor successfully updated`,
  201. });
  202. client.invalidateQueries(topicKeys.all(props.clusterName));
  203. },
  204. }
  205. );
  206. }
  207. export function useDeleteTopic(clusterName: ClusterName) {
  208. const client = useQueryClient();
  209. return useMutation(
  210. (topicName: Topic['name']) => api.deleteTopic({ clusterName, topicName }),
  211. {
  212. onSuccess: (_, topicName) => {
  213. showSuccessAlert({
  214. message: `Topic ${topicName} successfully deleted!`,
  215. });
  216. client.invalidateQueries(topicKeys.all(clusterName));
  217. },
  218. }
  219. );
  220. }
  221. export function useClearTopicMessages(
  222. clusterName: ClusterName,
  223. partitions?: number[]
  224. ) {
  225. const client = useQueryClient();
  226. return useMutation(
  227. async (topicName: Topic['name']) => {
  228. await messagesApiClient.deleteTopicMessages({
  229. clusterName,
  230. partitions,
  231. topicName,
  232. });
  233. return topicName;
  234. },
  235. {
  236. onSuccess: (topicName) => {
  237. showSuccessAlert({
  238. id: `message-${topicName}-${clusterName}-${partitions}`,
  239. message: `${topicName} messages have been successfully cleared!`,
  240. });
  241. client.invalidateQueries(topicKeys.all(clusterName));
  242. },
  243. }
  244. );
  245. }
  246. export function useRecreateTopic(props: GetTopicDetailsRequest) {
  247. const client = useQueryClient();
  248. return useMutation(() => api.recreateTopic(props), {
  249. onSuccess: () => {
  250. showSuccessAlert({
  251. message: `Topic ${props.topicName} successfully recreated!`,
  252. });
  253. client.invalidateQueries(topicKeys.all(props.clusterName));
  254. },
  255. });
  256. }
  257. export function useSendMessage(props: GetTopicDetailsRequest) {
  258. const client = useQueryClient();
  259. return useMutation(
  260. (message: CreateTopicMessage) =>
  261. messagesApi.sendTopicMessages({ ...props, createTopicMessage: message }),
  262. {
  263. onSuccess: () => {
  264. showSuccessAlert({
  265. message: `Message successfully sent`,
  266. });
  267. client.invalidateQueries(topicKeys.all(props.clusterName));
  268. },
  269. onError: (e) => {
  270. showServerError(e as Response);
  271. },
  272. }
  273. );
  274. }
  275. // Statistics
  276. export function useTopicAnalysis(
  277. props: GetTopicDetailsRequest,
  278. enabled = true
  279. ) {
  280. return useQuery(
  281. topicKeys.statistics(props),
  282. () => api.getTopicAnalysis(props),
  283. {
  284. enabled,
  285. refetchInterval: 1000,
  286. useErrorBoundary: true,
  287. retry: false,
  288. suspense: false,
  289. }
  290. );
  291. }
  292. export function useAnalyzeTopic(props: GetTopicDetailsRequest) {
  293. const client = useQueryClient();
  294. return useMutation(() => api.analyzeTopic(props), {
  295. onSuccess: () => {
  296. client.invalidateQueries(topicKeys.statistics(props));
  297. },
  298. });
  299. }
  300. export function useCancelTopicAnalysis(props: GetTopicDetailsRequest) {
  301. const client = useQueryClient();
  302. return useMutation(() => api.cancelTopicAnalysis(props), {
  303. onSuccess: () => {
  304. showSuccessAlert({
  305. message: `Topic analysis canceled`,
  306. });
  307. client.invalidateQueries(topicKeys.statistics(props));
  308. },
  309. });
  310. }