Resetting consumer group offsets (#774)

This commit is contained in:
Alexander Krivonosov 2021-08-16 07:39:35 +03:00 committed by GitHub
parent 4f14942c6d
commit 128c67c1cf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 940 additions and 10 deletions

View file

@ -5,6 +5,8 @@ import PageLoader from 'components/common/PageLoader/PageLoader';
import DetailsContainer from 'components/ConsumerGroups/Details/DetailsContainer';
import ListContainer from 'components/ConsumerGroups/List/ListContainer';
import ResetOffsetsContainer from './Details/ResetOffsets/ResetOffsetsContainer';
interface Props {
clusterName: ClusterName;
isFetched: boolean;
@ -29,9 +31,14 @@ const ConsumerGroups: React.FC<Props> = ({
component={ListContainer}
/>
<Route
exact
path="/ui/clusters/:clusterName/consumer-groups/:consumerGroupID"
component={DetailsContainer}
/>
<Route
path="/ui/clusters/:clusterName/consumer-groups/:consumerGroupID/reset-offsets"
component={ResetOffsetsContainer}
/>
</Switch>
);
}

View file

@ -1,7 +1,10 @@
import React from 'react';
import { ClusterName } from 'redux/interfaces';
import Breadcrumb from 'components/common/Breadcrumb/Breadcrumb';
import { clusterConsumerGroupsPath } from 'lib/paths';
import {
clusterConsumerGroupResetOffsetsPath,
clusterConsumerGroupsPath,
} from 'lib/paths';
import { ConsumerGroupID } from 'redux/interfaces/consumerGroup';
import {
ConsumerGroup,
@ -53,6 +56,10 @@ const Details: React.FC<Props> = ({
}
}, [isDeleted]);
const onResetOffsets = () => {
history.push(clusterConsumerGroupResetOffsetsPath(clusterName, groupId));
};
return (
<div className="section">
<div className="level">
@ -74,6 +81,9 @@ const Details: React.FC<Props> = ({
<div className="box">
<div className="level">
<div className="level-item level-right buttons">
<button type="button" className="button" onClick={onResetOffsets}>
Reset offsets
</button>
<button
type="button"
className="button is-danger"

View file

@ -0,0 +1,324 @@
import Breadcrumb from 'components/common/Breadcrumb/Breadcrumb';
import {
ConsumerGroupDetails,
ConsumerGroupOffsetsResetType,
} from 'generated-sources';
import {
clusterConsumerGroupsPath,
clusterConsumerGroupDetailsPath,
} from 'lib/paths';
import React from 'react';
import { Controller, useFieldArray, useForm } from 'react-hook-form';
import { ClusterName, ConsumerGroupID } from 'redux/interfaces';
import MultiSelect from 'react-multi-select-component';
import { Option } from 'react-multi-select-component/dist/lib/interfaces';
import DatePicker from 'react-datepicker';
import 'react-datepicker/dist/react-datepicker.css';
import { groupBy } from 'lodash';
import PageLoader from 'components/common/PageLoader/PageLoader';
import { ErrorMessage } from '@hookform/error-message';
import { useHistory } from 'react-router';
export interface Props {
clusterName: ClusterName;
consumerGroupID: ConsumerGroupID;
consumerGroup: ConsumerGroupDetails;
detailsAreFetched: boolean;
IsOffsetReset: boolean;
fetchConsumerGroupDetails(
clusterName: ClusterName,
consumerGroupID: ConsumerGroupID
): void;
resetConsumerGroupOffsets(
clusterName: ClusterName,
consumerGroupID: ConsumerGroupID,
requestBody: {
topic: string;
resetType: ConsumerGroupOffsetsResetType;
partitionsOffsets?: { offset: string; partition: number }[];
resetToTimestamp?: Date;
partitions: number[];
}
): void;
}
interface FormType {
topic: string;
resetType: ConsumerGroupOffsetsResetType;
partitionsOffsets: { offset: string | undefined; partition: number }[];
resetToTimestamp: Date;
}
const ResetOffsets: React.FC<Props> = ({
clusterName,
consumerGroupID,
consumerGroup,
detailsAreFetched,
IsOffsetReset,
fetchConsumerGroupDetails,
resetConsumerGroupOffsets,
}) => {
React.useEffect(() => {
fetchConsumerGroupDetails(clusterName, consumerGroupID);
}, [clusterName, consumerGroupID]);
const [uniqueTopics, setUniqueTopics] = React.useState<string[]>([]);
const [selectedPartitions, setSelectedPartitions] = React.useState<Option[]>(
[]
);
const {
register,
handleSubmit,
setValue,
watch,
control,
setError,
clearErrors,
formState: { errors },
} = useForm<FormType>({
defaultValues: {
resetType: ConsumerGroupOffsetsResetType.EARLIEST,
topic: '',
partitionsOffsets: [],
},
});
const { fields } = useFieldArray({
control,
name: 'partitionsOffsets',
});
const resetTypeValue = watch('resetType');
const topicValue = watch('topic');
const offsetsValue = watch('partitionsOffsets');
React.useEffect(() => {
if (detailsAreFetched && consumerGroup.partitions) {
setValue('topic', consumerGroup.partitions[0].topic);
setUniqueTopics(Object.keys(groupBy(consumerGroup.partitions, 'topic')));
}
}, [detailsAreFetched]);
const onSelectedPartitionsChange = (value: Option[]) => {
clearErrors();
setValue(
'partitionsOffsets',
value.map((partition) => {
const currentOffset = offsetsValue.find(
(offset) => offset.partition === partition.value
);
return {
offset: currentOffset ? currentOffset?.offset : undefined,
partition: partition.value,
};
})
);
setSelectedPartitions(value);
};
React.useEffect(() => {
onSelectedPartitionsChange([]);
}, [topicValue]);
const onSubmit = (data: FormType) => {
const augmentedData = {
...data,
partitions: selectedPartitions.map((partition) => partition.value),
partitionsOffsets: data.partitionsOffsets as {
offset: string;
partition: number;
}[],
};
let isValid = true;
if (augmentedData.resetType === ConsumerGroupOffsetsResetType.OFFSET) {
augmentedData.partitionsOffsets.forEach((offset, index) => {
if (!offset.offset) {
setError(`partitionsOffsets.${index}.offset`, {
type: 'manual',
message: "This field shouldn't be empty!",
});
isValid = false;
}
});
} else if (
augmentedData.resetType === ConsumerGroupOffsetsResetType.TIMESTAMP
) {
if (!augmentedData.resetToTimestamp) {
setError(`resetToTimestamp`, {
type: 'manual',
message: "This field shouldn't be empty!",
});
isValid = false;
}
}
if (isValid) {
resetConsumerGroupOffsets(clusterName, consumerGroupID, augmentedData);
}
};
const history = useHistory();
React.useEffect(() => {
if (IsOffsetReset) {
history.push(
clusterConsumerGroupDetailsPath(clusterName, consumerGroupID)
);
}
}, [IsOffsetReset]);
if (!detailsAreFetched) {
return <PageLoader />;
}
return (
<div className="section">
<div className="level">
<div className="level-item level-left">
<Breadcrumb
links={[
{
href: clusterConsumerGroupsPath(clusterName),
label: 'All Consumer Groups',
},
{
href: clusterConsumerGroupDetailsPath(
clusterName,
consumerGroupID
),
label: consumerGroupID,
},
]}
>
Reset Offsets
</Breadcrumb>
</div>
</div>
<div className="box">
<form onSubmit={handleSubmit(onSubmit)}>
<div className="columns">
<div className="column is-one-third">
<label className="label" htmlFor="topic">
Topic
</label>
<div className="select">
<select {...register('topic')} id="topic">
{uniqueTopics.map((topic) => (
<option key={topic} value={topic}>
{topic}
</option>
))}
</select>
</div>
</div>
<div className="column is-one-third">
<label className="label" htmlFor="resetType">
Reset Type
</label>
<div className="select">
<select {...register('resetType')} id="resetType">
{Object.values(ConsumerGroupOffsetsResetType).map((type) => (
<option key={type} value={type}>
{type}
</option>
))}
</select>
</div>
</div>
<div className="column is-one-third">
<label className="label">Partitions</label>
<div className="select">
<MultiSelect
options={
consumerGroup.partitions
?.filter((p) => p.topic === topicValue)
.map((p) => ({
label: `Partition #${p.partition.toString()}`,
value: p.partition,
})) || []
}
value={selectedPartitions}
onChange={onSelectedPartitionsChange}
labelledBy="Select partitions"
/>
</div>
</div>
</div>
{resetTypeValue === ConsumerGroupOffsetsResetType.TIMESTAMP &&
selectedPartitions.length > 0 && (
<div className="columns">
<div className="column is-half">
<label className="label">Timestamp</label>
<Controller
control={control}
name="resetToTimestamp"
render={({ field: { onChange, onBlur, value, ref } }) => (
<DatePicker
ref={ref}
selected={value}
onChange={onChange}
onBlur={onBlur}
showTimeInput
timeInputLabel="Time:"
dateFormat="MMMM d, yyyy h:mm aa"
className="input"
/>
)}
/>
<ErrorMessage
errors={errors}
name="resetToTimestamp"
render={({ message }) => (
<p className="help is-danger">{message}</p>
)}
/>
</div>
</div>
)}
{resetTypeValue === ConsumerGroupOffsetsResetType.OFFSET &&
selectedPartitions.length > 0 && (
<div className="columns">
<div className="column is-one-third">
<label className="label">Offsets</label>
{fields.map((field, index) => (
<div key={field.id} className="mb-2">
<label
className="subtitle is-6"
htmlFor={`partitionsOffsets.${index}.offset`}
>
Partition #{field.partition}
</label>
<input
id={`partitionsOffsets.${index}.offset`}
type="number"
className="input"
{...register(
`partitionsOffsets.${index}.offset` as const,
{ shouldUnregister: true }
)}
defaultValue={field.offset}
/>
<ErrorMessage
errors={errors}
name={`partitionsOffsets.${index}.offset`}
render={({ message }) => (
<p className="help is-danger">{message}</p>
)}
/>
</div>
))}
</div>
</div>
)}
<button
className="button is-primary"
type="submit"
disabled={selectedPartitions.length === 0}
>
Submit
</button>
</form>
</div>
</div>
);
};
export default ResetOffsets;

View file

@ -0,0 +1,45 @@
import { RouteComponentProps, withRouter } from 'react-router-dom';
import { connect } from 'react-redux';
import { ClusterName, ConsumerGroupID, RootState } from 'redux/interfaces';
import {
getConsumerGroupByID,
getIsConsumerGroupDetailsFetched,
getOffsetReset,
} from 'redux/reducers/consumerGroups/selectors';
import {
fetchConsumerGroupDetails,
resetConsumerGroupOffsets,
} from 'redux/actions';
import ResetOffsets from './ResetOffsets';
interface RouteProps {
clusterName: ClusterName;
consumerGroupID: ConsumerGroupID;
}
type OwnProps = RouteComponentProps<RouteProps>;
const mapStateToProps = (
state: RootState,
{
match: {
params: { consumerGroupID, clusterName },
},
}: OwnProps
) => ({
clusterName,
consumerGroupID,
consumerGroup: getConsumerGroupByID(state, consumerGroupID),
detailsAreFetched: getIsConsumerGroupDetailsFetched(state),
IsOffsetReset: getOffsetReset(state),
});
const mapDispatchToProps = {
fetchConsumerGroupDetails,
resetConsumerGroupOffsets,
};
export default withRouter(
connect(mapStateToProps, mapDispatchToProps)(ResetOffsets)
);

View file

@ -0,0 +1,178 @@
import { fireEvent, render, screen, waitFor } from '@testing-library/react';
import ResetOffsets, {
Props,
} from 'components/ConsumerGroups/Details/ResetOffsets/ResetOffsets';
import { ConsumerGroupState } from 'generated-sources';
import React from 'react';
import { StaticRouter } from 'react-router';
import { expectedOutputs } from './fixtures';
const setupWrapper = (props?: Partial<Props>) => (
<StaticRouter>
<ResetOffsets
clusterName="testCluster"
consumerGroupID="testGroup"
consumerGroup={{
groupId: 'amazon.msk.canary.group.broker-1',
members: 0,
topics: 2,
simple: false,
partitionAssignor: '',
state: ConsumerGroupState.EMPTY,
coordinator: {
id: 2,
host: 'b-2.kad-msk.st2jzq.c6.kafka.eu-west-1.amazonaws.com',
},
messagesBehind: 0,
partitions: [
{
topic: '__amazon_msk_canary',
partition: 1,
currentOffset: 0,
endOffset: 0,
messagesBehind: 0,
consumerId: undefined,
host: undefined,
},
{
topic: '__amazon_msk_canary',
partition: 0,
currentOffset: 56932,
endOffset: 56932,
messagesBehind: 0,
consumerId: undefined,
host: undefined,
},
{
topic: 'other_topic',
partition: 3,
currentOffset: 56932,
endOffset: 56932,
messagesBehind: 0,
consumerId: undefined,
host: undefined,
},
{
topic: 'other_topic',
partition: 4,
currentOffset: 56932,
endOffset: 56932,
messagesBehind: 0,
consumerId: undefined,
host: undefined,
},
],
}}
detailsAreFetched
IsOffsetReset={false}
fetchConsumerGroupDetails={jest.fn()}
resetConsumerGroupOffsets={jest.fn()}
{...props}
/>
</StaticRouter>
);
const selectresetTypeAndPartitions = async (resetType: string) => {
fireEvent.change(screen.getByLabelText('Reset Type'), {
target: { value: resetType },
});
await waitFor(() => {
fireEvent.click(screen.getByText('Select...'));
});
await waitFor(() => {
fireEvent.click(screen.getByText('Partition #0'));
});
};
describe('ResetOffsets', () => {
describe('on initial render', () => {
const component = render(setupWrapper());
it('matches the snapshot', () => {
expect(component.baseElement).toMatchSnapshot();
});
});
describe('on submit', () => {
describe('with the default ResetType', () => {
it('calls resetConsumerGroupOffsets', async () => {
const mockResetConsumerGroupOffsets = jest.fn();
render(
setupWrapper({
resetConsumerGroupOffsets: mockResetConsumerGroupOffsets,
})
);
await selectresetTypeAndPartitions('EARLIEST');
await waitFor(() => {
fireEvent.click(screen.getByText('Submit'));
});
expect(mockResetConsumerGroupOffsets).toHaveBeenCalledTimes(1);
expect(mockResetConsumerGroupOffsets).toHaveBeenCalledWith(
'testCluster',
'testGroup',
expectedOutputs.EARLIEST
);
});
});
describe('with the ResetType set to LATEST', () => {
it('calls resetConsumerGroupOffsets', async () => {
const mockResetConsumerGroupOffsets = jest.fn();
render(
setupWrapper({
resetConsumerGroupOffsets: mockResetConsumerGroupOffsets,
})
);
await selectresetTypeAndPartitions('LATEST');
await waitFor(() => {
fireEvent.click(screen.getByText('Submit'));
});
expect(mockResetConsumerGroupOffsets).toHaveBeenCalledTimes(1);
expect(mockResetConsumerGroupOffsets).toHaveBeenCalledWith(
'testCluster',
'testGroup',
expectedOutputs.LATEST
);
});
});
describe('with the ResetType set to OFFSET', () => {
it('calls resetConsumerGroupOffsets', async () => {
const mockResetConsumerGroupOffsets = jest.fn();
render(
setupWrapper({
resetConsumerGroupOffsets: mockResetConsumerGroupOffsets,
})
);
await selectresetTypeAndPartitions('OFFSET');
await waitFor(() => {
fireEvent.change(screen.getAllByLabelText('Partition #0')[1], {
target: { value: '10' },
});
});
await waitFor(() => {
fireEvent.click(screen.getByText('Submit'));
});
expect(mockResetConsumerGroupOffsets).toHaveBeenCalledTimes(1);
expect(mockResetConsumerGroupOffsets).toHaveBeenCalledWith(
'testCluster',
'testGroup',
expectedOutputs.OFFSET
);
});
});
describe('with the ResetType set to TIMESTAMP', () => {
it('adds error to the page when the input is left empty', async () => {
const mockResetConsumerGroupOffsets = jest.fn();
render(setupWrapper());
await selectresetTypeAndPartitions('TIMESTAMP');
await waitFor(() => {
fireEvent.click(screen.getByText('Submit'));
});
expect(mockResetConsumerGroupOffsets).toHaveBeenCalledTimes(0);
expect(screen.getByText("This field shouldn't be empty!")).toBeTruthy();
});
});
});
});

View file

@ -0,0 +1,184 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`ResetOffsets on initial render matches the snapshot 1`] = `
<body>
<div>
<div
class="section"
>
<div
class="level"
>
<div
class="level-item level-left"
>
<nav
aria-label="breadcrumbs"
class="breadcrumb"
>
<ul>
<li>
<a
href="/ui/clusters/testCluster/consumer-groups"
>
All Consumer Groups
</a>
</li>
<li>
<a
href="/ui/clusters/testCluster/consumer-groups/testGroup"
>
testGroup
</a>
</li>
<li
class="is-active"
>
<span
class=""
>
Reset Offsets
</span>
</li>
</ul>
</nav>
</div>
</div>
<div
class="box"
>
<form>
<div
class="columns"
>
<div
class="column is-one-third"
>
<label
class="label"
for="topic"
>
Topic
</label>
<div
class="select"
>
<select
id="topic"
name="topic"
>
<option
value="__amazon_msk_canary"
>
__amazon_msk_canary
</option>
<option
value="other_topic"
>
other_topic
</option>
</select>
</div>
</div>
<div
class="column is-one-third"
>
<label
class="label"
for="resetType"
>
Reset Type
</label>
<div
class="select"
>
<select
id="resetType"
name="resetType"
>
<option
value="EARLIEST"
>
EARLIEST
</option>
<option
value="LATEST"
>
LATEST
</option>
<option
value="TIMESTAMP"
>
TIMESTAMP
</option>
<option
value="OFFSET"
>
OFFSET
</option>
</select>
</div>
</div>
<div
class="column is-one-third"
>
<label
class="label"
>
Partitions
</label>
<div
class="select"
>
<div
class="rmsc multi-select"
>
<div
aria-labelledby="Select partitions"
aria-readonly="true"
class="dropdown-container"
tabindex="0"
>
<div
class="dropdown-heading"
>
<div
class="dropdown-heading-value"
>
<span
class="gray"
>
Select...
</span>
</div>
<svg
class="dropdown-heading-dropdown-arrow gray"
fill="none"
height="24"
stroke="currentColor"
stroke-width="2"
width="24"
>
<path
d="M6 9L12 15 18 9"
/>
</svg>
</div>
</div>
</div>
</div>
</div>
</div>
<button
class="button is-primary"
disabled=""
type="submit"
>
Submit
</button>
</form>
</div>
</div>
</div>
</body>
`;

View file

@ -0,0 +1,35 @@
export const expectedOutputs = {
EARLIEST: {
partitions: [0],
partitionsOffsets: [
{
offset: undefined,
partition: 0,
},
],
resetType: 'EARLIEST',
topic: '__amazon_msk_canary',
},
LATEST: {
partitions: [0],
partitionsOffsets: [
{
offset: undefined,
partition: 0,
},
],
resetType: 'LATEST',
topic: '__amazon_msk_canary',
},
OFFSET: {
partitions: [0],
partitionsOffsets: [
{
offset: '10',
partition: 0,
},
],
resetType: 'OFFSET',
topic: '__amazon_msk_canary',
},
};

View file

@ -62,7 +62,7 @@ describe('Details component', () => {
const component = mount(
<StaticRouter>{setupWrapper({ deleteConsumerGroup })}</StaticRouter>
);
component.find('button').at(0).simulate('click');
component.find('button').at(1).simulate('click');
component.update();
component
.find('ConfirmationModal')
@ -78,7 +78,7 @@ describe('Details component', () => {
const component = mount(
<StaticRouter>{setupWrapper({ deleteConsumerGroup })}</StaticRouter>
);
component.find('button').at(0).simulate('click');
component.find('button').at(1).simulate('click');
component.update();
component
.find('ConfirmationModal')

View file

@ -68,6 +68,13 @@ exports[`Details component when consumer gruops are fetched Matches the snapshot
<div
className="level-item level-right buttons"
>
<button
className="button"
onClick={[Function]}
type="button"
>
Reset offsets
</button>
<button
className="button is-danger"
onClick={[Function]}

View file

@ -20,6 +20,14 @@ export const clusterBrokersPath = (clusterName: ClusterName) =>
// Consumer Groups
export const clusterConsumerGroupsPath = (clusterName: ClusterName) =>
`${clusterPath(clusterName)}/consumer-groups`;
export const clusterConsumerGroupDetailsPath = (
clusterName: ClusterName,
groupId: string
) => `${clusterPath(clusterName)}/consumer-groups/${groupId}`;
export const clusterConsumerGroupResetOffsetsPath = (
clusterName: ClusterName,
groupId: string
) => `${clusterPath(clusterName)}/consumer-groups/${groupId}/reset-offsets`;
// Schemas
export const clusterSchemasPath = (clusterName: ClusterName) =>

View file

@ -1,4 +1,5 @@
import fetchMock from 'fetch-mock-jest';
import { ConsumerGroupOffsetsResetType } from 'generated-sources';
import * as actions from 'redux/actions/actions';
import * as thunks from 'redux/actions/thunks';
import { FailurePayload } from 'redux/interfaces';
@ -15,7 +16,7 @@ describe('Consumer Groups Thunks', () => {
});
describe('deleting consumer groups', () => {
it('calls DELETE_CONSUMER_GROUP__SUCCESS after successful delete', async () => {
it('calls DELETE_CONSUMER_GROUP__SUCCESS after a successful delete', async () => {
fetchMock.deleteOnce(
`/api/clusters/${clusterName}/consumer-groups/${id}`,
200
@ -28,7 +29,7 @@ describe('Consumer Groups Thunks', () => {
]);
});
it('calls DELETE_CONSUMER_GROUP__FAILURE after successful delete', async () => {
it('calls DELETE_CONSUMER_GROUP__FAILURE after an unsuccessful delete', async () => {
fetchMock.deleteOnce(
`/api/clusters/${clusterName}/consumer-groups/${id}`,
500
@ -50,4 +51,65 @@ describe('Consumer Groups Thunks', () => {
]);
});
});
describe('resetting consumer groups offset', () => {
it('calls RESET_OFFSETS__SUCCESS after successful reset', async () => {
fetchMock.postOnce(
`/api/clusters/${clusterName}/consumer-groups/${id}/offsets`,
200
);
await store.dispatch(
thunks.resetConsumerGroupOffsets(clusterName, id, {
partitions: [1],
partitionsOffsets: [
{
offset: '10',
partition: 1,
},
],
resetType: ConsumerGroupOffsetsResetType.OFFSET,
topic: '__amazon_msk_canary',
})
);
expect(store.getActions()).toEqual([
actions.resetConsumerGroupOffsetsAction.request(),
actions.resetConsumerGroupOffsetsAction.success(),
]);
});
it('calls RESET_OFFSETS__FAILURE after an unsuccessful reset', async () => {
fetchMock.postOnce(
`/api/clusters/${clusterName}/consumer-groups/${id}/offsets`,
500
);
await store.dispatch(
thunks.resetConsumerGroupOffsets(clusterName, id, {
partitions: [1],
partitionsOffsets: [
{
offset: '10',
partition: 1,
},
],
resetType: ConsumerGroupOffsetsResetType.OFFSET,
topic: '__amazon_msk_canary',
})
);
const alert: FailurePayload = {
subject: ['consumer-group', id].join('-'),
title: `Consumer Gropup ${id}`,
response: {
body: undefined,
status: 500,
statusText: 'Internal Server Error',
},
};
expect(store.getActions()).toEqual([
actions.resetConsumerGroupOffsetsAction.request(),
actions.resetConsumerGroupOffsetsAction.failure({ alert }),
]);
});
});
});

View file

@ -103,7 +103,7 @@ export const fetchConsumerGroupsAction = createAsyncAction(
'GET_CONSUMER_GROUPS__REQUEST',
'GET_CONSUMER_GROUPS__SUCCESS',
'GET_CONSUMER_GROUPS__FAILURE'
)<undefined, ConsumerGroup[], undefined>();
)<undefined, ConsumerGroup[], { alert?: FailurePayload }>();
export const fetchConsumerGroupDetailsAction = createAsyncAction(
'GET_CONSUMER_GROUP_DETAILS__REQUEST',
@ -112,7 +112,7 @@ export const fetchConsumerGroupDetailsAction = createAsyncAction(
)<
undefined,
{ consumerGroupID: ConsumerGroupID; details: ConsumerGroupDetails },
undefined
{ alert?: FailurePayload }
>();
export const deleteConsumerGroupAction = createAsyncAction(
@ -294,3 +294,9 @@ export const updateTopicReplicationFactorAction = createAsyncAction(
'UPDATE_REPLICATION_FACTOR__SUCCESS',
'UPDATE_REPLICATION_FACTOR__FAILURE'
)<undefined, undefined, { alert?: FailurePayload }>();
export const resetConsumerGroupOffsetsAction = createAsyncAction(
'RESET_OFFSETS__REQUEST',
'RESET_OFFSETS__SUCCESS',
'RESET_OFFSETS__FAILURE'
)<undefined, undefined, { alert?: FailurePayload }>();

View file

@ -1,4 +1,8 @@
import { ConsumerGroupsApi, Configuration } from 'generated-sources';
import {
ConsumerGroupsApi,
Configuration,
ConsumerGroupOffsetsResetType,
} from 'generated-sources';
import {
ConsumerGroupID,
PromiseThunkResult,
@ -22,7 +26,13 @@ export const fetchConsumerGroupsList =
});
dispatch(actions.fetchConsumerGroupsAction.success(consumerGroups));
} catch (e) {
dispatch(actions.fetchConsumerGroupsAction.failure());
const response = await getResponse(e);
const alert: FailurePayload = {
subject: ['consumer-groups', clusterName].join('-'),
title: `Consumer Gropups`,
response,
};
dispatch(actions.fetchConsumerGroupsAction.failure({ alert }));
}
};
@ -46,7 +56,13 @@ export const fetchConsumerGroupDetails =
})
);
} catch (e) {
dispatch(actions.fetchConsumerGroupDetailsAction.failure());
const response = await getResponse(e);
const alert: FailurePayload = {
subject: ['consumer-group', consumerGroupID].join('-'),
title: `Consumer Gropup ${consumerGroupID}`,
response,
};
dispatch(actions.fetchConsumerGroupDetailsAction.failure({ alert }));
}
};
@ -73,3 +89,44 @@ export const deleteConsumerGroup =
dispatch(actions.deleteConsumerGroupAction.failure({ alert }));
}
};
export const resetConsumerGroupOffsets =
(
clusterName: ClusterName,
consumerGroupID: ConsumerGroupID,
requestBody: {
topic: string;
resetType: ConsumerGroupOffsetsResetType;
partitionsOffsets?: { offset: string; partition: number }[];
resetToTimestamp?: Date;
partitions: number[];
}
): PromiseThunkResult =>
async (dispatch) => {
dispatch(actions.resetConsumerGroupOffsetsAction.request());
try {
await consumerGroupsApiClient.resetConsumerGroupOffsets({
clusterName,
id: consumerGroupID,
consumerGroupOffsetsReset: {
topic: requestBody.topic,
resetType: requestBody.resetType,
partitions: requestBody.partitions,
partitionsOffsets: requestBody.partitionsOffsets?.map((offset) => ({
...offset,
offset: +offset.offset,
})),
resetToTimestamp: requestBody.resetToTimestamp?.getTime(),
},
});
dispatch(actions.resetConsumerGroupOffsetsAction.success());
} catch (e) {
const response = await getResponse(e);
const alert: FailurePayload = {
subject: ['consumer-group', consumerGroupID].join('-'),
title: `Consumer Gropup ${consumerGroupID}`,
response,
};
dispatch(actions.resetConsumerGroupOffsetsAction.failure({ alert }));
}
};

View file

@ -25,6 +25,8 @@ const getConsumerGroupDeletingStatus = createFetchingSelector(
'DELETE_CONSUMER_GROUP'
);
const getOffsetResettingStatus = createFetchingSelector('RESET_OFFSET_GROUP');
export const getIsConsumerGroupsListFetched = createSelector(
getConsumerGroupsListFetchingStatus,
(status) => status === 'fetched'
@ -40,6 +42,11 @@ export const getIsConsumerGroupDetailsFetched = createSelector(
(status) => status === 'fetched'
);
export const getOffsetReset = createSelector(
getOffsetResettingStatus,
(status) => status === 'fetched'
);
export const getConsumerGroupsList = createSelector(
getIsConsumerGroupsListFetched,
getConsumerGroupsMap,