diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java index aae652cd08..2b256d6eb0 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java @@ -1,5 +1,6 @@ package com.provectus.kafka.ui.config; +import com.fasterxml.jackson.databind.Module; import com.provectus.kafka.ui.model.JmxConnectionInfo; import com.provectus.kafka.ui.util.JmxPoolFactory; import java.util.Collections; @@ -9,6 +10,7 @@ import lombok.AllArgsConstructor; import org.apache.commons.pool2.KeyedObjectPool; import org.apache.commons.pool2.impl.GenericKeyedObjectPool; import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig; +import org.openapitools.jackson.nullable.JsonNullableModule; import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.web.ServerProperties; @@ -78,4 +80,9 @@ public class Config { .codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes())) .build(); } + + @Bean + public JsonNullableModule jsonNullableModule() { + return new JsonNullableModule(); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java index ca4ff8078a..cbf3638c0f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/DeserializationService.java @@ -39,7 +39,7 @@ public class DeserializationService { objectMapper); } else { log.info("Using SchemaRegistryAwareRecordSerDe for cluster '{}'", cluster.getName()); - return new SchemaRegistryAwareRecordSerDe(cluster); + return new SchemaRegistryAwareRecordSerDe(cluster, objectMapper); } } catch (Throwable e) { throw new RuntimeException("Can't init deserializer", e); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/SimpleRecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/SimpleRecordSerDe.java index fb8190a1ea..85a8a7340d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/SimpleRecordSerDe.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/SimpleRecordSerDe.java @@ -13,6 +13,8 @@ import org.apache.kafka.common.utils.Bytes; public class SimpleRecordSerDe implements RecordSerDe { + private static final ObjectMapper objectMapper = new ObjectMapper(); + @Override public DeserializedKeyValue deserialize(ConsumerRecord msg) { var builder = DeserializedKeyValue.builder(); @@ -45,7 +47,7 @@ public class SimpleRecordSerDe implements RecordSerDe { final MessageSchemaDTO schema = new MessageSchemaDTO() .name("unknown") .source(MessageSchemaDTO.SourceEnum.UNKNOWN) - .schema(JsonSchema.stringSchema().toJson(new ObjectMapper())); + .schema(JsonSchema.stringSchema().toJson(objectMapper)); return new TopicMessageSchemaDTO() .key(schema) .value(schema); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java index 9d54b11424..c754b2b68c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java @@ -10,7 +10,6 @@ import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.MessageSchemaDTO; import com.provectus.kafka.ui.model.TopicMessageSchemaDTO; import com.provectus.kafka.ui.serde.RecordSerDe; -import com.provectus.kafka.ui.util.ConsumerRecordUtil; import com.provectus.kafka.ui.util.jsonschema.AvroJsonSchemaConverter; import com.provectus.kafka.ui.util.jsonschema.JsonSchema; import com.provectus.kafka.ui.util.jsonschema.ProtobufSchemaConverter; @@ -27,7 +26,6 @@ import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; import java.net.URI; import java.nio.ByteBuffer; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,31 +45,32 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe { private static final int CLIENT_IDENTITY_MAP_CAPACITY = 100; + private static final StringMessageFormatter stringFormatter = new StringMessageFormatter(); + private static final ProtobufSchemaConverter protoSchemaConverter = new ProtobufSchemaConverter(); + private static final AvroJsonSchemaConverter avroSchemaConverter = new AvroJsonSchemaConverter(); + private final KafkaCluster cluster; private final Map valueFormatMap = new ConcurrentHashMap<>(); private final Map keyFormatMap = new ConcurrentHashMap<>(); @Nullable private final SchemaRegistryClient schemaRegistryClient; - @Nullable private final AvroMessageFormatter avroFormatter; - @Nullable private final ProtobufMessageFormatter protobufFormatter; - @Nullable private final JsonSchemaMessageFormatter jsonSchemaMessageFormatter; - private final StringMessageFormatter stringFormatter = new StringMessageFormatter(); - private final ProtobufSchemaConverter protoSchemaConverter = new ProtobufSchemaConverter(); - private final AvroJsonSchemaConverter avroSchemaConverter = new AvroJsonSchemaConverter(); - private final ObjectMapper objectMapper = new ObjectMapper(); + private ObjectMapper objectMapper; - private static SchemaRegistryClient createSchemaRegistryClient(KafkaCluster cluster) { + private SchemaRegistryClient createSchemaRegistryClient(KafkaCluster cluster, + ObjectMapper objectMapper) { if (cluster.getSchemaRegistry() == null) { throw new ValidationException("schemaRegistry is not specified"); } + this.objectMapper = objectMapper; + List schemaProviders = List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider(), new JsonSchemaProvider()); @@ -97,10 +96,10 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe { ); } - public SchemaRegistryAwareRecordSerDe(KafkaCluster cluster) { + public SchemaRegistryAwareRecordSerDe(KafkaCluster cluster, ObjectMapper objectMapper) { this.cluster = cluster; this.schemaRegistryClient = cluster.getSchemaRegistry() != null - ? createSchemaRegistryClient(cluster) + ? createSchemaRegistryClient(cluster, objectMapper) : null; if (schemaRegistryClient != null) { this.avroFormatter = new AvroMessageFormatter(schemaRegistryClient); @@ -147,41 +146,45 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe { @Nullable String key, @Nullable String data, @Nullable Integer partition) { - final Optional maybeValueSchema = getSchemaBySubject(topic, false); final Optional maybeKeySchema = getSchemaBySubject(topic, true); + final Optional maybeValueSchema = getSchemaBySubject(topic, false); - final byte[] serializedValue = data != null - ? serialize(maybeValueSchema, topic, data, false) - : null; - final byte[] serializedKey = key != null - ? serialize(maybeKeySchema, topic, key, true) - : null; + final byte[] serializedKey = maybeKeySchema.isPresent() + ? serialize(maybeKeySchema.get(), topic, key, true) + : serialize(key); + + final byte[] serializedValue = maybeValueSchema.isPresent() + ? serialize(maybeValueSchema.get(), topic, data, false) + : serialize(data); return new ProducerRecord<>(topic, partition, serializedKey, serializedValue); } @SneakyThrows - private byte[] serialize( - Optional maybeSchema, String topic, String value, boolean isKey) { - if (maybeSchema.isPresent()) { - final SchemaMetadata schema = maybeSchema.get(); - - MessageReader reader; - if (schema.getSchemaType().equals(MessageFormat.PROTOBUF.name())) { - reader = new ProtobufMessageReader(topic, isKey, schemaRegistryClient, schema); - } else if (schema.getSchemaType().equals(MessageFormat.AVRO.name())) { - reader = new AvroMessageReader(topic, isKey, schemaRegistryClient, schema); - } else if (schema.getSchemaType().equals(MessageFormat.JSON.name())) { - reader = new JsonSchemaMessageReader(topic, isKey, schemaRegistryClient, schema); - } else { - throw new IllegalStateException("Unsupported schema type: " + schema.getSchemaType()); - } - - return reader.read(value); - } else { - // if no schema provided serialize input as raw string - return value.getBytes(); + private byte[] serialize(SchemaMetadata schema, String topic, String value, boolean isKey) { + if (value == null) { + return null; } + MessageReader reader; + if (schema.getSchemaType().equals(MessageFormat.PROTOBUF.name())) { + reader = new ProtobufMessageReader(topic, isKey, schemaRegistryClient, schema); + } else if (schema.getSchemaType().equals(MessageFormat.AVRO.name())) { + reader = new AvroMessageReader(topic, isKey, schemaRegistryClient, schema); + } else if (schema.getSchemaType().equals(MessageFormat.JSON.name())) { + reader = new JsonSchemaMessageReader(topic, isKey, schemaRegistryClient, schema); + } else { + throw new IllegalStateException("Unsupported schema type: " + schema.getSchemaType()); + } + + return reader.read(value); + } + + private byte[] serialize(String value) { + if (value == null) { + return null; + } + // if no schema provided serialize input as raw string + return value.getBytes(); } @Override diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java index 923b4ad178..04998d4bb5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java @@ -81,9 +81,6 @@ public class MessagesService { public Mono sendMessage(KafkaCluster cluster, String topic, CreateTopicMessageDTO msg) { - if (msg.getKey() == null && msg.getContent() == null) { - throw new ValidationException("Invalid message: both key and value can't be null"); - } if (msg.getPartition() != null && msg.getPartition() > metricsCache.get(cluster).getTopicDescriptions() .get(topic).partitions().size() - 1) { @@ -100,8 +97,8 @@ public class MessagesService { try (KafkaProducer producer = new KafkaProducer<>(properties)) { ProducerRecord producerRecord = serde.serialize( topic, - msg.getKey(), - msg.getContent(), + msg.getKey().orElse(null), + msg.getContent().orElse(null), msg.getPartition() ); producerRecord = new ProducerRecord<>( diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/SchemaRegistryRecordDeserializerTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/SchemaRegistryRecordDeserializerTest.java index 7aaa94ecd7..227bb964e0 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/SchemaRegistryRecordDeserializerTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/serde/SchemaRegistryRecordDeserializerTest.java @@ -17,7 +17,7 @@ class SchemaRegistryRecordDeserializerTest { new SchemaRegistryAwareRecordSerDe( KafkaCluster.builder() .schemaNameTemplate("%s-value") - .build() + .build(), new ObjectMapper() ); @Test diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java index 4ddacc6b1a..864c191248 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java @@ -465,6 +465,20 @@ public class SendAndReadTests extends AbstractBaseTest { }); } + @Test + void noKeyAndNoContentPresentTest() { + new SendAndReadSpec() + .withMsgToSend( + new CreateTopicMessageDTO() + .key(null) + .content(null) + ) + .doAssert(polled -> { + assertThat(polled.getKey()).isNull(); + assertThat(polled.getContent()).isNull(); + }); + } + @SneakyThrows private void assertJsonEqual(String actual, String expected) { var mapper = new ObjectMapper(); diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 88a48578b1..5c46e0be74 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1870,12 +1870,16 @@ components: type: integer key: type: string + nullable: true headers: type: object additionalProperties: type: string content: type: string + nullable: true + required: + - partition TopicMessageSchema: type: object @@ -2635,4 +2639,4 @@ components: - DELETE - COMPACT - COMPACT_DELETE - - UNKNOWN \ No newline at end of file + - UNKNOWN diff --git a/kafka-ui-react-app/src/components/Alerts/Alert.styled.ts b/kafka-ui-react-app/src/components/Alerts/Alert.styled.ts index 7f8133972a..141f696948 100644 --- a/kafka-ui-react-app/src/components/Alerts/Alert.styled.ts +++ b/kafka-ui-react-app/src/components/Alerts/Alert.styled.ts @@ -3,7 +3,7 @@ import styled from 'styled-components'; export const Alert = styled.div<{ $type: AlertType }>` background-color: ${({ $type, theme }) => theme.alert.color[$type]}; - width: 400px; + min-width: 400px; min-height: 64px; border-radius: 8px; padding: 12px; @@ -20,8 +20,14 @@ export const Title = styled.div` font-size: 14px; `; -export const Message = styled.p` +export const Message = styled.div` font-weight: normal; font-size: 14px; margin: 3px 0; + + ol, + ul { + padding-left: 25px; + list-style: auto; + } `; diff --git a/kafka-ui-react-app/src/components/App.styled.ts b/kafka-ui-react-app/src/components/App.styled.ts index b79b5a3ff4..5642945fc2 100644 --- a/kafka-ui-react-app/src/components/App.styled.ts +++ b/kafka-ui-react-app/src/components/App.styled.ts @@ -168,7 +168,7 @@ export const AlertsContainer = styled.div` width: 500px; position: fixed; bottom: 15px; - left: 15px; + right: 15px; z-index: 1000; @media screen and (max-width: 1023px) { diff --git a/kafka-ui-react-app/src/components/Topics/Topic/SendMessage/SendMessage.tsx b/kafka-ui-react-app/src/components/Topics/Topic/SendMessage/SendMessage.tsx index 18aaff3f6a..ee2f9ae038 100644 --- a/kafka-ui-react-app/src/components/Topics/Topic/SendMessage/SendMessage.tsx +++ b/kafka-ui-react-app/src/components/Topics/Topic/SendMessage/SendMessage.tsx @@ -1,47 +1,32 @@ import JSONEditor from 'components/common/JSONEditor/JSONEditor'; import PageLoader from 'components/common/PageLoader/PageLoader'; -import { - CreateTopicMessage, - Partition, - TopicMessageSchema, -} from 'generated-sources'; import React from 'react'; import { useForm, Controller } from 'react-hook-form'; -import { useHistory } from 'react-router'; +import { useHistory, useParams } from 'react-router'; import { clusterTopicMessagesPath } from 'lib/paths'; import jsf from 'json-schema-faker'; +import { fetchTopicMessageSchema, messagesApiClient } from 'redux/actions'; +import { useAppDispatch, useAppSelector } from 'lib/hooks/redux'; +import { alertAdded } from 'redux/reducers/alerts/alertsSlice'; +import { now } from 'lodash'; +import { Button } from 'components/common/Button/Button'; +import { ClusterName, TopicName } from 'redux/interfaces'; +import { + getMessageSchemaByTopicName, + getPartitionsByTopicName, + getTopicMessageSchemaFetched, +} from 'redux/reducers/topics/selectors'; import validateMessage from './validateMessage'; -export interface Props { - clusterName: string; - topicName: string; - fetchTopicMessageSchema: (clusterName: string, topicName: string) => void; - sendTopicMessage: ( - clusterName: string, - topicName: string, - payload: CreateTopicMessage - ) => void; - messageSchema: TopicMessageSchema | undefined; - schemaIsFetched: boolean; - messageIsSending: boolean; - partitions: Partition[]; +interface RouterParams { + clusterName: ClusterName; + topicName: TopicName; } -const SendMessage: React.FC = ({ - clusterName, - topicName, - fetchTopicMessageSchema, - sendTopicMessage, - messageSchema, - schemaIsFetched, - messageIsSending, - partitions, -}) => { - const [keyExampleValue, setKeyExampleValue] = React.useState(''); - const [contentExampleValue, setContentExampleValue] = React.useState(''); - const [schemaIsReady, setSchemaIsReady] = React.useState(false); - const [schemaErrors, setSchemaErrors] = React.useState([]); +const SendMessage: React.FC = () => { + const dispatch = useAppDispatch(); + const { clusterName, topicName } = useParams(); const { register, handleSubmit, @@ -54,27 +39,38 @@ const SendMessage: React.FC = ({ jsf.option('alwaysFakeOptionals', true); React.useEffect(() => { - fetchTopicMessageSchema(clusterName, topicName); + dispatch(fetchTopicMessageSchema(clusterName, topicName)); }, []); - React.useEffect(() => { - if (schemaIsFetched && messageSchema) { - setKeyExampleValue( - JSON.stringify( - jsf.generate(JSON.parse(messageSchema.key.schema)), - null, - '\t' - ) - ); - setContentExampleValue( - JSON.stringify( - jsf.generate(JSON.parse(messageSchema.value.schema)), - null, - '\t' - ) - ); - setSchemaIsReady(true); + + const messageSchema = useAppSelector((state) => + getMessageSchemaByTopicName(state, topicName) + ); + const partitions = useAppSelector((state) => + getPartitionsByTopicName(state, topicName) + ); + const schemaIsFetched = useAppSelector(getTopicMessageSchemaFetched); + + const keyDefaultValue = React.useMemo(() => { + if (!schemaIsFetched || !messageSchema) { + return undefined; } - }, [schemaIsFetched]); + return JSON.stringify( + jsf.generate(JSON.parse(messageSchema.key.schema)), + null, + '\t' + ); + }, [messageSchema, schemaIsFetched]); + + const contentDefaultValue = React.useMemo(() => { + if (!schemaIsFetched || !messageSchema) { + return undefined; + } + return JSON.stringify( + jsf.generate(JSON.parse(messageSchema.value.schema)), + null, + '\t' + ); + }, [messageSchema, schemaIsFetched]); const onSubmit = async (data: { key: string; @@ -83,30 +79,55 @@ const SendMessage: React.FC = ({ partition: number; }) => { if (messageSchema) { - const key = data.key || keyExampleValue; - const content = data.content || contentExampleValue; - const { partition } = data; + const { partition, key, content } = data; const headers = data.headers ? JSON.parse(data.headers) : undefined; - const messageIsValid = await validateMessage( - key, - content, - messageSchema, - setSchemaErrors - ); - - if (messageIsValid) { - sendTopicMessage(clusterName, topicName, { - key, - content, - headers, - partition, - }); - history.push(clusterTopicMessagesPath(clusterName, topicName)); + const errors = validateMessage(key, content, messageSchema); + if (errors.length > 0) { + dispatch( + alertAdded({ + id: `${clusterName}-${topicName}-createTopicMessageError`, + type: 'error', + title: 'Validation Error', + message: ( +
    + {errors.map((e) => ( +
  • {e}
  • + ))} +
+ ), + createdAt: now(), + }) + ); + return; } + + try { + await messagesApiClient.sendTopicMessages({ + clusterName, + topicName, + createTopicMessage: { + key: !key ? null : key, + content: !content ? null : content, + headers, + partition, + }, + }); + } catch (e) { + dispatch( + alertAdded({ + id: `${clusterName}-${topicName}-sendTopicMessagesError`, + type: 'error', + title: `Error in sending a message to ${topicName}`, + message: e?.message, + createdAt: now(), + }) + ); + } + history.push(clusterTopicMessagesPath(clusterName, topicName)); } }; - if (!schemaIsReady) { + if (!schemaIsFetched) { return ; } return ( @@ -121,7 +142,7 @@ const SendMessage: React.FC = ({