import { v4 } from 'uuid';
import {
  TopicsApi,
  MessagesApi,
  Configuration,
  Topic,
  TopicCreation,
  TopicUpdate,
  TopicConfig,
  ConsumerGroupsApi,
  CreateTopicMessage,
  GetTopicsRequest,
} from 'generated-sources';
import {
  PromiseThunkResult,
  ClusterName,
  TopicName,
  TopicFormFormattedParams,
  TopicFormDataRaw,
  TopicsState,
  FailurePayload,
} from 'redux/interfaces';
import { BASE_PARAMS } from 'lib/constants';
import * as actions from 'redux/actions/actions';
import { getResponse } from 'lib/errorHandling';

// All API clients below share one configuration built from BASE_PARAMS.
const apiClientConf = new Configuration(BASE_PARAMS);
export const topicsApiClient = new TopicsApi(apiClientConf);
export const messagesApiClient = new MessagesApi(apiClientConf);
export const topicConsumerGroupsApiClient = new ConsumerGroupsApi(
  apiClientConf
);

/**
 * Fetches a list of topics and dispatches the resulting topics state.
 *
 * The new state starts from the current `topics` slice, so `byName` keeps
 * entries that are not part of the response, while `allNames` is rebuilt
 * from scratch in response order. Every returned topic is merged over its
 * existing `byName` entry and receives a freshly generated `id`.
 * `totalPages` falls back to 1 when the response has no (or a zero)
 * `pageCount`.
 *
 * @param params - Request parameters forwarded to `getTopics`.
 * @returns A thunk that dispatches request, then success (with the new
 *   state) or failure (without payload) if anything throws.
 */
export const fetchTopicsList =
  (params: GetTopicsRequest): PromiseThunkResult =>
  async (dispatch, getState) => {
    dispatch(actions.fetchTopicsListAction.request());
    try {
      const { topics, pageCount } = await topicsApiClient.getTopics(params);
      const newState = (topics || []).reduce(
        (memo: TopicsState, topic) => ({
          ...memo,
          byName: {
            ...memo.byName,
            [topic.name]: {
              ...memo.byName[topic.name],
              ...topic,
              id: v4(),
            },
          },
          allNames: [...memo.allNames, topic.name],
        }),
        {
          ...getState().topics,
          allNames: [],
          totalPages: pageCount || 1,
        }
      );
      dispatch(actions.fetchTopicsListAction.success(newState));
    } catch (e) {
      dispatch(actions.fetchTopicsListAction.failure());
    }
  };

/**
 * Deletes messages of a topic via `deleteTopicMessages`.
 *
 * @param clusterName - Cluster the topic belongs to.
 * @param topicName - Topic whose messages are deleted.
 * @param partitions - Optional partitions forwarded to the API call.
 * @returns A thunk that dispatches request, then success or a failure
 *   carrying an alert built from the error response.
 */
export const clearTopicMessages =
  (
    clusterName: ClusterName,
    topicName: TopicName,
    partitions?: number[]
  ): PromiseThunkResult =>
  async (dispatch) => {
    dispatch(actions.clearMessagesTopicAction.request());
    try {
      await messagesApiClient.deleteTopicMessages({
        clusterName,
        topicName,
        partitions,
      });
      dispatch(actions.clearMessagesTopicAction.success());
    } catch (e) {
      const response = await getResponse(e);
      const alert: FailurePayload = {
        subject: [clusterName, topicName, partitions].join('-'),
        title: `Clear Topic Messages`,
        response,
      };
      dispatch(actions.clearMessagesTopicAction.failure({ alert }));
    }
  };

/**
 * Fetches details of one topic and merges them into its `byName` entry.
 *
 * @param clusterName - Cluster the topic belongs to.
 * @param topicName - Topic to fetch details for.
 * @returns A thunk that dispatches request, then success (with the new
 *   state) or failure (without payload) if anything throws.
 */
