浏览代码

Track and show information regarding last KC restart

When connector restart failed, show error message
Junseok-Hur 2 年之前
父节点
当前提交
d3fb0e4d7b

+ 3 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service;
 
 import com.google.common.collect.Table;
 import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO;
+import com.provectus.kafka.ui.model.InternalClusterState;
 import com.provectus.kafka.ui.model.InternalConsumerGroup;
 import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
 import com.provectus.kafka.ui.model.KafkaCluster;
@@ -37,6 +38,7 @@ import reactor.core.publisher.Mono;
 @RequiredArgsConstructor
 public class ConsumerGroupService {
 
+  public static final int CONSUMER_MAX_POLL_RECORDS = 100;
   private final AdminClientService adminClientService;
   private final AccessControlService accessControlService;
 
@@ -224,6 +226,7 @@ public class ConsumerGroupService {
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
     props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
+    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, CONSUMER_MAX_POLL_RECORDS);
     props.putAll(properties);
 
     return new KafkaConsumer<>(props);

+ 4 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java

@@ -265,11 +265,13 @@ public class KafkaConnectService {
                                   String connectorName, Predicate<TaskDTO> taskFilter) {
     return getConnectorTasks(cluster, connectName, connectorName)
         .filter(taskFilter)
-        .flatMap(t ->
-            restartConnectorTask(cluster, connectName, connectorName, t.getId().getTask()))
+        .flatMap(t -> restartConnectorTask(cluster, connectName, connectorName, t.getId().getTask())
+            .onErrorResume(e -> Mono.error(new RuntimeException("Failed to restart task", e)))
+        )
         .then();
   }
 
+
   public Flux<TaskDTO> getConnectorTasks(KafkaCluster cluster, String connectName, String connectorName) {
     return api(cluster, connectName)
         .flux(client ->

+ 197 - 0
kafka-ui-react-app/src/components/ConsumerGroups/Details/ResetOffsets/Form 2.tsx

@@ -0,0 +1,197 @@
+import React from 'react';
+import { useNavigate } from 'react-router-dom';
+import {
+  ConsumerGroupDetails,
+  ConsumerGroupOffsetsReset,
+  ConsumerGroupOffsetsResetType,
+} from 'generated-sources';
+import { ClusterGroupParam } from 'lib/paths';
+import {
+  Controller,
+  FormProvider,
+  useFieldArray,
+  useForm,
+} from 'react-hook-form';
+import { MultiSelect, Option } from 'react-multi-select-component';
+import 'react-datepicker/dist/react-datepicker.css';
+import { ErrorMessage } from '@hookform/error-message';
+import { InputLabel } from 'components/common/Input/InputLabel.styled';
+import { Button } from 'components/common/Button/Button';
+import Input from 'components/common/Input/Input';
+import { FormError } from 'components/common/Input/Input.styled';
+import useAppParams from 'lib/hooks/useAppParams';
+import { useResetConsumerGroupOffsetsMutation } from 'lib/hooks/api/consumers';
+import { FlexFieldset, StyledForm } from 'components/common/Form/Form.styled';
+import ControlledSelect from 'components/common/Select/ControlledSelect';
+
+import * as S from './ResetOffsets.styled';
+
+interface FormProps {
+  defaultValues: ConsumerGroupOffsetsReset;
+  topics: string[];
+  partitions: ConsumerGroupDetails['partitions'];
+}
+
+const resetTypeOptions = Object.values(ConsumerGroupOffsetsResetType).map(
+  (value) => ({ value, label: value })
+);
+
+const Form: React.FC<FormProps> = ({ defaultValues, partitions, topics }) => {
+  const navigate = useNavigate();
+  const routerParams = useAppParams<ClusterGroupParam>();
+  const reset = useResetConsumerGroupOffsetsMutation(routerParams);
+  const topicOptions = React.useMemo(
+    () => topics.map((value) => ({ value, label: value })),
+    [topics]
+  );
+  const methods = useForm<ConsumerGroupOffsetsReset>({
+    mode: 'onChange',
+    defaultValues,
+  });
+
+  const {
+    handleSubmit,
+    setValue,
+    watch,
+    control,
+    formState: { errors },
+  } = methods;
+  const { fields } = useFieldArray({
+    control,
+    name: 'partitionsOffsets',
+  });
+
+  const resetTypeValue = watch('resetType');
+  const topicValue = watch('topic');
+  const offsetsValue = watch('partitionsOffsets');
+  const partitionsValue = watch('partitions') || [];
+
+  const partitionOptions =
+    partitions
+      ?.filter((p) => p.topic === topicValue)
+      .map((p) => ({
+        label: `Partition #${p.partition.toString()}`,
+        value: p.partition,
+      })) || [];
+
+  const onSelectedPartitionsChange = (selected: Option[]) => {
+    setValue(
+      'partitions',
+      selected.map(({ value }) => value)
+    );
+
+    setValue(
+      'partitionsOffsets',
+      selected.map(({ value }) => {
+        const currentOffset = offsetsValue?.find(
+          ({ partition }) => partition === value
+        );
+        return { offset: currentOffset?.offset, partition: value };
+      })
+    );
+  };
+
+  React.useEffect(() => {
+    onSelectedPartitionsChange([]);
+    // eslint-disable-next-line react-hooks/exhaustive-deps
+  }, [topicValue]);
+
+  const onSubmit = async (data: ConsumerGroupOffsetsReset) => {
+    await reset.mutateAsync(data);
+    navigate('../');
+  };
+
+  return (
+    <FormProvider {...methods}>
+      <StyledForm onSubmit={handleSubmit(onSubmit)}>
+        <FlexFieldset>
+          <ControlledSelect
+            name="topic"
+            label="Topic"
+            placeholder="Select Topic"
+            options={topicOptions}
+          />
+          <ControlledSelect
+            name="resetType"
+            label="Reset Type"
+            placeholder="Select Reset Type"
+            options={resetTypeOptions}
+          />
+          <div>
+            <InputLabel>Partitions</InputLabel>
+            <MultiSelect
+              options={partitionOptions}
+              value={partitionsValue.map((p) => ({
+                value: p,
+                label: String(p),
+              }))}
+              onChange={onSelectedPartitionsChange}
+              labelledBy="Select partitions"
+            />
+          </div>
+          {resetTypeValue === ConsumerGroupOffsetsResetType.TIMESTAMP &&
+            partitionsValue.length > 0 && (
+              <div>
+                <InputLabel>Timestamp</InputLabel>
+                <Controller
+                  control={control}
+                  name="resetToTimestamp"
+                  rules={{
+                    required: 'Timestamp is required',
+                  }}
+                  render={({ field: { onChange, onBlur, value, ref } }) => (
+                    <S.DatePickerInput
+                      ref={ref}
+                      selected={new Date(value as number)}
+                      onChange={(e: Date | null) => onChange(e?.getTime())}
+                      onBlur={onBlur}
+                    />
+                  )}
+                />
+                <ErrorMessage
+                  errors={errors}
+                  name="resetToTimestamp"
+                  render={({ message }) => <FormError>{message}</FormError>}
+                />
+              </div>
+            )}
+
+          {resetTypeValue === ConsumerGroupOffsetsResetType.OFFSET &&
+            partitionsValue.length > 0 && (
+              <S.OffsetsWrapper>
+                {fields.map((field, index) => (
+                  <Input
+                    key={field.id}
+                    label={`Partition #${field.partition} Offset`}
+                    type="number"
+                    name={`partitionsOffsets.${index}.offset` as const}
+                    hookFormOptions={{
+                      shouldUnregister: true,
+                      required: 'Offset is required',
+                      min: {
+                        value: 0,
+                        message: 'must be greater than or equal to 0',
+                      },
+                    }}
+                    withError
+                  />
+                ))}
+              </S.OffsetsWrapper>
+            )}
+        </FlexFieldset>
+        <div>
+          <Button
+            buttonSize="M"
+            buttonType="primary"
+            type="submit"
+            disabled={partitionsValue.length === 0}
+          >
+            Submit
+          </Button>
+        </div>
+      </StyledForm>
+    </FormProvider>
+  );
+};
+
+export default Form;

+ 106 - 0
kafka-ui-react-app/src/components/ConsumerGroups/List 2.tsx

@@ -0,0 +1,106 @@
+import React from 'react';
+import PageHeading from 'components/common/PageHeading/PageHeading';
+import Search from 'components/common/Search/Search';
+import { ControlPanelWrapper } from 'components/common/ControlPanel/ControlPanel.styled';
+import {
+  ConsumerGroupDetails,
+  ConsumerGroupOrdering,
+  SortOrder,
+} from 'generated-sources';
+import useAppParams from 'lib/hooks/useAppParams';
+import { clusterConsumerGroupDetailsPath, ClusterNameRoute } from 'lib/paths';
+import { ColumnDef } from '@tanstack/react-table';
+import Table, { TagCell, LinkCell } from 'components/common/NewTable';
+import { useNavigate, useSearchParams } from 'react-router-dom';
+import { PER_PAGE } from 'lib/constants';
+import { useConsumerGroups } from 'lib/hooks/api/consumers';
+
+const List = () => {
+  const { clusterName } = useAppParams<ClusterNameRoute>();
+  const [searchParams] = useSearchParams();
+  const navigate = useNavigate();
+
+  const consumerGroups = useConsumerGroups({
+    clusterName,
+    orderBy: (searchParams.get('sortBy') as ConsumerGroupOrdering) || undefined,
+    sortOrder:
+      (searchParams.get('sortDirection')?.toUpperCase() as SortOrder) ||
+      undefined,
+    page: Number(searchParams.get('page') || 1),
+    perPage: Number(searchParams.get('perPage') || PER_PAGE),
+    search: searchParams.get('q') || '',
+  });
+
+  const columns = React.useMemo<ColumnDef<ConsumerGroupDetails>[]>(
+    () => [
+      {
+        id: ConsumerGroupOrdering.NAME,
+        header: 'Group ID',
+        accessorKey: 'groupId',
+        // eslint-disable-next-line react/no-unstable-nested-components
+        cell: ({ getValue }) => (
+          <LinkCell
+            value={`${getValue<string | number>()}`}
+            to={encodeURIComponent(`${getValue<string | number>()}`)}
+          />
+        ),
+      },
+      {
+        id: ConsumerGroupOrdering.MEMBERS,
+        header: 'Num Of Members',
+        accessorKey: 'members',
+      },
+      {
+        header: 'Num Of Topics',
+        accessorKey: 'topics',
+        enableSorting: false,
+      },
+      {
+        header: 'Messages Behind',
+        accessorKey: 'messagesBehind',
+        enableSorting: false,
+      },
+      {
+        header: 'Coordinator',
+        accessorKey: 'coordinator.id',
+        enableSorting: false,
+      },
+      {
+        id: ConsumerGroupOrdering.STATE,
+        header: 'State',
+        accessorKey: 'state',
+        cell: TagCell,
+      },
+    ],
+    []
+  );
+
+  return (
+    <>
+      <PageHeading text="Consumers" />
+      <ControlPanelWrapper hasInput>
+        <Search placeholder="Search by Consumer Group ID" />
+      </ControlPanelWrapper>
+      <Table
+        columns={columns}
+        pageCount={consumerGroups.data?.pageCount || 0}
+        data={consumerGroups.data?.consumerGroups || []}
+        emptyMessage={
+          consumerGroups.isSuccess
+            ? 'No active consumer groups found'
+            : 'Loading...'
+        }
+        serverSideProcessing
+        enableSorting
+        onRowClick={({ original }) =>
+          navigate(
+            clusterConsumerGroupDetailsPath(clusterName, original.groupId)
+          )
+        }
+        disabled={consumerGroups.isFetching}
+      />
+    </>
+  );
+};
+
+export default List;

+ 39 - 0
kafka-ui-react-app/src/components/KsqlDb/TableView 2.tsx

@@ -0,0 +1,39 @@
+import React from 'react';
+import { KsqlStreamDescription, KsqlTableDescription } from 'generated-sources';
+import Table from 'components/common/NewTable';
+import { ColumnDef } from '@tanstack/react-table';
+
+interface TableViewProps {
+  fetching: boolean;
+  rows: KsqlTableDescription[] | KsqlStreamDescription[];
+}
+
+const TableView: React.FC<TableViewProps> = ({ fetching, rows }) => {
+  const columns = React.useMemo<
+    ColumnDef<KsqlTableDescription | KsqlStreamDescription>[]
+  >(
+    () => [
+      { header: 'Name', accessorKey: 'name' },
+      { header: 'Topic', accessorKey: 'topic' },
+      { header: 'Key Format', accessorKey: 'keyFormat' },
+      { header: 'Value Format', accessorKey: 'valueFormat' },
+      {
+        header: 'Is Windowed',
+        accessorKey: 'isWindowed',
+        cell: ({ row }) =>
+          'isWindowed' in row.original ? String(row.original.isWindowed) : '-',
+      },
+    ],
+    []
+  );
+  return (
+    <Table
+      data={rows || []}
+      columns={columns}
+      emptyMessage={fetching ? 'Loading...' : 'No rows found'}
+      enableSorting={false}
+    />
+  );
+};
+
+export default TableView;

+ 53 - 0
kafka-ui-react-app/src/lib/fixtures/consumerGroups 2.ts

@@ -0,0 +1,53 @@
+import { ConsumerGroupState } from 'generated-sources';
+
+export const consumerGroupPayload = {
+  groupId: 'amazon.msk.canary.group.broker-1',
+  members: 0,
+  topics: 2,
+  simple: false,
+  partitionAssignor: '',
+  state: ConsumerGroupState.EMPTY,
+  coordinator: {
+    id: 2,
+    host: 'b-2.kad-msk.st2jzq.c6.kafka.eu-west-1.amazonaws.com',
+  },
+  messagesBehind: 0,
+  partitions: [
+    {
+      topic: '__amazon_msk_canary',
+      partition: 1,
+      currentOffset: 0,
+      endOffset: 0,
+      messagesBehind: 0,
+      consumerId: undefined,
+      host: undefined,
+    },
+    {
+      topic: '__amazon_msk_canary',
+      partition: 0,
+      currentOffset: 56932,
+      endOffset: 56932,
+      messagesBehind: 0,
+      consumerId: undefined,
+      host: undefined,
+    },
+    {
+      topic: 'other_topic',
+      partition: 3,
+      currentOffset: 56932,
+      endOffset: 56932,
+      messagesBehind: 0,
+      consumerId: undefined,
+      host: undefined,
+    },
+    {
+      topic: 'other_topic',
+      partition: 4,
+      currentOffset: 56932,
+      endOffset: 56932,
+      messagesBehind: 0,
+      consumerId: undefined,
+      host: undefined,
+    },
+  ],
+};