123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370 |
- import { v4 } from 'uuid';
- import {
- TopicsApi,
- MessagesApi,
- Configuration,
- Topic,
- TopicCreation,
- TopicUpdate,
- TopicConfig,
- ConsumerGroupsApi,
- GetTopicsRequest,
- } from 'generated-sources';
- import {
- PromiseThunkResult,
- ClusterName,
- TopicName,
- TopicFormFormattedParams,
- TopicFormDataRaw,
- TopicsState,
- FailurePayload,
- TopicFormData,
- AppDispatch,
- } from 'redux/interfaces';
- import { clearTopicMessages } from 'redux/reducers/topicMessages/topicMessagesSlice';
- import { BASE_PARAMS } from 'lib/constants';
- import * as actions from 'redux/actions/actions';
- import { getResponse } from 'lib/errorHandling';
- import { showSuccessAlert } from 'redux/reducers/alerts/alertsSlice';
- const apiClientConf = new Configuration(BASE_PARAMS);
- export const topicsApiClient = new TopicsApi(apiClientConf);
- export const messagesApiClient = new MessagesApi(apiClientConf);
- export const topicConsumerGroupsApiClient = new ConsumerGroupsApi(
- apiClientConf
- );
- export const fetchTopicsList =
- (params: GetTopicsRequest): PromiseThunkResult =>
- async (dispatch, getState) => {
- dispatch(actions.fetchTopicsListAction.request());
- try {
- const { topics, pageCount } = await topicsApiClient.getTopics(params);
- const newState = (topics || []).reduce(
- (memo: TopicsState, topic) => ({
- ...memo,
- byName: {
- ...memo.byName,
- [topic.name]: {
- ...memo.byName[topic.name],
- ...topic,
- id: v4(),
- },
- },
- allNames: [...memo.allNames, topic.name],
- }),
- {
- ...getState().topics,
- allNames: [],
- totalPages: pageCount || 1,
- }
- );
- dispatch(actions.fetchTopicsListAction.success(newState));
- } catch (e) {
- dispatch(actions.fetchTopicsListAction.failure());
- }
- };
- export const clearTopicsMessages =
- (clusterName: ClusterName, topicsName: TopicName[]): PromiseThunkResult =>
- async (dispatch) => {
- topicsName.forEach((topicName) => {
- dispatch(clearTopicMessages({ clusterName, topicName }));
- });
- };
- export const fetchTopicDetails =
- (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
- async (dispatch, getState) => {
- dispatch(actions.fetchTopicDetailsAction.request());
- try {
- const topicDetails = await topicsApiClient.getTopicDetails({
- clusterName,
- topicName,
- });
- const state = getState().topics;
- const newState = {
- ...state,
- byName: {
- ...state.byName,
- [topicName]: {
- ...state.byName[topicName],
- ...topicDetails,
- },
- },
- };
- dispatch(actions.fetchTopicDetailsAction.success(newState));
- } catch (e) {
- dispatch(actions.fetchTopicDetailsAction.failure());
- }
- };
- export const fetchTopicConfig =
- (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
- async (dispatch, getState) => {
- dispatch(actions.fetchTopicConfigAction.request());
- try {
- const config = await topicsApiClient.getTopicConfigs({
- clusterName,
- topicName,
- });
- const state = getState().topics;
- const newState = {
- ...state,
- byName: {
- ...state.byName,
- [topicName]: {
- ...state.byName[topicName],
- config: config.map((inputConfig) => ({
- ...inputConfig,
- })),
- },
- },
- };
- dispatch(actions.fetchTopicConfigAction.success(newState));
- } catch (e) {
- dispatch(actions.fetchTopicConfigAction.failure());
- }
- };
- const topicReducer = (
- result: TopicFormFormattedParams,
- customParam: TopicConfig
- ) => {
- return {
- ...result,
- [customParam.name]: customParam.value,
- };
- };
- export const formatTopicCreation = (form: TopicFormData): TopicCreation => {
- const {
- name,
- partitions,
- replicationFactor,
- cleanupPolicy,
- retentionBytes,
- retentionMs,
- maxMessageBytes,
- minInsyncReplicas,
- customParams,
- } = form;
- return {
- name,
- partitions,
- replicationFactor,
- configs: {
- 'cleanup.policy': cleanupPolicy,
- 'retention.ms': retentionMs.toString(),
- 'retention.bytes': retentionBytes.toString(),
- 'max.message.bytes': maxMessageBytes.toString(),
- 'min.insync.replicas': minInsyncReplicas.toString(),
- ...Object.values(customParams || {}).reduce(topicReducer, {}),
- },
- };
- };
- const formatTopicUpdate = (form: TopicFormDataRaw): TopicUpdate => {
- const {
- cleanupPolicy,
- retentionBytes,
- retentionMs,
- maxMessageBytes,
- minInsyncReplicas,
- customParams,
- } = form;
- return {
- configs: {
- 'cleanup.policy': cleanupPolicy,
- 'retention.ms': retentionMs,
- 'retention.bytes': retentionBytes,
- 'max.message.bytes': maxMessageBytes,
- 'min.insync.replicas': minInsyncReplicas,
- ...Object.values(customParams || {}).reduce(topicReducer, {}),
- },
- };
- };
- export const updateTopic =
- (
- clusterName: ClusterName,
- topicName: TopicName,
- form: TopicFormDataRaw
- ): PromiseThunkResult =>
- async (dispatch, getState) => {
- dispatch(actions.updateTopicAction.request());
- try {
- const topic: Topic = await topicsApiClient.updateTopic({
- clusterName,
- topicName,
- topicUpdate: formatTopicUpdate(form),
- });
- const state = getState().topics;
- const newState = {
- ...state,
- byName: {
- ...state.byName,
- [topic.name]: {
- ...state.byName[topic.name],
- ...topic,
- },
- },
- };
- dispatch(actions.updateTopicAction.success(newState));
- } catch (e) {
- dispatch(actions.updateTopicAction.failure());
- }
- };
- export const deleteTopic =
- (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
- async (dispatch) => {
- dispatch(actions.deleteTopicAction.request());
- try {
- await topicsApiClient.deleteTopic({
- clusterName,
- topicName,
- });
- dispatch(actions.deleteTopicAction.success(topicName));
- } catch (e) {
- dispatch(actions.deleteTopicAction.failure());
- }
- };
- export const recreateTopic =
- (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
- async (dispatch) => {
- dispatch(actions.recreateTopicAction.request());
- try {
- const topic = await topicsApiClient.recreateTopic({
- clusterName,
- topicName,
- });
- dispatch(actions.recreateTopicAction.success(topic));
- (dispatch as AppDispatch)(
- showSuccessAlert({
- id: topicName,
- message: 'Topic successfully recreated!',
- })
- );
- } catch (e) {
- dispatch(actions.recreateTopicAction.failure());
- }
- };
- export const deleteTopics =
- (clusterName: ClusterName, topicsName: TopicName[]): PromiseThunkResult =>
- async (dispatch) => {
- topicsName.forEach((topicName) => {
- dispatch(deleteTopic(clusterName, topicName));
- });
- };
- export const fetchTopicConsumerGroups =
- (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
- async (dispatch, getState) => {
- dispatch(actions.fetchTopicConsumerGroupsAction.request());
- try {
- const consumerGroups =
- await topicConsumerGroupsApiClient.getTopicConsumerGroups({
- clusterName,
- topicName,
- });
- const state = getState().topics;
- const newState = {
- ...state,
- byName: {
- ...state.byName,
- [topicName]: {
- ...state.byName[topicName],
- consumerGroups,
- },
- },
- };
- dispatch(actions.fetchTopicConsumerGroupsAction.success(newState));
- } catch (e) {
- dispatch(actions.fetchTopicConsumerGroupsAction.failure());
- }
- };
- export const fetchTopicMessageSchema =
- (clusterName: ClusterName, topicName: TopicName): PromiseThunkResult =>
- async (dispatch) => {
- dispatch(actions.fetchTopicMessageSchemaAction.request());
- try {
- const schema = await messagesApiClient.getTopicSchema({
- clusterName,
- topicName,
- });
- dispatch(
- actions.fetchTopicMessageSchemaAction.success({ topicName, schema })
- );
- } catch (e) {
- const response = await getResponse(e);
- const alert: FailurePayload = {
- subject: ['topic', topicName].join('-'),
- title: `Topic Schema ${topicName}`,
- response,
- };
- dispatch(actions.fetchTopicMessageSchemaAction.failure({ alert }));
- }
- };
- export const updateTopicPartitionsCount =
- (
- clusterName: ClusterName,
- topicName: TopicName,
- partitions: number
- ): PromiseThunkResult =>
- async (dispatch) => {
- dispatch(actions.updateTopicPartitionsCountAction.request());
- try {
- await topicsApiClient.increaseTopicPartitions({
- clusterName,
- topicName,
- partitionsIncrease: { totalPartitionsCount: partitions },
- });
- dispatch(actions.updateTopicPartitionsCountAction.success());
- } catch (error) {
- const response = await getResponse(error);
- const alert: FailurePayload = {
- subject: ['topic-partitions', topicName].join('-'),
- title: `Topic ${topicName} partitions count increase failed`,
- response,
- };
- dispatch(actions.updateTopicPartitionsCountAction.failure({ alert }));
- }
- };
- export const updateTopicReplicationFactor =
- (
- clusterName: ClusterName,
- topicName: TopicName,
- replicationFactor: number
- ): PromiseThunkResult =>
- async (dispatch) => {
- dispatch(actions.updateTopicReplicationFactorAction.request());
- try {
- await topicsApiClient.changeReplicationFactor({
- clusterName,
- topicName,
- replicationFactorChange: { totalReplicationFactor: replicationFactor },
- });
- dispatch(actions.updateTopicReplicationFactorAction.success());
- } catch (error) {
- const response = await getResponse(error);
- const alert: FailurePayload = {
- subject: ['topic-replication-factor', topicName].join('-'),
- title: `Topic ${topicName} replication factor change failed`,
- response,
- };
- dispatch(actions.updateTopicReplicationFactorAction.failure({ alert }));
- }
- };
|