Browse Source

Revert "[ISSUE 1046] UI allows to submit message with empty key & value (#1264)"

This reverts commit 4b730eb288b1d157a2867105c6f693d117bcf1cc.
Roman Zabaluev 3 years ago
parent
commit
fd83f190f5

+ 1 - 7
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -1870,18 +1870,12 @@ components:
           type: integer
           type: integer
         key:
         key:
           type: string
           type: string
-          nullable: true
         headers:
         headers:
           type: object
           type: object
           additionalProperties:
           additionalProperties:
             type: string
             type: string
         content:
         content:
           type: string
           type: string
-          nullable: true
-      required:
-        - key
-        - content
-        - partition
 
 
     TopicMessageSchema:
     TopicMessageSchema:
       type: object
       type: object
@@ -2641,4 +2635,4 @@ components:
         - DELETE
         - DELETE
         - COMPACT
         - COMPACT
         - COMPACT_DELETE
         - COMPACT_DELETE
-        - UNKNOWN
+        - UNKNOWN

+ 2 - 8
kafka-ui-react-app/src/components/Alerts/Alert.styled.ts

@@ -3,7 +3,7 @@ import styled from 'styled-components';
 
 
 export const Alert = styled.div<{ $type: AlertType }>`
 export const Alert = styled.div<{ $type: AlertType }>`
   background-color: ${({ $type, theme }) => theme.alert.color[$type]};
   background-color: ${({ $type, theme }) => theme.alert.color[$type]};
-  min-width: 400px;
+  width: 400px;
   min-height: 64px;
   min-height: 64px;
   border-radius: 8px;
   border-radius: 8px;
   padding: 12px;
   padding: 12px;
@@ -20,14 +20,8 @@ export const Title = styled.div`
   font-size: 14px;
   font-size: 14px;
 `;
 `;
 
 
-export const Message = styled.div`
+export const Message = styled.p`
   font-weight: normal;
   font-weight: normal;
   font-size: 14px;
   font-size: 14px;
   margin: 3px 0;
   margin: 3px 0;
-
-  ol,
-  ul {
-    padding-left: 25px;
-    list-style: auto;
-  }
 `;
 `;

+ 1 - 1
kafka-ui-react-app/src/components/App.styled.ts

@@ -168,7 +168,7 @@ export const AlertsContainer = styled.div`
   width: 500px;
   width: 500px;
   position: fixed;
   position: fixed;
   bottom: 15px;
   bottom: 15px;
