|
@@ -3,6 +3,7 @@ import 'react-datepicker/dist/react-datepicker.css';
|
|
|
import {
|
|
|
MessageFilterType,
|
|
|
Partition,
|
|
|
+ PollingMode,
|
|
|
SeekDirection,
|
|
|
SeekType,
|
|
|
SerdeUsage,
|
|
@@ -10,6 +11,7 @@ import {
|
|
|
TopicMessageConsuming,
|
|
|
TopicMessageEvent,
|
|
|
TopicMessageEventTypeEnum,
|
|
|
+ TopicMessageNextPageCursor,
|
|
|
} from 'generated-sources';
|
|
|
import React, { useContext } from 'react';
|
|
|
import omitBy from 'lodash/omitBy';
|
|
@@ -35,18 +37,22 @@ import CloseIcon from 'components/common/Icons/CloseIcon';
|
|
|
import ClockIcon from 'components/common/Icons/ClockIcon';
|
|
|
import ArrowDownIcon from 'components/common/Icons/ArrowDownIcon';
|
|
|
import FileIcon from 'components/common/Icons/FileIcon';
|
|
|
-import { useTopicDetails } from 'lib/hooks/api/topics';
|
|
|
+import { useRegisterFilter, useTopicDetails } from 'lib/hooks/api/topics';
|
|
|
import { InputLabel } from 'components/common/Input/InputLabel.styled';
|
|
|
import { getSerdeOptions } from 'components/Topics/Topic/SendMessage/utils';
|
|
|
import { useSerdes } from 'lib/hooks/api/topicMessages';
|
|
|
+import { getTopicMessgesLastLoadedPage } from 'redux/reducers/topicMessages/selectors';
|
|
|
+import { useAppSelector } from 'lib/hooks/redux';
|
|
|
+import { showAlert } from 'lib/errorHandling';
|
|
|
|
|
|
-import * as S from './Filters.styled';
|
|
|
+import { getDefaultSerdeName } from './getDefaultSerdeName';
|
|
|
import {
|
|
|
filterOptions,
|
|
|
getOffsetFromSeekToParam,
|
|
|
getSelectedPartitionsFromSeekToParam,
|
|
|
getTimestampFromSeekToParam,
|
|
|
} from './utils';
|
|
|
+import * as S from './Filters.styled';
|
|
|
|
|
|
type Query = Record<string, string | string[] | number>;
|
|
|
|
|
@@ -55,12 +61,18 @@ export interface FiltersProps {
|
|
|
meta: TopicMessageConsuming;
|
|
|
isFetching: boolean;
|
|
|
messageEventType?: string;
|
|
|
+ cursor?: TopicMessageNextPageCursor;
|
|
|
+ currentPage: number;
|
|
|
addMessage(content: { message: TopicMessage; prepend: boolean }): void;
|
|
|
resetMessages(): void;
|
|
|
updatePhase(phase: string): void;
|
|
|
updateMeta(meta: TopicMessageConsuming): void;
|
|
|
setIsFetching(status: boolean): void;
|
|
|
setMessageType(messageType: string): void;
|
|
|
+ updateCursor(cursor?: TopicMessageNextPageCursor): void;
|
|
|
+ setCurrentPage(page: number): void;
|
|
|
+ setLastLoadedPage(page: number): void;
|
|
|
+ resetAllMessages(): void;
|
|
|
}
|
|
|
|
|
|
export interface MessageFilters {
|
|
@@ -85,6 +97,7 @@ const Filters: React.FC<FiltersProps> = ({
|
|
|
phaseMessage,
|
|
|
meta: { elapsedMs, bytesConsumed, messagesConsumed, filterApplyErrors },
|
|
|
isFetching,
|
|
|
+ currentPage,
|
|
|
addMessage,
|
|
|
resetMessages,
|
|
|
updatePhase,
|
|
@@ -92,19 +105,25 @@ const Filters: React.FC<FiltersProps> = ({
|
|
|
setIsFetching,
|
|
|
setMessageType,
|
|
|
messageEventType,
|
|
|
+ updateCursor,
|
|
|
+ setCurrentPage,
|
|
|
+ setLastLoadedPage,
|
|
|
+ resetAllMessages,
|
|
|
}) => {
|
|
|
const { clusterName, topicName } = useAppParams<RouteParamsClusterTopic>();
|
|
|
const location = useLocation();
|
|
|
const navigate = useNavigate();
|
|
|
const [searchParams] = useSearchParams();
|
|
|
|
|
|
- const page = searchParams.get('page');
|
|
|
-
|
|
|
const { data: topic } = useTopicDetails({ clusterName, topicName });
|
|
|
|
|
|
+ const registerFilter = useRegisterFilter({ clusterName, topicName });
|
|
|
+
|
|
|
+ const lastLoadedPage = useAppSelector(getTopicMessgesLastLoadedPage);
|
|
|
+
|
|
|
const partitions = topic?.partitions || [];
|
|
|
|
|
|
- const { seekDirection, isLive, changeSeekDirection } =
|
|
|
+ const { seekDirection, isLive, changeSeekDirection, page, setPage } =
|
|
|
useContext(TopicMessagesContext);
|
|
|
|
|
|
const { value: isOpen, toggle } = useBoolean();
|
|
@@ -131,11 +150,18 @@ const Filters: React.FC<FiltersProps> = ({
|
|
|
const [timestamp, setTimestamp] = React.useState<Date | null>(
|
|
|
getTimestampFromSeekToParam(searchParams)
|
|
|
);
|
|
|
+
|
|
|
+ const { data: serdes = {} } = useSerdes({
|
|
|
+ clusterName,
|
|
|
+ topicName,
|
|
|
+ use: SerdeUsage.DESERIALIZE,
|
|
|
+ });
|
|
|
+
|
|
|
const [keySerde, setKeySerde] = React.useState<string>(
|
|
|
- searchParams.get('keySerde') || ''
|
|
|
+ searchParams.get('keySerde') || getDefaultSerdeName(serdes.key || [])
|
|
|
);
|
|
|
const [valueSerde, setValueSerde] = React.useState<string>(
|
|
|
- searchParams.get('valueSerde') || ''
|
|
|
+ searchParams.get('valueSerde') || getDefaultSerdeName(serdes.value || [])
|
|
|
);
|
|
|
|
|
|
const [savedFilters, setSavedFilters] = React.useState<MessageFilters[]>(
|
|
@@ -155,7 +181,7 @@ const Filters: React.FC<FiltersProps> = ({
|
|
|
? MessageFilterType.GROOVY_SCRIPT
|
|
|
: MessageFilterType.STRING_CONTAINS
|
|
|
);
|
|
|
- const [query, setQuery] = React.useState<string>(searchParams.get('q') || '');
|
|
|
+ const [stringFilter, setStringFilter] = React.useState<string>('');
|
|
|
const [isTailing, setIsTailing] = React.useState<boolean>(isLive);
|
|
|
|
|
|
const isSeekTypeControlVisible = React.useMemo(
|
|
@@ -173,23 +199,12 @@ const Filters: React.FC<FiltersProps> = ({
|
|
|
return false;
|
|
|
}, [isSeekTypeControlVisible, currentSeekType, timestamp, isTailing]);
|
|
|
|
|
|
- const partitionMap = React.useMemo(
|
|
|
- () =>
|
|
|
- partitions.reduce<Record<string, Partition>>(
|
|
|
- (acc, partition) => ({
|
|
|
- ...acc,
|
|
|
- [partition.partition]: partition,
|
|
|
- }),
|
|
|
- {}
|
|
|
- ),
|
|
|
- [partitions]
|
|
|
- );
|
|
|
-
|
|
|
const handleClearAllFilters = () => {
|
|
|
setCurrentSeekType(SeekType.OFFSET);
|
|
|
setOffset('');
|
|
|
setTimestamp(null);
|
|
|
- setQuery('');
|
|
|
+ setStringFilter('');
|
|
|
+ setPage(1);
|
|
|
changeSeekDirection(SeekDirection.FORWARD);
|
|
|
getSelectedPartitionsFromSeekToParam(searchParams, partitions);
|
|
|
setSelectedPartitions(
|
|
@@ -202,65 +217,60 @@ const Filters: React.FC<FiltersProps> = ({
|
|
|
);
|
|
|
};
|
|
|
|
|
|
- const handleFiltersSubmit = (currentOffset: string) => {
|
|
|
- const nextAttempt = Number(searchParams.get('attempt') || 0) + 1;
|
|
|
+ const getPollingMode = (): PollingMode => {
|
|
|
+ if (seekDirection === SeekDirection.FORWARD) {
|
|
|
+ if (offset && currentSeekType === SeekType.OFFSET)
|
|
|
+ return PollingMode.FROM_OFFSET;
|
|
|
+ if (timestamp && currentSeekType === SeekType.TIMESTAMP)
|
|
|
+ return PollingMode.FROM_TIMESTAMP;
|
|
|
+ return PollingMode.EARLIEST;
|
|
|
+ }
|
|
|
+ if (seekDirection === SeekDirection.BACKWARD) {
|
|
|
+ if (offset && currentSeekType === SeekType.OFFSET)
|
|
|
+ return PollingMode.TO_OFFSET;
|
|
|
+ if (timestamp && currentSeekType === SeekType.TIMESTAMP)
|
|
|
+ return PollingMode.TO_TIMESTAMP;
|
|
|
+ return PollingMode.LATEST;
|
|
|
+ }
|
|
|
+ if (seekDirection === SeekDirection.TAILING) return PollingMode.TAILING;
|
|
|
+ return PollingMode.LATEST;
|
|
|
+ };
|
|
|
+
|
|
|
+ const getSmartFilterId = async (code: string) => {
|
|
|
+ try {
|
|
|
+ const filterId = await registerFilter.mutateAsync({
|
|
|
+ filterCode: code,
|
|
|
+ });
|
|
|
+ return filterId;
|
|
|
+ } catch (e) {
|
|
|
+ showAlert('error', {
|
|
|
+ message: 'Error occured while registering smart filter',
|
|
|
+ });
|
|
|
+ return '';
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ const handleFiltersSubmit = async (cursor?: TopicMessageNextPageCursor) => {
|
|
|
+ if (!keySerde || !valueSerde) return;
|
|
|
const props: Query = {
|
|
|
- q:
|
|
|
- queryType === MessageFilterType.GROOVY_SCRIPT
|
|
|
- ? activeFilter.code
|
|
|
- : query,
|
|
|
- filterQueryType: queryType,
|
|
|
- attempt: nextAttempt,
|
|
|
+ mode: getPollingMode(),
|
|
|
limit: PER_PAGE,
|
|
|
- page: page || 0,
|
|
|
- seekDirection,
|
|
|
+ stringFilter,
|
|
|
+ offset,
|
|
|
+ timestamp: timestamp?.getTime() || 0,
|
|
|
keySerde: keySerde || searchParams.get('keySerde') || '',
|
|
|
valueSerde: valueSerde || searchParams.get('valueSerde') || '',
|
|
|
};
|
|
|
|
|
|
- if (isSeekTypeControlVisible) {
|
|
|
- switch (seekDirection) {
|
|
|
- case SeekDirection.FORWARD:
|
|
|
- props.seekType = SeekType.BEGINNING;
|
|
|
- break;
|
|
|
- case SeekDirection.BACKWARD:
|
|
|
- case SeekDirection.TAILING:
|
|
|
- props.seekType = SeekType.LATEST;
|
|
|
- break;
|
|
|
- default:
|
|
|
- props.seekType = currentSeekType;
|
|
|
- }
|
|
|
-
|
|
|
- if (offset && currentSeekType === SeekType.OFFSET) {
|
|
|
- props.seekType = SeekType.OFFSET;
|
|
|
- }
|
|
|
+ if (cursor?.id) props.cursor = cursor?.id;
|
|
|
|
|
|
- if (timestamp && currentSeekType === SeekType.TIMESTAMP) {
|
|
|
- props.seekType = SeekType.TIMESTAMP;
|
|
|
- }
|
|
|
+ if (selectedPartitions.length !== partitions.length) {
|
|
|
+ props.partitions = selectedPartitions.map((p) => p.value);
|
|
|
+ }
|
|
|
|
|
|
- const isSeekTypeWithSeekTo =
|
|
|
- props.seekType === SeekType.TIMESTAMP ||
|
|
|
- props.seekType === SeekType.OFFSET;
|
|
|
-
|
|
|
- if (
|
|
|
- selectedPartitions.length !== partitions.length ||
|
|
|
- isSeekTypeWithSeekTo
|
|
|
- ) {
|
|
|
- // not everything in the partition is selected
|
|
|
- props.seekTo = selectedPartitions.map(({ value }) => {
|
|
|
- const offsetProperty =
|
|
|
- seekDirection === SeekDirection.FORWARD ? 'offsetMin' : 'offsetMax';
|
|
|
- const offsetBasedSeekTo =
|
|
|
- currentOffset || partitionMap[value][offsetProperty];
|
|
|
- const seekToOffset =
|
|
|
- currentSeekType === SeekType.OFFSET
|
|
|
- ? offsetBasedSeekTo
|
|
|
- : timestamp?.getTime();
|
|
|
-
|
|
|
- return `${value}::${seekToOffset || '0'}`;
|
|
|
- });
|
|
|
- }
|
|
|
+ if (queryType === MessageFilterType.GROOVY_SCRIPT) {
|
|
|
+ props.smartFilterId =
|
|
|
+ (await getSmartFilterId(activeFilter.code))?.id || '';
|
|
|
}
|
|
|
|
|
|
const newProps = omitBy(props, (v) => v === undefined || v === '');
|
|
@@ -272,6 +282,12 @@ const Filters: React.FC<FiltersProps> = ({
|
|
|
});
|
|
|
};
|
|
|
|
|
|
+ const handleSubmit = async () => {
|
|
|
+ setPage(1);
|
|
|
+ resetAllMessages();
|
|
|
+ handleFiltersSubmit();
|
|
|
+ };
|
|
|
+
|
|
|
const handleSSECancel = () => {
|
|
|
if (!source.current) return;
|
|
|
setIsFetching(false);
|
|
@@ -345,9 +361,15 @@ const Filters: React.FC<FiltersProps> = ({
|
|
|
// eslint-disable-next-line consistent-return
|
|
|
React.useEffect(() => {
|
|
|
if (location.search?.length !== 0) {
|
|
|
+ if (page === currentPage) return () => {};
|
|
|
+ if (page <= lastLoadedPage) {
|
|
|
+ setCurrentPage(page);
|
|
|
+ return () => {};
|
|
|
+ }
|
|
|
+
|
|
|
const url = `${BASE_PARAMS.basePath}/api/clusters/${encodeURIComponent(
|
|
|
clusterName
|
|
|
- )}/topics/${topicName}/messages${location.search}`;
|
|
|
+ )}/topics/${topicName}/messages/v2${location.search}`;
|
|
|
const sse = new EventSource(url);
|
|
|
|
|
|
source.current = sse;
|
|
@@ -358,7 +380,7 @@ const Filters: React.FC<FiltersProps> = ({
|
|
|
setIsFetching(true);
|
|
|
};
|
|
|
sse.onmessage = ({ data }) => {
|
|
|
- const { type, message, phase, consuming }: TopicMessageEvent =
|
|
|
+ const { type, message, phase, consuming, cursor }: TopicMessageEvent =
|
|
|
JSON.parse(data);
|
|
|
switch (type) {
|
|
|
case TopicMessageEventTypeEnum.MESSAGE:
|
|
@@ -381,6 +403,10 @@ const Filters: React.FC<FiltersProps> = ({
|
|
|
if (consuming && type) {
|
|
|
setMessageType(type);
|
|
|
updateMeta(consuming);
|
|
|
+ updateCursor(cursor);
|
|
|
+ setCurrentPage(page);
|
|
|
+ setLastLoadedPage(page);
|
|
|
+ handleFiltersSubmit(cursor);
|
|
|
}
|
|
|
break;
|
|
|
default:
|
|
@@ -407,10 +433,15 @@ const Filters: React.FC<FiltersProps> = ({
|
|
|
setIsFetching,
|
|
|
updateMeta,
|
|
|
updatePhase,
|
|
|
+ updateCursor,
|
|
|
+ setLastLoadedPage,
|
|
|
]);
|
|
|
+
|
|
|
React.useEffect(() => {
|
|
|
if (location.search?.length === 0) {
|
|
|
- handleFiltersSubmit(offset);
|
|
|
+ setPage(1);
|
|
|
+ resetAllMessages();
|
|
|
+ handleFiltersSubmit();
|
|
|
}
|
|
|
}, [
|
|
|
seekDirection,
|
|
@@ -418,32 +449,32 @@ const Filters: React.FC<FiltersProps> = ({
|
|
|
activeFilter,
|
|
|
currentSeekType,
|
|
|
timestamp,
|
|
|
- query,
|
|
|
+ stringFilter,
|
|
|
location,
|
|
|
]);
|
|
|
+
|
|
|
React.useEffect(() => {
|
|
|
- handleFiltersSubmit(offset);
|
|
|
+ setPage(1);
|
|
|
+ resetAllMessages();
|
|
|
+ handleFiltersSubmit();
|
|
|
}, [
|
|
|
seekDirection,
|
|
|
queryType,
|
|
|
- activeFilter,
|
|
|
currentSeekType,
|
|
|
- timestamp,
|
|
|
- query,
|
|
|
seekDirection,
|
|
|
- page,
|
|
|
+ keySerde,
|
|
|
+ valueSerde,
|
|
|
]);
|
|
|
|
|
|
+ React.useEffect(() => {
|
|
|
+ setPage(1);
|
|
|
+ resetAllMessages();
|
|
|
+ }, [selectedPartitions, offset, timestamp, stringFilter, activeFilter]);
|
|
|
+
|
|
|
React.useEffect(() => {
|
|
|
setIsTailing(isLive);
|
|
|
}, [isLive]);
|
|
|
|
|
|
- const { data: serdes = {} } = useSerdes({
|
|
|
- clusterName,
|
|
|
- topicName,
|
|
|
- use: SerdeUsage.DESERIALIZE,
|
|
|
- });
|
|
|
-
|
|
|
return (
|
|
|
<S.FiltersWrapper>
|
|
|
<div>
|
|
@@ -530,9 +561,7 @@ const Filters: React.FC<FiltersProps> = ({
|
|
|
buttonType="secondary"
|
|
|
buttonSize="M"
|
|
|
disabled={isSubmitDisabled}
|
|
|
- onClick={() =>
|
|
|
- isFetching ? handleSSECancel() : handleFiltersSubmit(offset)
|
|
|
- }
|
|
|
+ onClick={() => (isFetching ? handleSSECancel() : handleSubmit())}
|
|
|
style={{ fontWeight: 500 }}
|
|
|
>
|
|
|
{isFetching ? 'Cancel' : 'Submit'}
|
|
@@ -548,7 +577,11 @@ const Filters: React.FC<FiltersProps> = ({
|
|
|
/>
|
|
|
</div>
|
|
|
<S.ActiveSmartFilterWrapper>
|
|
|
- <Search placeholder="Search" disabled={isTailing} />
|
|
|
+ <Search
|
|
|
+ placeholder="Search"
|
|
|
+ disabled={isTailing}
|
|
|
+ onChange={setStringFilter}
|
|
|
+ />
|
|
|
|
|
|
<Button buttonType="secondary" buttonSize="M" onClick={toggle}>
|
|
|
<PlusIcon />
|