import React, { useEffect, useRef } from 'react'; import { ClusterName, SeekType, SeekTypes, TopicMessage, TopicMessageQueryParams, TopicName, TopicPartition, } from 'redux/interfaces'; import PageLoader from 'components/common/PageLoader/PageLoader'; import { format } from 'date-fns'; import DatePicker from 'react-datepicker'; import 'react-datepicker/dist/react-datepicker.css'; import CustomParamButton, { CustomParamButtonType, } from 'components/Topics/shared/Form/CustomParams/CustomParamButton'; import MultiSelect from 'react-multi-select-component'; import * as _ from 'lodash'; import { useDebouncedCallback } from 'use-debounce'; import { Option } from 'react-multi-select-component/dist/lib/interfaces'; interface Props { clusterName: ClusterName; topicName: TopicName; isFetched: boolean; fetchTopicMessages: ( clusterName: ClusterName, topicName: TopicName, queryParams: Partial ) => void; messages: TopicMessage[]; partitions: TopicPartition[]; } interface FilterProps { offset: number; partition: number; } function usePrevious(value: any) { const ref = useRef(); useEffect(() => { ref.current = value; }); return ref.current; } const Messages: React.FC = ({ isFetched, clusterName, topicName, messages, partitions, fetchTopicMessages, }) => { const [searchQuery, setSearchQuery] = React.useState(''); const [searchTimestamp, setSearchTimestamp] = React.useState( null ); const [filterProps, setFilterProps] = React.useState([]); const [selectedSeekType, setSelectedSeekType] = React.useState( SeekTypes.OFFSET ); const [searchOffset, setSearchOffset] = React.useState('0'); const [selectedPartitions, setSelectedPartitions] = React.useState( partitions.map((p) => ({ value: p.partition, label: p.partition.toString(), })) ); const [queryParams, setQueryParams] = React.useState< Partial >({ limit: 100 }); const [debouncedCallback] = useDebouncedCallback( (query: any) => setQueryParams({ ...queryParams, ...query }), 1000 ); const prevSearchTimestamp = usePrevious(searchTimestamp); const getUniqueDataForEachPartition: FilterProps[] = React.useMemo(() => { const partitionUniqs: FilterProps[] = partitions.map((p) => ({ offset: 0, partition: p.partition, })); const messageUniqs: FilterProps[] = _.map( _.groupBy(messages, 'partition'), (v) => _.maxBy(v, 'offset') ).map((v) => ({ offset: v ? v.offset : 0, partition: v ? v.partition : 0, })); return _.map( _.groupBy(_.concat(partitionUniqs, messageUniqs), 'partition'), (v) => _.maxBy(v, 'offset') as FilterProps ); }, [messages, partitions]); const getSeekToValuesForPartitions = (partition: any) => { const foundedValues = filterProps.find( (prop) => prop.partition === partition.value ); if (selectedSeekType === SeekTypes.OFFSET) { return foundedValues ? foundedValues.offset : 0; } return searchTimestamp ? searchTimestamp.getTime() : null; }; React.useEffect(() => { fetchTopicMessages(clusterName, topicName, queryParams); }, [fetchTopicMessages, clusterName, topicName, queryParams]); React.useEffect(() => { setFilterProps(getUniqueDataForEachPartition); }, [messages, partitions]); const handleQueryChange = (event: React.ChangeEvent) => { const query = event.target.value; setSearchQuery(query); debouncedCallback({ q: query }); }; const handleDateTimeChange = () => { if (searchTimestamp !== prevSearchTimestamp) { if (searchTimestamp) { const timestamp: number = searchTimestamp.getTime(); setSearchTimestamp(searchTimestamp); setQueryParams({ ...queryParams, seekType: SeekTypes.TIMESTAMP, seekTo: selectedPartitions.map((p) => `${p.value}::${timestamp}`), }); } else { setSearchTimestamp(null); const { seekTo, seekType, ...queryParamsWithoutSeek } = queryParams; setQueryParams(queryParamsWithoutSeek); } } }; const handleSeekTypeChange = ( event: React.ChangeEvent ) => { setSelectedSeekType(event.target.value as SeekType); }; const handleOffsetChange = (event: React.ChangeEvent) => { const offset = event.target.value || '0'; setSearchOffset(offset); debouncedCallback({ seekType: SeekTypes.OFFSET, seekTo: selectedPartitions.map((p) => `${p.value}::${offset}`), }); }; const handlePartitionsChange = (options: Option[]) => { setSelectedPartitions(options); debouncedCallback({ seekType: options.length > 0 ? selectedSeekType : undefined, seekTo: options.length > 0 ? options.map((p) => `${p.value}::${getSeekToValuesForPartitions(p)}`) : undefined, }); }; const getTimestampDate = (timestamp: number) => { return format(new Date(timestamp * 1000), 'MM.dd.yyyy HH:mm:ss'); }; const getMessageContentHeaders = React.useMemo(() => { const message = messages[0]; const headers: JSX.Element[] = []; try { const content = typeof message.content !== 'object' ? JSON.parse(message.content) : message.content; Object.keys(content).forEach((k) => headers.push({`content.${k}`}) ); } catch (e) { headers.push(Content); } return headers; }, [messages]); const getMessageContentBody = (content: any) => { const columns: JSX.Element[] = []; try { const c = typeof content !== 'object' ? JSON.parse(content) : content; Object.values(c).map((v) => columns.push({JSON.stringify(v)}) ); } catch (e) { columns.push({content}); } return columns; }; const onNext = (event: React.MouseEvent) => { event.preventDefault(); const seekTo: string[] = filterProps .filter( (value) => selectedPartitions.findIndex((p) => p.value === value.partition) > -1 ) .map((p) => `${p.partition}::${p.offset}`); setQueryParams({ ...queryParams, seekType: SeekTypes.OFFSET, seekTo, }); }; const filterOptions = (options: Option[], filter: any) => { if (!filter) { return options; } return options.filter( ({ value }) => value.toString() && value.toString() === filter ); }; const getTopicMessagesTable = () => { return messages.length > 0 ? (
{getMessageContentHeaders} {messages.map((message) => ( {getMessageContentBody(message.content)} ))}
Timestamp Offset Partition
{getTimestampDate(message.timestamp)} {message.offset} {message.partition}
) : (
No messages at selected topic
); }; if (!isFetched) { return ; } return (
({ label: `Partition #${p.partition.toString()}`, value: p.partition, }))} filterOptions={filterOptions} value={selectedPartitions} onChange={handlePartitionsChange} labelledBy="Select partitions" />
{selectedSeekType === SeekTypes.OFFSET ? ( <> ) : ( <> setSearchTimestamp(date)} onCalendarClose={handleDateTimeChange} showTimeInput timeInputLabel="Time:" dateFormat="MMMM d, yyyy h:mm aa" className="input" /> )}
{getTopicMessagesTable()}
); }; export default Messages;