export const fetchTopicDetails =
  (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
  async (dispatch, getState) => {
    dispatch(actions.fetchTopicDetailsAction.request());
    try {
      const topicDetails = await topicsApiClient.getTopicDetails({
        clusterName,
        topicName,
      });
      const state = getState().topics;
      const newState = {
        ...state,
        byName: {
          ...state.byName,
          [topicName]: {
            ...state.byName[topicName],
            ...topicDetails,
          },
        },
      };
      dispatch(actions.fetchTopicDetailsAction.success(newState));
    } catch (e) {
      dispatch(actions.fetchTopicDetailsAction.failure());
    }
  };

/**
 * Fetches the config entries of one topic and stores shallow copies of them
 * under the `config` key of its `byName` entry.
 *
 * @param clusterName - Cluster the topic belongs to.
 * @param topicName - Topic to fetch the config for.
 * @returns A thunk that dispatches request, then success (with the new
 *   state) or failure (without payload) if anything throws.
 */
export const fetchTopicConfig =
  (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
  async (dispatch, getState) => {
    dispatch(actions.fetchTopicConfigAction.request());
    try {
      const config = await topicsApiClient.getTopicConfigs({
        clusterName,
        topicName,
      });
      const state = getState().topics;
      const newState = {
        ...state,
        byName: {
          ...state.byName,
          [topicName]: {
            ...state.byName[topicName],
            config: config.map((inputConfig) => ({
              ...inputConfig,
            })),
          },
        },
      };
      dispatch(actions.fetchTopicConfigAction.success(newState));
    } catch (e) {
      dispatch(actions.fetchTopicConfigAction.failure());
    }
  };

/**
 * Reducer callback that adds one custom parameter to the accumulated
 * name-to-value map, returning a new object each time.
 */
const topicReducer = (
  result: TopicFormFormattedParams,
  customParam: TopicConfig
) => {
  return {
    ...result,
    [customParam.name]: customParam.value,
  };
};

/**
 * Converts raw topic form data into a `TopicCreation` request body.
 *
 * Custom params are spread last, so a custom param whose name equals one of
 * the fixed config keys overrides the value taken from the form field.
 */
const formatTopicCreation = (form: TopicFormDataRaw): TopicCreation => {
  const {
    name,
    partitions,
    replicationFactor,
    cleanupPolicy,
    retentionBytes,
    retentionMs,
    maxMessageBytes,
    minInSyncReplicas,
    customParams,
  } = form;

  return {
    name,
    partitions,
    replicationFactor,
    configs: {
      'cleanup.policy': cleanupPolicy,
      'retention.ms': retentionMs,
      'retention.bytes': retentionBytes,
      'max.message.bytes': maxMessageBytes,
      'min.insync.replicas': minInSyncReplicas,
      ...Object.values(customParams || {}).reduce(topicReducer, {}),
    },
  };
};

/**
 * Converts raw topic form data into a `TopicUpdate` request body.
 *
 * Only `configs` is produced; as in `formatTopicCreation`, custom params are
 * spread last and therefore win over the fixed config keys.
 */
const formatTopicUpdate = (form: TopicFormDataRaw): TopicUpdate => {
  const {
    cleanupPolicy,
    retentionBytes,
    retentionMs,
    maxMessageBytes,
    minInSyncReplicas,
    customParams,
  } = form;

  return {
    configs: {
      'cleanup.policy': cleanupPolicy,
      'retention.ms': retentionMs,
      'retention.bytes': retentionBytes,
      'max.message.bytes': maxMessageBytes,
      'min.insync.replicas': minInSyncReplicas,
      ...Object.values(customParams || {}).reduce(topicReducer, {}),
    },
  };
};

/**
 * Creates a topic from form data and adds it to the topics state.
 *
 * The created topic replaces any existing `byName` entry with the same name
 * (no merge) and its name is appended to `allNames`.
 *
 * @param clusterName - Cluster to create the topic in.
 * @param form - Raw form data, converted with `formatTopicCreation`.
 * @returns A thunk that dispatches request, then success (with the new
 *   state) or a failure carrying an alert built from the error response.
 */
export const createTopic =
  (clusterName: ClusterName, form: TopicFormDataRaw): PromiseThunkResult =>
  async (dispatch, getState) => {
    dispatch(actions.createTopicAction.request());
    try {
      const topic: Topic = await topicsApiClient.createTopic({
        clusterName,
        topicCreation: formatTopicCreation(form),
      });

      const state = getState().topics;
      const newState = {
        ...state,
        byName: {
          ...state.byName,
          [topic.name]: {
            ...topic,
          },
        },
        allNames: [...state.allNames, topic.name],
      };

      dispatch(actions.createTopicAction.success(newState));
    } catch (error) {
      const response = await getResponse(error);
      // The alert describes a topic, not a schema: use the same 'topic'
      // subject prefix as the other topic thunks in this module.
      const alert: FailurePayload = {
        subject: ['topic', form.name].join('-'),
        title: `Topic ${form.name}`,
        response,
      };
      dispatch(actions.createTopicAction.failure({ alert }));
    }
  };

/**
 * Updates a topic's configuration and merges the returned topic into its
 * `byName` entry (keyed by the name reported in the response).
 *
 * @param clusterName - Cluster the topic belongs to.
 * @param topicName - Topic to update.
 * @param form - Raw form data, converted with `formatTopicUpdate`.
 * @returns A thunk that dispatches request, then success (with the new
 *   state) or failure (without payload) if anything throws.
 */
export const updateTopic =
  (
    clusterName: ClusterName,
    topicName: TopicName,
    form: TopicFormDataRaw
  ): PromiseThunkResult =>
  async (dispatch, getState) => {
    dispatch(actions.updateTopicAction.request());
    try {
      const topic: Topic = await topicsApiClient.updateTopic({
        clusterName,
        topicName,
        topicUpdate: formatTopicUpdate(form),
      });

      const state = getState().topics;
      const newState = {
        ...state,
        byName: {
          ...state.byName,
          [topic.name]: {
            ...state.byName[topic.name],
            ...topic,
          },
        },
      };

      dispatch(actions.updateTopicAction.success(newState));
    } catch (e) {
      dispatch(actions.updateTopicAction.failure());
    }
  };

/**
 * Deletes a topic.
 *
 * @param clusterName - Cluster the topic belongs to.
 * @param topicName - Topic to delete; also the success action's payload.
 * @returns A thunk that dispatches request, then success or failure
 *   (without payload) if the API call throws.
 */
export const deleteTopic =
  (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
  async (dispatch) => {
    dispatch(actions.deleteTopicAction.request());
    try {
      await topicsApiClient.deleteTopic({
        clusterName,
        topicName,
      });
      dispatch(actions.deleteTopicAction.success(topicName));
    } catch (e) {
      dispatch(actions.deleteTopicAction.failure());
    }
  };

/**
 * Fetches the consumer groups of a topic and stores them under the
 * `consumerGroups` key of its `byName` entry.
 *
 * @param clusterName - Cluster the topic belongs to.
 * @param topicName - Topic to fetch consumer groups for.
 * @returns A thunk that dispatches request, then success (with the new
 *   state) or failure (without payload) if anything throws.
 */
export const fetchTopicConsumerGroups =
  (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
  async (dispatch, getState) => {
    dispatch(actions.fetchTopicConsumerGroupsAction.request());
    try {
      const consumerGroups =
        await topicConsumerGroupsApiClient.getTopicConsumerGroups({
          clusterName,
          topicName,
        });
      const state = getState().topics;
      const newState = {
        ...state,
        byName: {
          ...state.byName,
          [topicName]: {
            ...state.byName[topicName],
            consumerGroups,
          },
        },
      };
      dispatch(actions.fetchTopicConsumerGroupsAction.success(newState));
    } catch (e) {
      dispatch(actions.fetchTopicConsumerGroupsAction.failure());
    }
  };

/**
 * Fetches the message schema of a topic.
 *
 * @param clusterName - Cluster the topic belongs to.
 * @param topicName - Topic to fetch the schema for.
 * @returns A thunk that dispatches request, then success with
 *   `{ topicName, schema }` or a failure carrying an alert built from the
 *   error response.
 */
export const fetchTopicMessageSchema =
  (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
  async (dispatch) => {
    dispatch(actions.fetchTopicMessageSchemaAction.request());
    try {
      const schema = await messagesApiClient.getTopicSchema({
        clusterName,
        topicName,
      });
      dispatch(
        actions.fetchTopicMessageSchemaAction.success({ topicName, schema })
      );
    } catch (e) {
      const response = await getResponse(e);
      const alert: FailurePayload = {
        subject: ['topic', topicName].join('-'),
        title: `Topic Schema ${topicName}`,
        response,
      };
      dispatch(actions.fetchTopicMessageSchemaAction.failure({ alert }));
    }
  };

/**
 * Sends a message to a topic.
 *
 * Only `key`, `content`, `headers` and `partition` of the payload are
 * forwarded to the API.
 *
 * @param clusterName - Cluster the topic belongs to.
 * @param topicName - Topic to send the message to.
 * @param payload - Message to send.
 * @returns A thunk that dispatches request, then success or a failure
 *   carrying an alert built from the error response.
 */
export const sendTopicMessage =
  (
    clusterName: ClusterName,
    topicName: TopicName,
    payload: CreateTopicMessage
  ): PromiseThunkResult =>
  async (dispatch) => {
    dispatch(actions.sendTopicMessageAction.request());
    try {
      await messagesApiClient.sendTopicMessages({
        clusterName,
        topicName,
        createTopicMessage: {
          key: payload.key,
          content: payload.content,
          headers: payload.headers,
          partition: payload.partition,
        },
      });
      dispatch(actions.sendTopicMessageAction.success());
    } catch (e) {
      const response = await getResponse(e);
      const alert: FailurePayload = {
        subject: ['topic', topicName].join('-'),
        title: `Topic Message ${topicName}`,
        response,
      };
      dispatch(actions.sendTopicMessageAction.failure({ alert }));
    }
  };

/**
 * Requests an increase of a topic's partition count.
 *
 * @param clusterName - Cluster the topic belongs to.
 * @param topicName - Topic to change.
 * @param partitions - Value sent as `totalPartitionsCount`.
 * @returns A thunk that dispatches request, then success or a failure
 *   carrying an alert built from the error response.
 */
export const updateTopicPartitionsCount =
  (
    clusterName: ClusterName,
    topicName: TopicName,
    partitions: number
  ): PromiseThunkResult =>
  async (dispatch) => {
    dispatch(actions.updateTopicPartitionsCountAction.request());
    try {
      await topicsApiClient.increaseTopicPartitions({
        clusterName,
        topicName,
        partitionsIncrease: { totalPartitionsCount: partitions },
      });
      dispatch(actions.updateTopicPartitionsCountAction.success());
    } catch (error) {
      const response = await getResponse(error);
      const alert: FailurePayload = {
        subject: ['topic-partitions', topicName].join('-'),
        title: `Topic ${topicName} partitions count increase failed`,
        response,
      };
      dispatch(actions.updateTopicPartitionsCountAction.failure({ alert }));
    }
  };

/**
 * Requests a change of a topic's replication factor.
 *
 * @param clusterName - Cluster the topic belongs to.
 * @param topicName - Topic to change.
 * @param replicationFactor - Value sent as `totalReplicationFactor`.
 * @returns A thunk that dispatches request, then success or a failure
 *   carrying an alert built from the error response.
 */
export const updateTopicReplicationFactor =
  (
    clusterName: ClusterName,
    topicName: TopicName,
    replicationFactor: number
  ): PromiseThunkResult =>
  async (dispatch) => {
    dispatch(actions.updateTopicReplicationFactorAction.request());
    try {
      await topicsApiClient.changeReplicationFactor({
        clusterName,
        topicName,
        replicationFactorChange: { totalReplicationFactor: replicationFactor },
      });
      dispatch(actions.updateTopicReplicationFactorAction.success());
    } catch (error) {
      const response = await getResponse(error);
      const alert: FailurePayload = {
        subject: ['topic-replication-factor', topicName].join('-'),
        title: `Topic ${topicName} replication factor change failed`,
        response,
      };
      dispatch(actions.updateTopicReplicationFactorAction.failure({ alert }));
    }
  };