-  right: 15px;
+  left: 15px;
   z-index: 1000;
   z-index: 1000;
 
 
   @media screen and (max-width: 1023px) {
   @media screen and (max-width: 1023px) {

+ 88 - 101
kafka-ui-react-app/src/components/Topics/Topic/SendMessage/SendMessage.tsx

@@ -1,32 +1,47 @@
 import JSONEditor from 'components/common/JSONEditor/JSONEditor';
 import JSONEditor from 'components/common/JSONEditor/JSONEditor';
 import PageLoader from 'components/common/PageLoader/PageLoader';
 import PageLoader from 'components/common/PageLoader/PageLoader';
+import {
+  CreateTopicMessage,
+  Partition,
+  TopicMessageSchema,
+} from 'generated-sources';
 import React from 'react';
 import React from 'react';
 import { useForm, Controller } from 'react-hook-form';
 import { useForm, Controller } from 'react-hook-form';
-import { useHistory, useParams } from 'react-router';
+import { useHistory } from 'react-router';
 import { clusterTopicMessagesPath } from 'lib/paths';
 import { clusterTopicMessagesPath } from 'lib/paths';
 import jsf from 'json-schema-faker';
 import jsf from 'json-schema-faker';
-import { fetchTopicMessageSchema, messagesApiClient } from 'redux/actions';
-import { useAppDispatch, useAppSelector } from 'lib/hooks/redux';
-import { alertAdded } from 'redux/reducers/alerts/alertsSlice';
-import { now } from 'lodash';
-import { Button } from 'components/common/Button/Button';
-import { ClusterName, TopicName } from 'redux/interfaces';
-import {
-  getMessageSchemaByTopicName,
-  getPartitionsByTopicName,
-  getTopicMessageSchemaFetched,
-} from 'redux/reducers/topics/selectors';
 
 
 import validateMessage from './validateMessage';
 import validateMessage from './validateMessage';
 
 
-interface RouterParams {
-  clusterName: ClusterName;
-  topicName: TopicName;
+export interface Props {
+  clusterName: string;
+  topicName: string;
+  fetchTopicMessageSchema: (clusterName: string, topicName: string) => void;
+  sendTopicMessage: (
+    clusterName: string,
+    topicName: string,
+    payload: CreateTopicMessage
+  ) => void;
+  messageSchema: TopicMessageSchema | undefined;
+  schemaIsFetched: boolean;
+  messageIsSending: boolean;
+  partitions: Partition[];
 }
 }
 
 
-const SendMessage: React.FC = () => {
-  const dispatch = useAppDispatch();
-  const { clusterName, topicName } = useParams<RouterParams>();
+const SendMessage: React.FC<Props> = ({
+  clusterName,
+  topicName,
+  fetchTopicMessageSchema,
+  sendTopicMessage,
+  messageSchema,
+  schemaIsFetched,
+  messageIsSending,
+  partitions,
+}) => {
+  const [keyExampleValue, setKeyExampleValue] = React.useState('');
+  const [contentExampleValue, setContentExampleValue] = React.useState('');
+  const [schemaIsReady, setSchemaIsReady] = React.useState(false);
+  const [schemaErrors, setSchemaErrors] = React.useState<string[]>([]);
   const {
   const {
     register,
     register,
     handleSubmit,
     handleSubmit,
@@ -39,38 +54,27 @@ const SendMessage: React.FC = () => {
   jsf.option('alwaysFakeOptionals', true);
   jsf.option('alwaysFakeOptionals', true);
 
 
   React.useEffect(() => {
   React.useEffect(() => {
-    dispatch(fetchTopicMessageSchema(clusterName, topicName));
+    fetchTopicMessageSchema(clusterName, topicName);
   }, []);
   }, []);
-
-  const messageSchema = useAppSelector((state) =>
-    getMessageSchemaByTopicName(state, topicName)
-  );
-  const partitions = useAppSelector((state) =>
-    getPartitionsByTopicName(state, topicName)
-  );
-  const schemaIsFetched = useAppSelector(getTopicMessageSchemaFetched);
-
-  const keyDefaultValue = React.useMemo(() => {
-    if (!schemaIsFetched || !messageSchema) {
-      return undefined;
-    }
-    return JSON.stringify(
-      jsf.generate(JSON.parse(messageSchema.key.schema)),
-      null,
-      '\t'
-    );
-  }, [messageSchema, schemaIsFetched]);
-
-  const contentDefaultValue = React.useMemo(() => {
-    if (!schemaIsFetched || !messageSchema) {
-      return undefined;
+  React.useEffect(() => {
+    if (schemaIsFetched && messageSchema) {
+      setKeyExampleValue(
+        JSON.stringify(
+          jsf.generate(JSON.parse(messageSchema.key.schema)),
+          null,
+          '\t'
+        )
+      );
+      setContentExampleValue(
+        JSON.stringify(
+          jsf.generate(JSON.parse(messageSchema.value.schema)),
+          null,
+          '\t'
+        )
+      );
+      setSchemaIsReady(true);
     }
     }
-    return JSON.stringify(
-      jsf.generate(JSON.parse(messageSchema.value.schema)),
-      null,
-      '\t'
-    );
-  }, [messageSchema, schemaIsFetched]);
+  }, [schemaIsFetched]);
 
 
   const onSubmit = async (data: {
   const onSubmit = async (data: {
     key: string;
     key: string;
@@ -79,55 +83,30 @@ const SendMessage: React.FC = () => {
     partition: number;
     partition: number;
   }) => {
   }) => {
     if (messageSchema) {
     if (messageSchema) {
-      const { partition, key, content } = data;
+      const key = data.key || keyExampleValue;
+      const content = data.content || contentExampleValue;
+      const { partition } = data;
       const headers = data.headers ? JSON.parse(data.headers) : undefined;
       const headers = data.headers ? JSON.parse(data.headers) : undefined;
-      const errors = validateMessage(key, content, messageSchema);
-      if (errors.length > 0) {
-        dispatch(
-          alertAdded({
-            id: `${clusterName}-${topicName}-createTopicMessageError`,
-            type: 'error',
-            title: 'Validation Error',
-            message: (
-              <ul>
-                {errors.map((e) => (
-                  <li>{e}</li>
-                ))}
-              </ul>
-            ),
-            createdAt: now(),
-          })
-        );
-        return;
-      }
+      const messageIsValid = await validateMessage(
+        key,
+        content,
+        messageSchema,
+        setSchemaErrors
+      );
 
 
-      try {
-        await messagesApiClient.sendTopicMessages({
-          clusterName,
-          topicName,
-          createTopicMessage: {
-            key: !key ? null : key,
-            content: !content ? null : content,
-            headers,
-            partition,
-          },
+      if (messageIsValid) {
+        sendTopicMessage(clusterName, topicName, {
+          key,
+          content,
+          headers,
+          partition,
         });
         });
-      } catch (e) {
-        dispatch(
-          alertAdded({
-            id: `${clusterName}-${topicName}-sendTopicMessagesError`,
-            type: 'error',
-            title: `Error in sending a message to ${topicName}`,
-            message: e?.message,
-            createdAt: now(),
-          })
-        );
+        history.push(clusterTopicMessagesPath(clusterName, topicName));
       }
       }
-      history.push(clusterTopicMessagesPath(clusterName, topicName));
     }
     }
   };
   };
 
 
-  if (!schemaIsFetched) {
+  if (!schemaIsReady) {
     return <PageLoader />;
     return <PageLoader />;
   }
   }
   return (
   return (
@@ -142,7 +121,7 @@ const SendMessage: React.FC = () => {
               <select
               <select
                 id="select"
                 id="select"
                 defaultValue={partitions[0].partition}
                 defaultValue={partitions[0].partition}
-                disabled={isSubmitting}
+                disabled={isSubmitting || messageIsSending}
                 {...register('partition')}
                 {...register('partition')}
               >
               >
                 {partitions.map((partition) => (
                 {partitions.map((partition) => (
@@ -163,8 +142,8 @@ const SendMessage: React.FC = () => {
               name="key"
               name="key"
               render={({ field: { name, onChange } }) => (
               render={({ field: { name, onChange } }) => (
                 <JSONEditor
                 <JSONEditor
-                  readOnly={isSubmitting}
-                  defaultValue={keyDefaultValue}
+                  readOnly={isSubmitting || messageIsSending}
+                  defaultValue={keyExampleValue}
                   name={name}
                   name={name}
                   onChange={onChange}
                   onChange={onChange}
                 />
                 />
@@ -178,8 +157,8 @@ const SendMessage: React.FC = () => {
               name="content"
               name="content"
               render={({ field: { name, onChange } }) => (
               render={({ field: { name, onChange } }) => (
                 <JSONEditor
                 <JSONEditor
-                  readOnly={isSubmitting}
-                  defaultValue={contentDefaultValue}
+                  readOnly={isSubmitting || messageIsSending}
+                  defaultValue={contentExampleValue}
                   name={name}
                   name={name}
                   onChange={onChange}
                   onChange={onChange}
                 />
                 />
@@ -195,7 +174,7 @@ const SendMessage: React.FC = () => {
               name="headers"
               name="headers"
               render={({ field: { name, onChange } }) => (
               render={({ field: { name, onChange } }) => (
                 <JSONEditor
                 <JSONEditor
-                  readOnly={isSubmitting}
+                  readOnly={isSubmitting || messageIsSending}
                   defaultValue="{}"
                   defaultValue="{}"
                   name={name}
                   name={name}
                   onChange={onChange}
                   onChange={onChange}
@@ -205,14 +184,22 @@ const SendMessage: React.FC = () => {
             />
             />
           </div>
           </div>
         </div>
         </div>
-        <Button
-          buttonSize="M"
-          buttonType="primary"
+        {schemaErrors && (
+          <div className="mb-4">
+            {schemaErrors.map((err) => (
+              <p className="help is-danger" key={err}>
+                {err}
+              </p>
+            ))}
+          </div>
+        )}
+        <button
           type="submit"
           type="submit"
-          disabled={!isDirty || isSubmitting}
+          className="button is-primary"
+          disabled={!isDirty || isSubmitting || messageIsSending}
         >
         >
           Send
           Send
-        </Button>
+        </button>
       </form>
       </form>
     </div>
     </div>
   );
   );

+ 44 - 0
kafka-ui-react-app/src/components/Topics/Topic/SendMessage/SendMessageContainer.ts

@@ -0,0 +1,44 @@
+import { connect } from 'react-redux';
+import { RootState, ClusterName, TopicName } from 'redux/interfaces';
+import { withRouter, RouteComponentProps } from 'react-router-dom';
+import { fetchTopicMessageSchema, sendTopicMessage } from 'redux/actions';
+import {
+  getMessageSchemaByTopicName,
+  getPartitionsByTopicName,
+  getTopicMessageSchemaFetched,
+  getTopicMessageSending,
+} from 'redux/reducers/topics/selectors';
+
+import SendMessage from './SendMessage';
+
+interface RouteProps {
+  clusterName: ClusterName;
+  topicName: TopicName;
+}
+
+type OwnProps = RouteComponentProps<RouteProps>;
+
+const mapStateToProps = (
+  state: RootState,
+  {
+    match: {
+      params: { topicName, clusterName },
+    },
+  }: OwnProps
+) => ({
+  clusterName,
+  topicName,
+  messageSchema: getMessageSchemaByTopicName(state, topicName),
+  schemaIsFetched: getTopicMessageSchemaFetched(state),
+  messageIsSending: getTopicMessageSending(state),
+  partitions: getPartitionsByTopicName(state, topicName),
+});
+
+const mapDispatchToProps = {
+  fetchTopicMessageSchema,
+  sendTopicMessage,
+};
+
+export default withRouter(
+  connect(mapStateToProps, mapDispatchToProps)(SendMessage)
+);

+ 109 - 73
kafka-ui-react-app/src/components/Topics/Topic/SendMessage/__test__/SendMessage.spec.tsx

@@ -1,25 +1,11 @@
 import React from 'react';
 import React from 'react';
-import SendMessage from 'components/Topics/Topic/SendMessage/SendMessage';
-import {
-  screen,
-  waitFor,
-  waitForElementToBeRemoved,
-} from '@testing-library/react';
+import SendMessage, {
+  Props,
+} from 'components/Topics/Topic/SendMessage/SendMessage';
+import { MessageSchemaSourceEnum } from 'generated-sources';
+import { screen, waitFor } from '@testing-library/react';
 import userEvent from '@testing-library/user-event';
 import userEvent from '@testing-library/user-event';
-import fetchMock from 'fetch-mock';
-import { createMemoryHistory } from 'history';
 import { render } from 'lib/testHelpers';
 import { render } from 'lib/testHelpers';
-import { Route, Router } from 'react-router';
-import {
-  clusterTopicMessagesPath,
-  clusterTopicSendMessagePath,
-} from 'lib/paths';
-import { store } from 'redux/store';
-import { fetchTopicDetailsAction } from 'redux/actions';
-import { initialState } from 'redux/reducers/topics/reducer';
-import { externalTopicPayload } from 'redux/reducers/topics/__test__/fixtures';
-
-import { testSchema } from './fixtures';
 
 
 jest.mock('json-schema-faker', () => ({
 jest.mock('json-schema-faker', () => ({
   generate: () => ({
   generate: () => ({
@@ -30,68 +16,118 @@ jest.mock('json-schema-faker', () => ({
   option: jest.fn(),
   option: jest.fn(),
 }));
 }));
 
 
-const clusterName = 'testCluster';
-const topicName = externalTopicPayload.name;
-const history = createMemoryHistory();
-
-const renderComponent = () => {
-  history.push(clusterTopicSendMessagePath(clusterName, topicName));
-  render(
-    <Router history={history}>
-      <Route path={clusterTopicSendMessagePath(':clusterName', ':topicName')}>
-        <SendMessage />
-      </Route>
-    </Router>
-  );
-};
+const setupWrapper = (props?: Partial<Props>) => (
+  <SendMessage
+    clusterName="testCluster"
+    topicName="testTopic"
+    fetchTopicMessageSchema={jest.fn()}
+    sendTopicMessage={jest.fn()}
+    messageSchema={{
+      key: {
+        name: 'key',
+        source: MessageSchemaSourceEnum.SCHEMA_REGISTRY,
+        schema: `{
+          "$schema": "https://json-schema.org/draft/2020-12/schema",
+          "$id": "http://example.com/myURI.schema.json",
+          "title": "TestRecord",
+          "type": "object",
+          "additionalProperties": false,
+          "properties": {
+            "f1": {
+              "type": "integer"
+            },
+            "f2": {
+              "type": "string"
+            },
+            "schema": {
+              "type": "string"
+            }
+          }
+        }
+        `,
+      },
+      value: {
+        name: 'value',
+        source: MessageSchemaSourceEnum.SCHEMA_REGISTRY,
+        schema: `{
+          "$schema": "https://json-schema.org/draft/2020-12/schema",
+          "$id": "http://example.com/myURI1.schema.json",
+          "title": "TestRecord",
+          "type": "object",
+          "additionalProperties": false,
+          "properties": {
+            "f1": {
+              "type": "integer"
+            },
+            "f2": {
+              "type": "string"
+            },
+            "schema": {
+              "type": "string"
+            }
+          }
+        }
+        `,
+      },
+    }}
+    schemaIsFetched={false}
+    messageIsSending={false}
+    partitions={[
+      {
+        partition: 0,
+        leader: 2,
+        replicas: [
+          {
+            broker: 2,
+            leader: false,
+            inSync: true,
+          },
+        ],
+        offsetMax: 0,
+        offsetMin: 0,
+      },
+      {
+        partition: 1,
+        leader: 1,
+        replicas: [
+          {
+            broker: 1,
+            leader: false,
+            inSync: true,
+          },
+        ],
+        offsetMax: 0,
+        offsetMin: 0,
+      },
+    ]}
+    {...props}
+  />
+);
 
 
 describe('SendMessage', () => {
 describe('SendMessage', () => {
-  beforeAll(() => {
-    store.dispatch(
-      fetchTopicDetailsAction.success({
-        ...initialState,
-        byName: {
-          [externalTopicPayload.name]: externalTopicPayload,
-        },
-      })
-    );
-  });
-  afterEach(() => {
-    fetchMock.reset();
-  });
-
-  it('fetches schema on first render', () => {
-    const fetchTopicMessageSchemaMock = fetchMock.getOnce(
-      `/api/clusters/${clusterName}/topics/${topicName}/messages/schema`,
-      testSchema
+  it('calls fetchTopicMessageSchema on first render', () => {
+    const fetchTopicMessageSchemaMock = jest.fn();
+    render(
+      setupWrapper({ fetchTopicMessageSchema: fetchTopicMessageSchemaMock })
     );
     );
-    renderComponent();
-    expect(fetchTopicMessageSchemaMock.called()).toBeTruthy();
+    expect(fetchTopicMessageSchemaMock).toHaveBeenCalledTimes(1);
   });
   });
 
 
   describe('when schema is fetched', () => {
   describe('when schema is fetched', () => {
-    beforeEach(() => {
-      fetchMock.getOnce(
-        `/api/clusters/${clusterName}/topics/${topicName}/messages/schema`,
-        testSchema
-      );
-    });
-
     it('calls sendTopicMessage on submit', async () => {
     it('calls sendTopicMessage on submit', async () => {
-      const sendTopicMessageMock = fetchMock.postOnce(
-        `/api/clusters/${clusterName}/topics/${topicName}/messages`,
-        200
-      );
-      renderComponent();
-      await waitForElementToBeRemoved(() => screen.getByRole('progressbar'));
-
-      userEvent.selectOptions(screen.getByLabelText('Partition'), '0');
-      await screen.findByText('Send');
-      userEvent.click(screen.getByText('Send'));
-      await waitFor(() => expect(sendTopicMessageMock.called()).toBeTruthy());
-      expect(history.location.pathname).toEqual(
-        clusterTopicMessagesPath(clusterName, topicName)
+      jest.mock('../validateMessage', () => jest.fn().mockReturnValue(true));
+      const mockSendTopicMessage = jest.fn();
+      render(
+        setupWrapper({
+          schemaIsFetched: true,
+          sendTopicMessage: mockSendTopicMessage,
+        })
       );
       );
+      userEvent.selectOptions(screen.getByLabelText('Partition'), '1');
+      await waitFor(async () => {
+        userEvent.click(await screen.findByText('Send'));
+        expect(mockSendTopicMessage).toHaveBeenCalledTimes(1);
+      });
     });
     });
   });
   });
 });
 });

+ 38 - 27
kafka-ui-react-app/src/components/Topics/Topic/SendMessage/__test__/validateMessage.spec.ts

@@ -3,34 +3,45 @@ import validateMessage from 'components/Topics/Topic/SendMessage/validateMessage
 import { testSchema } from './fixtures';
 import { testSchema } from './fixtures';
 
 
 describe('validateMessage', () => {
 describe('validateMessage', () => {
-  it('returns no errors on correct input data', () => {
-    const key = `{"f1": 32, "f2": "multi-state", "schema": "Bedfordshire violet SAS"}`;
-    const content = `{"f1": 21128, "f2": "Health Berkshire", "schema": "Dynamic"}`;
-    expect(validateMessage(key, content, testSchema)).toEqual([]);
+  it('returns true on correct input data', async () => {
+    const mockSetError = jest.fn();
+    expect(
+      await validateMessage(
+        `{
+      "f1": 32,
+      "f2": "multi-state",
+      "schema": "Bedfordshire violet SAS"
+    }`,
+        `{
+      "f1": 21128,
+      "f2": "Health Berkshire Re-engineered",
+      "schema": "Dynamic Greenland Beauty"
+    }`,
+        testSchema,
+        mockSetError
+      )
+    ).toBe(true);
+    expect(mockSetError).toHaveBeenCalledTimes(1);
   });
   });
 
 
-  it('returns errors on invalid input data', () => {
-    const key = `{"f1": "32", "f2": "multi-state", "schema": "Bedfordshire violet SAS"}`;
-    const content = `{"f1": "21128", "f2": "Health Berkshire", "schema": "Dynamic"}`;
-    expect(validateMessage(key, content, testSchema)).toEqual([
-      'Key/properties/f1/type - must be integer',
-      'Content/properties/f1/type - must be integer',
-    ]);
-  });
-
-  it('returns error on broken key value', () => {
-    const key = `{"f1": "32", "f2": "multi-state", "schema": "Bedfordshire violet SAS"`;
-    const content = `{"f1": 21128, "f2": "Health Berkshire", "schema": "Dynamic"}`;
-    expect(validateMessage(key, content, testSchema)).toEqual([
-      'Error in parsing the "key" field value',
-    ]);
-  });
-
-  it('returns error on broken content value', () => {
-    const key = `{"f1": 32, "f2": "multi-state", "schema": "Bedfordshire violet SAS"}`;
-    const content = `{"f1": 21128, "f2": "Health Berkshire", "schema": "Dynamic"`;
-    expect(validateMessage(key, content, testSchema)).toEqual([
-      'Error in parsing the "content" field value',
-    ]);
+  it('returns false on incorrect input data', async () => {
+    const mockSetError = jest.fn();
+    expect(
+      await validateMessage(
+        `{
+      "f1": "32",
+      "f2": "multi-state",
+      "schema": "Bedfordshire violet SAS"
+    }`,
+        `{
+      "f1": "21128",
+      "f2": "Health Berkshire Re-engineered",
+      "schema": "Dynamic Greenland Beauty"
+    }`,
+        testSchema,
+        mockSetError
+      )
+    ).toBe(false);
+    expect(mockSetError).toHaveBeenCalledTimes(3);
   });
   });
 });
 });

+ 61 - 47
kafka-ui-react-app/src/components/Topics/Topic/SendMessage/validateMessage.ts

@@ -1,57 +1,71 @@
 import { TopicMessageSchema } from 'generated-sources';
 import { TopicMessageSchema } from 'generated-sources';
 import Ajv from 'ajv/dist/2020';
 import Ajv from 'ajv/dist/2020';
-import { upperFirst } from 'lodash';
 
 
-const validateBySchema = (
-  value: string,
-  schema: string | undefined,
-  type: 'key' | 'content'
-) => {
-  let errors: string[] = [];
-
-  if (!value || !schema) {
-    return errors;
-  }
+const validateMessage = async (
+  key: string,
+  content: string,
+  messageSchema: TopicMessageSchema | undefined,
+  setSchemaErrors: React.Dispatch<React.SetStateAction<string[]>>
+): Promise<boolean> => {
+  setSchemaErrors([]);
+  const keyAjv = new Ajv();
+  const contentAjv = new Ajv();
+  try {
+    if (messageSchema) {
+      let keyIsValid = false;
+      let contentIsValid = false;
 
 
-  let parcedSchema;
-  let parsedValue;
+      try {
+        const keySchema = JSON.parse(messageSchema.key.schema);
+        const validateKey = keyAjv.compile(keySchema);
+        if (keySchema.type === 'string') {
+          keyIsValid = true;
+        } else {
+          keyIsValid = validateKey(JSON.parse(key));
+        }
+        if (!keyIsValid) {
+          const errorString: string[] = [];
+          if (validateKey.errors) {
+            validateKey.errors.forEach((e) => {
+              errorString.push(
+                `${e.schemaPath.replace('#', 'Key')} ${e.message}`
+              );
+            });
+            setSchemaErrors((e) => [...e, ...errorString]);
+          }
+        }
+      } catch (err) {
+        setSchemaErrors((e) => [...e, `Key ${err.message}`]);
+      }
+      try {
+        const contentSchema = JSON.parse(messageSchema.value.schema);
+        const validateContent = contentAjv.compile(contentSchema);
+        if (contentSchema.type === 'string') {
+          contentIsValid = true;
+        } else {
+          contentIsValid = validateContent(JSON.parse(content));
+        }
+        if (!contentIsValid) {
+          const errorString: string[] = [];
+          if (validateContent.errors) {
+            validateContent.errors.forEach((e) => {
+              errorString.push(
+                `${e.schemaPath.replace('#', 'Content')} ${e.message}`
+              );
+            });
+            setSchemaErrors((e) => [...e, ...errorString]);
+          }
+        }
+      } catch (err) {
+        setSchemaErrors((e) => [...e, `Content ${err.message}`]);
+      }
 
 
-  try {
-    parcedSchema = JSON.parse(schema);
-  } catch (e) {
-    return [`Error in parsing the "${type}" field schema`];
-  }
-  if (parcedSchema.type === 'string') {
-    return [];
-  }
-  try {
-    parsedValue = JSON.parse(value);
-  } catch (e) {
-    return [`Error in parsing the "${type}" field value`];
-  }
-  try {
-    const validate = new Ajv().compile(parcedSchema);
-    validate(parsedValue);
-    if (validate.errors) {
-      errors = validate.errors.map(
-        ({ schemaPath, message }) =>
-          `${schemaPath.replace('#', upperFirst(type))} - ${message}`
-      );
+      return keyIsValid && contentIsValid;
     }
     }
-  } catch (e) {
-    return [`${upperFirst(type)} ${e.message}`];
+  } catch (err) {
+    setSchemaErrors((e) => [...e, err.message]);
   }
   }
-
-  return errors;
+  return false;
 };
 };
 
 
-const validateMessage = (
-  key: string,
-  content: string,
-  messageSchema: TopicMessageSchema | undefined
-): string[] => [
-  ...validateBySchema(key, messageSchema?.key?.schema, 'key'),
-  ...validateBySchema(content, messageSchema?.value?.schema, 'content'),
-];
-
 export default validateMessage;
 export default validateMessage;

+ 2 - 2
kafka-ui-react-app/src/components/Topics/Topic/Topic.tsx

@@ -5,7 +5,7 @@ import EditContainer from 'components/Topics/Topic/Edit/EditContainer';
 import DetailsContainer from 'components/Topics/Topic/Details/DetailsContainer';
 import DetailsContainer from 'components/Topics/Topic/Details/DetailsContainer';
 import PageLoader from 'components/common/PageLoader/PageLoader';
 import PageLoader from 'components/common/PageLoader/PageLoader';
 
 
-import SendMessage from './SendMessage/SendMessage';
+import SendMessageContainer from './SendMessage/SendMessageContainer';
 
 
 interface RouterParams {
 interface RouterParams {
   clusterName: ClusterName;
   clusterName: ClusterName;
@@ -41,7 +41,7 @@ const Topic: React.FC<TopicProps> = ({
       <Route
       <Route
         exact
         exact
         path="/ui/clusters/:clusterName/topics/:topicName/message"
         path="/ui/clusters/:clusterName/topics/:topicName/message"
-        component={SendMessage}
+        component={SendMessageContainer}
       />
       />
       <Route
       <Route
         path="/ui/clusters/:clusterName/topics/:topicName"
         path="/ui/clusters/:clusterName/topics/:topicName"

+ 51 - 0
kafka-ui-react-app/src/redux/actions/__test__/thunks/topics.spec.ts

@@ -211,6 +211,57 @@ describe('Thunks', () => {
       ]);
       ]);
     });
     });
   });
   });
+
+  describe('sendTopicMessage', () => {
+    it('creates SEND_TOPIC_MESSAGE__FAILURE', async () => {
+      fetchMock.postOnce(
+        `/api/clusters/${clusterName}/topics/${topicName}/messages`,
+        404
+      );
+      try {
+        await store.dispatch(
+          thunks.sendTopicMessage(clusterName, topicName, {
+            key: '{}',
+            content: '{}',
+            headers: undefined,
+            partition: 0,
+          })
+        );
+      } catch (error) {
+        const err = error as Response;
+        expect(err.status).toEqual(404);
+        expect(store.getActions()).toEqual([
+          actions.sendTopicMessageAction.request(),
+          actions.sendTopicMessageAction.failure({
+            alert: {
+              subject: ['topic', topicName].join('-'),
+              title: `Topic Message ${topicName}`,
+              response: err,
+            },
+          }),
+        ]);
+      }
+    });
+
+    it('creates SEND_TOPIC_MESSAGE__SUCCESS', async () => {
+      fetchMock.postOnce(
+        `/api/clusters/${clusterName}/topics/${topicName}/messages`,
+        200
+      );
+      await store.dispatch(
+        thunks.sendTopicMessage(clusterName, topicName, {
+          key: '{}',
+          content: '{}',
+          headers: undefined,
+          partition: 0,
+        })
+      );
+      expect(store.getActions()).toEqual([
+        actions.sendTopicMessageAction.request(),
+        actions.sendTopicMessageAction.success(),
+      ]);
+    });
+  });
   describe('increasing partitions count', () => {
   describe('increasing partitions count', () => {
     it('calls updateTopicPartitionsCountAction.success on success', async () => {
     it('calls updateTopicPartitionsCountAction.success on success', async () => {
       fetchMock.patchOnce(
       fetchMock.patchOnce(

+ 6 - 0
kafka-ui-react-app/src/redux/actions/actions.ts

@@ -219,6 +219,12 @@ export const fetchTopicMessageSchemaAction = createAsyncAction(
   { alert?: FailurePayload }
   { alert?: FailurePayload }
 >();
 >();
 
 
+export const sendTopicMessageAction = createAsyncAction(
+  'SEND_TOPIC_MESSAGE__REQUEST',
+  'SEND_TOPIC_MESSAGE__SUCCESS',
+  'SEND_TOPIC_MESSAGE__FAILURE'
+)<undefined, undefined, { alert?: FailurePayload }>();
+
 export const updateTopicPartitionsCountAction = createAsyncAction(
 export const updateTopicPartitionsCountAction = createAsyncAction(
   'UPDATE_PARTITIONS__REQUEST',
   'UPDATE_PARTITIONS__REQUEST',
   'UPDATE_PARTITIONS__SUCCESS',
   'UPDATE_PARTITIONS__SUCCESS',

+ 31 - 0
kafka-ui-react-app/src/redux/actions/thunks/topics.ts

@@ -8,6 +8,7 @@ import {
   TopicUpdate,
   TopicUpdate,
   TopicConfig,
   TopicConfig,
   ConsumerGroupsApi,
   ConsumerGroupsApi,
+  CreateTopicMessage,
   GetTopicsRequest,
   GetTopicsRequest,
 } from 'generated-sources';
 } from 'generated-sources';
 import {
 import {
@@ -317,6 +318,36 @@ export const fetchTopicMessageSchema =
     }
     }
   };
   };
 
 
+export const sendTopicMessage =
+  (
+    clusterName: ClusterName,
+    topicName: TopicName,
+    payload: CreateTopicMessage
+  ): PromiseThunkResult =>
+  async (dispatch) => {
+    dispatch(actions.sendTopicMessageAction.request());
+    try {
+      await messagesApiClient.sendTopicMessages({
+        clusterName,
+        topicName,
+        createTopicMessage: {
+          key: payload.key,
+          content: payload.content,
+          headers: payload.headers,
+          partition: payload.partition,
+        },
+      });
+      dispatch(actions.sendTopicMessageAction.success());
+    } catch (e) {
+      const response = await getResponse(e);
+      const alert: FailurePayload = {
+        subject: ['topic', topicName].join('-'),
+        title: `Topic Message ${topicName}`,
+        response,
+      };
+      dispatch(actions.sendTopicMessageAction.failure({ alert }));
+    }
+  };
 export const updateTopicPartitionsCount =
 export const updateTopicPartitionsCount =
   (
   (
     clusterName: ClusterName,
     clusterName: ClusterName,

+ 2 - 9
kafka-ui-react-app/src/redux/reducers/alerts/alertsSlice.ts

@@ -1,8 +1,4 @@
-import {
-  createEntityAdapter,
-  createSlice,
-  PayloadAction,
-} from '@reduxjs/toolkit';
+import { createEntityAdapter, createSlice } from '@reduxjs/toolkit';
 import { UnknownAsyncThunkRejectedWithValueAction } from '@reduxjs/toolkit/dist/matchers';
 import { UnknownAsyncThunkRejectedWithValueAction } from '@reduxjs/toolkit/dist/matchers';
 import { now } from 'lodash';
 import { now } from 'lodash';
 import { Alert, RootState, ServerResponse } from 'redux/interfaces';
 import { Alert, RootState, ServerResponse } from 'redux/interfaces';
@@ -23,9 +19,6 @@ const alertsSlice = createSlice({
   initialState: alertsAdapter.getInitialState(),
   initialState: alertsAdapter.getInitialState(),
   reducers: {
   reducers: {
     alertDissmissed: alertsAdapter.removeOne,
     alertDissmissed: alertsAdapter.removeOne,
-    alertAdded(state, action: PayloadAction<Alert>) {
-      alertsAdapter.upsertOne(state, action.payload);
-    },
   },
   },
   extraReducers: (builder) => {
   extraReducers: (builder) => {
     builder.addMatcher(
     builder.addMatcher(
@@ -54,6 +47,6 @@ export const { selectAll } = alertsAdapter.getSelectors<RootState>(
   (state) => state.alerts
   (state) => state.alerts
 );
 );
 
 
-export const { alertDissmissed, alertAdded } = alertsSlice.actions;
+export const { alertDissmissed } = alertsSlice.actions;
 
 
 export default alertsSlice.reducer;
 export default alertsSlice.reducer;

+ 7 - 0
kafka-ui-react-app/src/redux/reducers/topics/selectors.ts

@@ -26,6 +26,8 @@ const getTopicCreationStatus = createLeagcyFetchingSelector('POST_TOPIC');
 const getTopicUpdateStatus = createLeagcyFetchingSelector('PATCH_TOPIC');
 const getTopicUpdateStatus = createLeagcyFetchingSelector('PATCH_TOPIC');
 const getTopicMessageSchemaFetchingStatus =
 const getTopicMessageSchemaFetchingStatus =
   createLeagcyFetchingSelector('GET_TOPIC_SCHEMA');
   createLeagcyFetchingSelector('GET_TOPIC_SCHEMA');
+const getTopicMessageSendingStatus =
+  createLeagcyFetchingSelector('SEND_TOPIC_MESSAGE');
 const getPartitionsCountIncreaseStatus =
 const getPartitionsCountIncreaseStatus =
   createLeagcyFetchingSelector('UPDATE_PARTITIONS');
   createLeagcyFetchingSelector('UPDATE_PARTITIONS');
 const getReplicationFactorUpdateStatus = createLeagcyFetchingSelector(
 const getReplicationFactorUpdateStatus = createLeagcyFetchingSelector(
@@ -78,6 +80,11 @@ export const getTopicMessageSchemaFetched = createSelector(
   (status) => status === 'fetched'
   (status) => status === 'fetched'
 );
 );
 
 
+export const getTopicMessageSending = createSelector(
+  getTopicMessageSendingStatus,
+  (status) => status === 'fetching'
+);
+
 export const getTopicPartitionsCountIncreased = createSelector(
 export const getTopicPartitionsCountIncreased = createSelector(
   getPartitionsCountIncreaseStatus,
   getPartitionsCountIncreaseStatus,
   (status) => status === 'fetched'
   (status) => status === 'fetched'