Allow producing messages with an empty key/value

* [ISSUE 1046] UI allows to submit message with empty key & value (#1264)

* [ISSUE 1046] UI allows to submit message with empty key & value

* Update Contract

(cherry picked from commit 4b730eb288)

* Backend fix

* Refactoring

* Fix nullable & checkstyle

* Fix jsonnullable get

* Remove unnecessary check and add a test

Co-authored-by: Oleg Shur <workshur@gmail.com>
This commit is contained in:
Roman Zabaluev 2021-12-24 19:00:27 +03:00 committed by GitHub
parent 5a487e437d
commit 32a2e753b0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 348 additions and 495 deletions

View file

@ -1,5 +1,6 @@
package com.provectus.kafka.ui.config;
import com.fasterxml.jackson.databind.Module;
import com.provectus.kafka.ui.model.JmxConnectionInfo;
import com.provectus.kafka.ui.util.JmxPoolFactory;
import java.util.Collections;
@ -9,6 +10,7 @@ import lombok.AllArgsConstructor;
import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.openapitools.jackson.nullable.JsonNullableModule;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.web.ServerProperties;
@ -78,4 +80,9 @@ public class Config {
.codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes()))
.build();
}
@Bean
public JsonNullableModule jsonNullableModule() {
return new JsonNullableModule();
}
}

View file

@ -39,7 +39,7 @@ public class DeserializationService {
objectMapper);
} else {
log.info("Using SchemaRegistryAwareRecordSerDe for cluster '{}'", cluster.getName());
return new SchemaRegistryAwareRecordSerDe(cluster);
return new SchemaRegistryAwareRecordSerDe(cluster, objectMapper);
}
} catch (Throwable e) {
throw new RuntimeException("Can't init deserializer", e);

View file

@ -13,6 +13,8 @@ import org.apache.kafka.common.utils.Bytes;
public class SimpleRecordSerDe implements RecordSerDe {
private static final ObjectMapper objectMapper = new ObjectMapper();
@Override
public DeserializedKeyValue deserialize(ConsumerRecord<Bytes, Bytes> msg) {
var builder = DeserializedKeyValue.builder();
@ -45,7 +47,7 @@ public class SimpleRecordSerDe implements RecordSerDe {
final MessageSchemaDTO schema = new MessageSchemaDTO()
.name("unknown")
.source(MessageSchemaDTO.SourceEnum.UNKNOWN)
.schema(JsonSchema.stringSchema().toJson(new ObjectMapper()));
.schema(JsonSchema.stringSchema().toJson(objectMapper));
return new TopicMessageSchemaDTO()
.key(schema)
.value(schema);

View file

@ -10,7 +10,6 @@ import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.MessageSchemaDTO;
import com.provectus.kafka.ui.model.TopicMessageSchemaDTO;
import com.provectus.kafka.ui.serde.RecordSerDe;
import com.provectus.kafka.ui.util.ConsumerRecordUtil;
import com.provectus.kafka.ui.util.jsonschema.AvroJsonSchemaConverter;
import com.provectus.kafka.ui.util.jsonschema.JsonSchema;
import com.provectus.kafka.ui.util.jsonschema.ProtobufSchemaConverter;
@ -27,7 +26,6 @@ import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -47,31 +45,32 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
private static final int CLIENT_IDENTITY_MAP_CAPACITY = 100;
private static final StringMessageFormatter stringFormatter = new StringMessageFormatter();
private static final ProtobufSchemaConverter protoSchemaConverter = new ProtobufSchemaConverter();
private static final AvroJsonSchemaConverter avroSchemaConverter = new AvroJsonSchemaConverter();
private final KafkaCluster cluster;
private final Map<String, MessageFormatter> valueFormatMap = new ConcurrentHashMap<>();
private final Map<String, MessageFormatter> keyFormatMap = new ConcurrentHashMap<>();
@Nullable
private final SchemaRegistryClient schemaRegistryClient;
@Nullable
private final AvroMessageFormatter avroFormatter;
@Nullable
private final ProtobufMessageFormatter protobufFormatter;
@Nullable
private final JsonSchemaMessageFormatter jsonSchemaMessageFormatter;
private final StringMessageFormatter stringFormatter = new StringMessageFormatter();
private final ProtobufSchemaConverter protoSchemaConverter = new ProtobufSchemaConverter();
private final AvroJsonSchemaConverter avroSchemaConverter = new AvroJsonSchemaConverter();
private final ObjectMapper objectMapper = new ObjectMapper();
private ObjectMapper objectMapper;
private static SchemaRegistryClient createSchemaRegistryClient(KafkaCluster cluster) {
private SchemaRegistryClient createSchemaRegistryClient(KafkaCluster cluster,
ObjectMapper objectMapper) {
if (cluster.getSchemaRegistry() == null) {
throw new ValidationException("schemaRegistry is not specified");
}
this.objectMapper = objectMapper;
List<SchemaProvider> schemaProviders =
List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider(), new JsonSchemaProvider());
@ -97,10 +96,10 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
);
}
public SchemaRegistryAwareRecordSerDe(KafkaCluster cluster) {
public SchemaRegistryAwareRecordSerDe(KafkaCluster cluster, ObjectMapper objectMapper) {
this.cluster = cluster;
this.schemaRegistryClient = cluster.getSchemaRegistry() != null
? createSchemaRegistryClient(cluster)
? createSchemaRegistryClient(cluster, objectMapper)
: null;
if (schemaRegistryClient != null) {
this.avroFormatter = new AvroMessageFormatter(schemaRegistryClient);
@ -147,41 +146,45 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
@Nullable String key,
@Nullable String data,
@Nullable Integer partition) {
final Optional<SchemaMetadata> maybeValueSchema = getSchemaBySubject(topic, false);
final Optional<SchemaMetadata> maybeKeySchema = getSchemaBySubject(topic, true);
final Optional<SchemaMetadata> maybeValueSchema = getSchemaBySubject(topic, false);
final byte[] serializedValue = data != null
? serialize(maybeValueSchema, topic, data, false)
: null;
final byte[] serializedKey = key != null
? serialize(maybeKeySchema, topic, key, true)
: null;
final byte[] serializedKey = maybeKeySchema.isPresent()
? serialize(maybeKeySchema.get(), topic, key, true)
: serialize(key);
final byte[] serializedValue = maybeValueSchema.isPresent()
? serialize(maybeValueSchema.get(), topic, data, false)
: serialize(data);
return new ProducerRecord<>(topic, partition, serializedKey, serializedValue);
}
@SneakyThrows
private byte[] serialize(
Optional<SchemaMetadata> maybeSchema, String topic, String value, boolean isKey) {
if (maybeSchema.isPresent()) {
final SchemaMetadata schema = maybeSchema.get();
MessageReader<?> reader;
if (schema.getSchemaType().equals(MessageFormat.PROTOBUF.name())) {
reader = new ProtobufMessageReader(topic, isKey, schemaRegistryClient, schema);
} else if (schema.getSchemaType().equals(MessageFormat.AVRO.name())) {
reader = new AvroMessageReader(topic, isKey, schemaRegistryClient, schema);
} else if (schema.getSchemaType().equals(MessageFormat.JSON.name())) {
reader = new JsonSchemaMessageReader(topic, isKey, schemaRegistryClient, schema);
} else {
throw new IllegalStateException("Unsupported schema type: " + schema.getSchemaType());
}
return reader.read(value);
} else {
// if no schema provided serialize input as raw string
return value.getBytes();
private byte[] serialize(SchemaMetadata schema, String topic, String value, boolean isKey) {
if (value == null) {
return null;
}
MessageReader<?> reader;
if (schema.getSchemaType().equals(MessageFormat.PROTOBUF.name())) {
reader = new ProtobufMessageReader(topic, isKey, schemaRegistryClient, schema);
} else if (schema.getSchemaType().equals(MessageFormat.AVRO.name())) {
reader = new AvroMessageReader(topic, isKey, schemaRegistryClient, schema);
} else if (schema.getSchemaType().equals(MessageFormat.JSON.name())) {
reader = new JsonSchemaMessageReader(topic, isKey, schemaRegistryClient, schema);
} else {
throw new IllegalStateException("Unsupported schema type: " + schema.getSchemaType());
}
return reader.read(value);
}
private byte[] serialize(String value) {
if (value == null) {
return null;
}
// if no schema provided serialize input as raw string
return value.getBytes();
}
@Override

View file

@ -81,9 +81,6 @@ public class MessagesService {
public Mono<RecordMetadata> sendMessage(KafkaCluster cluster, String topic,
CreateTopicMessageDTO msg) {
if (msg.getKey() == null && msg.getContent() == null) {
throw new ValidationException("Invalid message: both key and value can't be null");
}
if (msg.getPartition() != null
&& msg.getPartition() > metricsCache.get(cluster).getTopicDescriptions()
.get(topic).partitions().size() - 1) {
@ -100,8 +97,8 @@ public class MessagesService {
try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
ProducerRecord<byte[], byte[]> producerRecord = serde.serialize(
topic,
msg.getKey(),
msg.getContent(),
msg.getKey().orElse(null),
msg.getContent().orElse(null),
msg.getPartition()
);
producerRecord = new ProducerRecord<>(

View file

@ -17,7 +17,7 @@ class SchemaRegistryRecordDeserializerTest {
new SchemaRegistryAwareRecordSerDe(
KafkaCluster.builder()
.schemaNameTemplate("%s-value")
.build()
.build(), new ObjectMapper()
);
@Test

View file

@ -465,6 +465,20 @@ public class SendAndReadTests extends AbstractBaseTest {
});
}
@Test
void noKeyAndNoContentPresentTest() {
new SendAndReadSpec()
.withMsgToSend(
new CreateTopicMessageDTO()
.key(null)
.content(null)
)
.doAssert(polled -> {
assertThat(polled.getKey()).isNull();
assertThat(polled.getContent()).isNull();
});
}
@SneakyThrows
private void assertJsonEqual(String actual, String expected) {
var mapper = new ObjectMapper();

View file

@ -1870,12 +1870,16 @@ components:
type: integer
key:
type: string
nullable: true
headers:
type: object
additionalProperties:
type: string
content:
type: string
nullable: true
required:
- partition
TopicMessageSchema:
type: object
@ -2635,4 +2639,4 @@ components:
- DELETE
- COMPACT
- COMPACT_DELETE
- UNKNOWN
- UNKNOWN

View file

@ -3,7 +3,7 @@ import styled from 'styled-components';
export const Alert = styled.div<{ $type: AlertType }>`
background-color: ${({ $type, theme }) => theme.alert.color[$type]};
width: 400px;
min-width: 400px;
min-height: 64px;
border-radius: 8px;
padding: 12px;
@ -20,8 +20,14 @@ export const Title = styled.div`
font-size: 14px;
`;
export const Message = styled.p`
export const Message = styled.div`
font-weight: normal;
font-size: 14px;
margin: 3px 0;
ol,
ul {
padding-left: 25px;
list-style: auto;
}
`;

View file

@ -168,7 +168,7 @@ export const AlertsContainer = styled.div`
width: 500px;
position: fixed;
bottom: 15px;
left: 15px;
right: 15px;
z-index: 1000;
@media screen and (max-width: 1023px) {

View file

@ -1,47 +1,32 @@
import JSONEditor from 'components/common/JSONEditor/JSONEditor';
import PageLoader from 'components/common/PageLoader/PageLoader';
import {
CreateTopicMessage,
Partition,
TopicMessageSchema,
} from 'generated-sources';
import React from 'react';
import { useForm, Controller } from 'react-hook-form';
import { useHistory } from 'react-router';
import { useHistory, useParams } from 'react-router';
import { clusterTopicMessagesPath } from 'lib/paths';
import jsf from 'json-schema-faker';
import { fetchTopicMessageSchema, messagesApiClient } from 'redux/actions';
import { useAppDispatch, useAppSelector } from 'lib/hooks/redux';
import { alertAdded } from 'redux/reducers/alerts/alertsSlice';
import { now } from 'lodash';
import { Button } from 'components/common/Button/Button';
import { ClusterName, TopicName } from 'redux/interfaces';
import {
getMessageSchemaByTopicName,
getPartitionsByTopicName,
getTopicMessageSchemaFetched,
} from 'redux/reducers/topics/selectors';
import validateMessage from './validateMessage';
export interface Props {
clusterName: string;
topicName: string;
fetchTopicMessageSchema: (clusterName: string, topicName: string) => void;
sendTopicMessage: (
clusterName: string,
topicName: string,
payload: CreateTopicMessage
) => void;
messageSchema: TopicMessageSchema | undefined;
schemaIsFetched: boolean;
messageIsSending: boolean;
partitions: Partition[];
interface RouterParams {
clusterName: ClusterName;
topicName: TopicName;
}
const SendMessage: React.FC<Props> = ({
clusterName,
topicName,
fetchTopicMessageSchema,
sendTopicMessage,
messageSchema,
schemaIsFetched,
messageIsSending,
partitions,
}) => {
const [keyExampleValue, setKeyExampleValue] = React.useState('');
const [contentExampleValue, setContentExampleValue] = React.useState('');
const [schemaIsReady, setSchemaIsReady] = React.useState(false);
const [schemaErrors, setSchemaErrors] = React.useState<string[]>([]);
const SendMessage: React.FC = () => {
const dispatch = useAppDispatch();
const { clusterName, topicName } = useParams<RouterParams>();
const {
register,
handleSubmit,
@ -54,27 +39,38 @@ const SendMessage: React.FC<Props> = ({
jsf.option('alwaysFakeOptionals', true);
React.useEffect(() => {
fetchTopicMessageSchema(clusterName, topicName);
dispatch(fetchTopicMessageSchema(clusterName, topicName));
}, []);
React.useEffect(() => {
if (schemaIsFetched && messageSchema) {
setKeyExampleValue(
JSON.stringify(
jsf.generate(JSON.parse(messageSchema.key.schema)),
null,
'\t'
)
);
setContentExampleValue(
JSON.stringify(
jsf.generate(JSON.parse(messageSchema.value.schema)),
null,
'\t'
)
);
setSchemaIsReady(true);
const messageSchema = useAppSelector((state) =>
getMessageSchemaByTopicName(state, topicName)
);
const partitions = useAppSelector((state) =>
getPartitionsByTopicName(state, topicName)
);
const schemaIsFetched = useAppSelector(getTopicMessageSchemaFetched);
const keyDefaultValue = React.useMemo(() => {
if (!schemaIsFetched || !messageSchema) {
return undefined;
}
}, [schemaIsFetched]);
return JSON.stringify(
jsf.generate(JSON.parse(messageSchema.key.schema)),
null,
'\t'
);
}, [messageSchema, schemaIsFetched]);
const contentDefaultValue = React.useMemo(() => {
if (!schemaIsFetched || !messageSchema) {
return undefined;
}
return JSON.stringify(
jsf.generate(JSON.parse(messageSchema.value.schema)),
null,
'\t'
);
}, [messageSchema, schemaIsFetched]);
const onSubmit = async (data: {
key: string;
@ -83,30 +79,55 @@ const SendMessage: React.FC<Props> = ({
partition: number;
}) => {
if (messageSchema) {
const key = data.key || keyExampleValue;
const content = data.content || contentExampleValue;
const { partition } = data;
const { partition, key, content } = data;
const headers = data.headers ? JSON.parse(data.headers) : undefined;
const messageIsValid = await validateMessage(
key,
content,
messageSchema,
setSchemaErrors
);
if (messageIsValid) {
sendTopicMessage(clusterName, topicName, {
key,
content,
headers,
partition,
});
history.push(clusterTopicMessagesPath(clusterName, topicName));
const errors = validateMessage(key, content, messageSchema);
if (errors.length > 0) {
dispatch(
alertAdded({
id: `${clusterName}-${topicName}-createTopicMessageError`,
type: 'error',
title: 'Validation Error',
message: (
<ul>
{errors.map((e) => (
<li>{e}</li>
))}
</ul>
),
createdAt: now(),
})
);
return;
}
try {
await messagesApiClient.sendTopicMessages({
clusterName,
topicName,
createTopicMessage: {
key: !key ? null : key,
content: !content ? null : content,
headers,
partition,
},
});
} catch (e) {
dispatch(
alertAdded({
id: `${clusterName}-${topicName}-sendTopicMessagesError`,
type: 'error',
title: `Error in sending a message to ${topicName}`,
message: e?.message,
createdAt: now(),
})
);
}
history.push(clusterTopicMessagesPath(clusterName, topicName));
}
};
if (!schemaIsReady) {
if (!schemaIsFetched) {
return <PageLoader />;
}
return (
@ -121,7 +142,7 @@ const SendMessage: React.FC<Props> = ({
<select
id="select"
defaultValue={partitions[0].partition}
disabled={isSubmitting || messageIsSending}
disabled={isSubmitting}
{...register('partition')}
>
{partitions.map((partition) => (
@ -142,8 +163,8 @@ const SendMessage: React.FC<Props> = ({
name="key"
render={({ field: { name, onChange } }) => (
<JSONEditor
readOnly={isSubmitting || messageIsSending}
defaultValue={keyExampleValue}
readOnly={isSubmitting}
defaultValue={keyDefaultValue}
name={name}
onChange={onChange}
/>
@ -157,8 +178,8 @@ const SendMessage: React.FC<Props> = ({
name="content"
render={({ field: { name, onChange } }) => (
<JSONEditor
readOnly={isSubmitting || messageIsSending}
defaultValue={contentExampleValue}
readOnly={isSubmitting}
defaultValue={contentDefaultValue}
name={name}
onChange={onChange}
/>
@ -174,7 +195,7 @@ const SendMessage: React.FC<Props> = ({
name="headers"
render={({ field: { name, onChange } }) => (
<JSONEditor
readOnly={isSubmitting || messageIsSending}
readOnly={isSubmitting}
defaultValue="{}"
name={name}
onChange={onChange}
@ -184,22 +205,14 @@ const SendMessage: React.FC<Props> = ({
/>
</div>
</div>
{schemaErrors && (
<div className="mb-4">
{schemaErrors.map((err) => (
<p className="help is-danger" key={err}>
{err}
</p>
))}
</div>
)}
<button
<Button
buttonSize="M"
buttonType="primary"
type="submit"
className="button is-primary"
disabled={!isDirty || isSubmitting || messageIsSending}
disabled={!isDirty || isSubmitting}
>
Send
</button>
</Button>
</form>
</div>
);

View file

@ -1,44 +0,0 @@
import { connect } from 'react-redux';
import { RootState, ClusterName, TopicName } from 'redux/interfaces';
import { withRouter, RouteComponentProps } from 'react-router-dom';
import { fetchTopicMessageSchema, sendTopicMessage } from 'redux/actions';
import {
getMessageSchemaByTopicName,
getPartitionsByTopicName,
getTopicMessageSchemaFetched,
getTopicMessageSending,
} from 'redux/reducers/topics/selectors';
import SendMessage from './SendMessage';
interface RouteProps {
clusterName: ClusterName;
topicName: TopicName;
}
type OwnProps = RouteComponentProps<RouteProps>;
const mapStateToProps = (
state: RootState,
{
match: {
params: { topicName, clusterName },
},
}: OwnProps
) => ({
clusterName,
topicName,
messageSchema: getMessageSchemaByTopicName(state, topicName),
schemaIsFetched: getTopicMessageSchemaFetched(state),
messageIsSending: getTopicMessageSending(state),
partitions: getPartitionsByTopicName(state, topicName),
});
const mapDispatchToProps = {
fetchTopicMessageSchema,
sendTopicMessage,
};
export default withRouter(
connect(mapStateToProps, mapDispatchToProps)(SendMessage)
);

View file

@ -1,11 +1,25 @@
import React from 'react';
import SendMessage, {
Props,
} from 'components/Topics/Topic/SendMessage/SendMessage';
import { MessageSchemaSourceEnum } from 'generated-sources';
import { screen, waitFor } from '@testing-library/react';
import SendMessage from 'components/Topics/Topic/SendMessage/SendMessage';
import {
screen,
waitFor,
waitForElementToBeRemoved,
} from '@testing-library/react';
import userEvent from '@testing-library/user-event';
import fetchMock from 'fetch-mock';
import { createMemoryHistory } from 'history';
import { render } from 'lib/testHelpers';
import { Route, Router } from 'react-router';
import {
clusterTopicMessagesPath,
clusterTopicSendMessagePath,
} from 'lib/paths';
import { store } from 'redux/store';
import { fetchTopicDetailsAction } from 'redux/actions';
import { initialState } from 'redux/reducers/topics/reducer';
import { externalTopicPayload } from 'redux/reducers/topics/__test__/fixtures';
import { testSchema } from './fixtures';
jest.mock('json-schema-faker', () => ({
generate: () => ({
@ -16,118 +30,68 @@ jest.mock('json-schema-faker', () => ({
option: jest.fn(),
}));
const setupWrapper = (props?: Partial<Props>) => (
<SendMessage
clusterName="testCluster"
topicName="testTopic"
fetchTopicMessageSchema={jest.fn()}
sendTopicMessage={jest.fn()}
messageSchema={{
key: {
name: 'key',
source: MessageSchemaSourceEnum.SCHEMA_REGISTRY,
schema: `{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "http://example.com/myURI.schema.json",
"title": "TestRecord",
"type": "object",
"additionalProperties": false,
"properties": {
"f1": {
"type": "integer"
},
"f2": {
"type": "string"
},
"schema": {
"type": "string"
}
}
}
`,
},
value: {
name: 'value',
source: MessageSchemaSourceEnum.SCHEMA_REGISTRY,
schema: `{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "http://example.com/myURI1.schema.json",
"title": "TestRecord",
"type": "object",
"additionalProperties": false,
"properties": {
"f1": {
"type": "integer"
},
"f2": {
"type": "string"
},
"schema": {
"type": "string"
}
}
}
`,
},
}}
schemaIsFetched={false}
messageIsSending={false}
partitions={[
{
partition: 0,
leader: 2,
replicas: [
{
broker: 2,
leader: false,
inSync: true,
},
],
offsetMax: 0,
offsetMin: 0,
},
{
partition: 1,
leader: 1,
replicas: [
{
broker: 1,
leader: false,
inSync: true,
},
],
offsetMax: 0,
offsetMin: 0,
},
]}
{...props}
/>
);
const clusterName = 'testCluster';
const topicName = externalTopicPayload.name;
const history = createMemoryHistory();
const renderComponent = () => {
history.push(clusterTopicSendMessagePath(clusterName, topicName));
render(
<Router history={history}>
<Route path={clusterTopicSendMessagePath(':clusterName', ':topicName')}>
<SendMessage />
</Route>
</Router>
);
};
describe('SendMessage', () => {
it('calls fetchTopicMessageSchema on first render', () => {
const fetchTopicMessageSchemaMock = jest.fn();
render(
setupWrapper({ fetchTopicMessageSchema: fetchTopicMessageSchemaMock })
beforeAll(() => {
store.dispatch(
fetchTopicDetailsAction.success({
...initialState,
byName: {
[externalTopicPayload.name]: externalTopicPayload,
},
})
);
expect(fetchTopicMessageSchemaMock).toHaveBeenCalledTimes(1);
});
afterEach(() => {
fetchMock.reset();
});
it('fetches schema on first render', () => {
const fetchTopicMessageSchemaMock = fetchMock.getOnce(
`/api/clusters/${clusterName}/topics/${topicName}/messages/schema`,
testSchema
);
renderComponent();
expect(fetchTopicMessageSchemaMock.called()).toBeTruthy();
});
describe('when schema is fetched', () => {
it('calls sendTopicMessage on submit', async () => {
jest.mock('../validateMessage', () => jest.fn().mockReturnValue(true));
const mockSendTopicMessage = jest.fn();
render(
setupWrapper({
schemaIsFetched: true,
sendTopicMessage: mockSendTopicMessage,
})
beforeEach(() => {
fetchMock.getOnce(
`/api/clusters/${clusterName}/topics/${topicName}/messages/schema`,
testSchema
);
});
it('calls sendTopicMessage on submit', async () => {
const sendTopicMessageMock = fetchMock.postOnce(
`/api/clusters/${clusterName}/topics/${topicName}/messages`,
200
);
renderComponent();
await waitForElementToBeRemoved(() => screen.getByRole('progressbar'));
userEvent.selectOptions(screen.getByLabelText('Partition'), '0');
await screen.findByText('Send');
userEvent.click(screen.getByText('Send'));
await waitFor(() => expect(sendTopicMessageMock.called()).toBeTruthy());
expect(history.location.pathname).toEqual(
clusterTopicMessagesPath(clusterName, topicName)
);
userEvent.selectOptions(screen.getByLabelText('Partition'), '1');
await waitFor(async () => {
userEvent.click(await screen.findByText('Send'));
expect(mockSendTopicMessage).toHaveBeenCalledTimes(1);
});
});
});
});

View file

@ -3,45 +3,34 @@ import validateMessage from 'components/Topics/Topic/SendMessage/validateMessage
import { testSchema } from './fixtures';
describe('validateMessage', () => {
it('returns true on correct input data', async () => {
const mockSetError = jest.fn();
expect(
await validateMessage(
`{
"f1": 32,
"f2": "multi-state",
"schema": "Bedfordshire violet SAS"
}`,
`{
"f1": 21128,
"f2": "Health Berkshire Re-engineered",
"schema": "Dynamic Greenland Beauty"
}`,
testSchema,
mockSetError
)
).toBe(true);
expect(mockSetError).toHaveBeenCalledTimes(1);
it('returns no errors on correct input data', () => {
const key = `{"f1": 32, "f2": "multi-state", "schema": "Bedfordshire violet SAS"}`;
const content = `{"f1": 21128, "f2": "Health Berkshire", "schema": "Dynamic"}`;
expect(validateMessage(key, content, testSchema)).toEqual([]);
});
it('returns false on incorrect input data', async () => {
const mockSetError = jest.fn();
expect(
await validateMessage(
`{
"f1": "32",
"f2": "multi-state",
"schema": "Bedfordshire violet SAS"
}`,
`{
"f1": "21128",
"f2": "Health Berkshire Re-engineered",
"schema": "Dynamic Greenland Beauty"
}`,
testSchema,
mockSetError
)
).toBe(false);
expect(mockSetError).toHaveBeenCalledTimes(3);
it('returns errors on invalid input data', () => {
const key = `{"f1": "32", "f2": "multi-state", "schema": "Bedfordshire violet SAS"}`;
const content = `{"f1": "21128", "f2": "Health Berkshire", "schema": "Dynamic"}`;
expect(validateMessage(key, content, testSchema)).toEqual([
'Key/properties/f1/type - must be integer',
'Content/properties/f1/type - must be integer',
]);
});
it('returns error on broken key value', () => {
const key = `{"f1": "32", "f2": "multi-state", "schema": "Bedfordshire violet SAS"`;
const content = `{"f1": 21128, "f2": "Health Berkshire", "schema": "Dynamic"}`;
expect(validateMessage(key, content, testSchema)).toEqual([
'Error in parsing the "key" field value',
]);
});
it('returns error on broken content value', () => {
const key = `{"f1": 32, "f2": "multi-state", "schema": "Bedfordshire violet SAS"}`;
const content = `{"f1": 21128, "f2": "Health Berkshire", "schema": "Dynamic"`;
expect(validateMessage(key, content, testSchema)).toEqual([
'Error in parsing the "content" field value',
]);
});
});

View file

@ -1,71 +1,57 @@
import { TopicMessageSchema } from 'generated-sources';
import Ajv from 'ajv/dist/2020';
import { upperFirst } from 'lodash';
const validateMessage = async (
key: string,
content: string,
messageSchema: TopicMessageSchema | undefined,
setSchemaErrors: React.Dispatch<React.SetStateAction<string[]>>
): Promise<boolean> => {
setSchemaErrors([]);
const keyAjv = new Ajv();
const contentAjv = new Ajv();
try {
if (messageSchema) {
let keyIsValid = false;
let contentIsValid = false;
const validateBySchema = (
value: string,
schema: string | undefined,
type: 'key' | 'content'
) => {
let errors: string[] = [];
try {
const keySchema = JSON.parse(messageSchema.key.schema);
const validateKey = keyAjv.compile(keySchema);
if (keySchema.type === 'string') {
keyIsValid = true;
} else {
keyIsValid = validateKey(JSON.parse(key));
}
if (!keyIsValid) {
const errorString: string[] = [];
if (validateKey.errors) {
validateKey.errors.forEach((e) => {
errorString.push(
`${e.schemaPath.replace('#', 'Key')} ${e.message}`
);
});
setSchemaErrors((e) => [...e, ...errorString]);
}
}
} catch (err) {
setSchemaErrors((e) => [...e, `Key ${err.message}`]);
}
try {
const contentSchema = JSON.parse(messageSchema.value.schema);
const validateContent = contentAjv.compile(contentSchema);
if (contentSchema.type === 'string') {
contentIsValid = true;
} else {
contentIsValid = validateContent(JSON.parse(content));
}
if (!contentIsValid) {
const errorString: string[] = [];
if (validateContent.errors) {
validateContent.errors.forEach((e) => {
errorString.push(
`${e.schemaPath.replace('#', 'Content')} ${e.message}`
);
});
setSchemaErrors((e) => [...e, ...errorString]);
}
}
} catch (err) {
setSchemaErrors((e) => [...e, `Content ${err.message}`]);
}
return keyIsValid && contentIsValid;
}
} catch (err) {
setSchemaErrors((e) => [...e, err.message]);
if (!value || !schema) {
return errors;
}
return false;
let parcedSchema;
let parsedValue;
try {
parcedSchema = JSON.parse(schema);
} catch (e) {
return [`Error in parsing the "${type}" field schema`];
}
if (parcedSchema.type === 'string') {
return [];
}
try {
parsedValue = JSON.parse(value);
} catch (e) {
return [`Error in parsing the "${type}" field value`];
}
try {
const validate = new Ajv().compile(parcedSchema);
validate(parsedValue);
if (validate.errors) {
errors = validate.errors.map(
({ schemaPath, message }) =>
`${schemaPath.replace('#', upperFirst(type))} - ${message}`
);
}
} catch (e) {
return [`${upperFirst(type)} ${e.message}`];
}
return errors;
};
const validateMessage = (
key: string,
content: string,
messageSchema: TopicMessageSchema | undefined
): string[] => [
...validateBySchema(key, messageSchema?.key?.schema, 'key'),
...validateBySchema(content, messageSchema?.value?.schema, 'content'),
];
export default validateMessage;

View file

@ -5,7 +5,7 @@ import EditContainer from 'components/Topics/Topic/Edit/EditContainer';
import DetailsContainer from 'components/Topics/Topic/Details/DetailsContainer';
import PageLoader from 'components/common/PageLoader/PageLoader';
import SendMessageContainer from './SendMessage/SendMessageContainer';
import SendMessage from './SendMessage/SendMessage';
interface RouterParams {
clusterName: ClusterName;
@ -41,7 +41,7 @@ const Topic: React.FC<TopicProps> = ({
<Route
exact
path="/ui/clusters/:clusterName/topics/:topicName/message"
component={SendMessageContainer}
component={SendMessage}
/>
<Route
path="/ui/clusters/:clusterName/topics/:topicName"

View file

@ -211,57 +211,6 @@ describe('Thunks', () => {
]);
});
});
describe('sendTopicMessage', () => {
it('creates SEND_TOPIC_MESSAGE__FAILURE', async () => {
fetchMock.postOnce(
`/api/clusters/${clusterName}/topics/${topicName}/messages`,
404
);
try {
await store.dispatch(
thunks.sendTopicMessage(clusterName, topicName, {
key: '{}',
content: '{}',
headers: undefined,
partition: 0,
})
);
} catch (error) {
const err = error as Response;
expect(err.status).toEqual(404);
expect(store.getActions()).toEqual([
actions.sendTopicMessageAction.request(),
actions.sendTopicMessageAction.failure({
alert: {
subject: ['topic', topicName].join('-'),
title: `Topic Message ${topicName}`,
response: err,
},
}),
]);
}
});
it('creates SEND_TOPIC_MESSAGE__SUCCESS', async () => {
fetchMock.postOnce(
`/api/clusters/${clusterName}/topics/${topicName}/messages`,
200
);
await store.dispatch(
thunks.sendTopicMessage(clusterName, topicName, {
key: '{}',
content: '{}',
headers: undefined,
partition: 0,
})
);
expect(store.getActions()).toEqual([
actions.sendTopicMessageAction.request(),
actions.sendTopicMessageAction.success(),
]);
});
});
describe('increasing partitions count', () => {
it('calls updateTopicPartitionsCountAction.success on success', async () => {
fetchMock.patchOnce(

View file

@ -219,12 +219,6 @@ export const fetchTopicMessageSchemaAction = createAsyncAction(
{ alert?: FailurePayload }
>();
export const sendTopicMessageAction = createAsyncAction(
'SEND_TOPIC_MESSAGE__REQUEST',
'SEND_TOPIC_MESSAGE__SUCCESS',
'SEND_TOPIC_MESSAGE__FAILURE'
)<undefined, undefined, { alert?: FailurePayload }>();
export const updateTopicPartitionsCountAction = createAsyncAction(
'UPDATE_PARTITIONS__REQUEST',
'UPDATE_PARTITIONS__SUCCESS',

View file

@ -8,7 +8,6 @@ import {
TopicUpdate,
TopicConfig,
ConsumerGroupsApi,
CreateTopicMessage,
GetTopicsRequest,
} from 'generated-sources';
import {
@ -318,36 +317,6 @@ export const fetchTopicMessageSchema =
}
};
export const sendTopicMessage =
(
clusterName: ClusterName,
topicName: TopicName,
payload: CreateTopicMessage
): PromiseThunkResult =>
async (dispatch) => {
dispatch(actions.sendTopicMessageAction.request());
try {
await messagesApiClient.sendTopicMessages({
clusterName,
topicName,
createTopicMessage: {
key: payload.key,
content: payload.content,
headers: payload.headers,
partition: payload.partition,
},
});
dispatch(actions.sendTopicMessageAction.success());
} catch (e) {
const response = await getResponse(e);
const alert: FailurePayload = {
subject: ['topic', topicName].join('-'),
title: `Topic Message ${topicName}`,
response,
};
dispatch(actions.sendTopicMessageAction.failure({ alert }));
}
};
export const updateTopicPartitionsCount =
(
clusterName: ClusterName,

View file

@ -1,4 +1,8 @@
import { createEntityAdapter, createSlice } from '@reduxjs/toolkit';
import {
createEntityAdapter,
createSlice,
PayloadAction,
} from '@reduxjs/toolkit';
import { UnknownAsyncThunkRejectedWithValueAction } from '@reduxjs/toolkit/dist/matchers';
import { now } from 'lodash';
import { Alert, RootState, ServerResponse } from 'redux/interfaces';
@ -19,6 +23,9 @@ const alertsSlice = createSlice({
initialState: alertsAdapter.getInitialState(),
reducers: {
alertDissmissed: alertsAdapter.removeOne,
alertAdded(state, action: PayloadAction<Alert>) {
alertsAdapter.upsertOne(state, action.payload);
},
},
extraReducers: (builder) => {
builder.addMatcher(
@ -47,6 +54,6 @@ export const { selectAll } = alertsAdapter.getSelectors<RootState>(
(state) => state.alerts
);
export const { alertDissmissed } = alertsSlice.actions;
export const { alertDissmissed, alertAdded } = alertsSlice.actions;
export default alertsSlice.reducer;

View file

@ -26,8 +26,6 @@ const getTopicCreationStatus = createLeagcyFetchingSelector('POST_TOPIC');
const getTopicUpdateStatus = createLeagcyFetchingSelector('PATCH_TOPIC');
const getTopicMessageSchemaFetchingStatus =
createLeagcyFetchingSelector('GET_TOPIC_SCHEMA');
const getTopicMessageSendingStatus =
createLeagcyFetchingSelector('SEND_TOPIC_MESSAGE');
const getPartitionsCountIncreaseStatus =
createLeagcyFetchingSelector('UPDATE_PARTITIONS');
const getReplicationFactorUpdateStatus = createLeagcyFetchingSelector(
@ -80,11 +78,6 @@ export const getTopicMessageSchemaFetched = createSelector(
(status) => status === 'fetched'
);
export const getTopicMessageSending = createSelector(
getTopicMessageSendingStatus,
(status) => status === 'fetching'
);
export const getTopicPartitionsCountIncreased = createSelector(
getPartitionsCountIncreaseStatus,
(status) => status === 'fetched'