- package com.provectus.kafka.ui.service;
- import static org.assertj.core.api.Assertions.assertThat;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.provectus.kafka.ui.AbstractBaseTest;
- import com.provectus.kafka.ui.model.ConsumerPosition;
- import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
- import com.provectus.kafka.ui.model.KafkaCluster;
- import com.provectus.kafka.ui.model.MessageFormatDTO;
- import com.provectus.kafka.ui.model.SeekDirectionDTO;
- import com.provectus.kafka.ui.model.SeekTypeDTO;
- import com.provectus.kafka.ui.model.TopicMessageDTO;
- import com.provectus.kafka.ui.model.TopicMessageEventDTO;
- import io.confluent.kafka.schemaregistry.ParsedSchema;
- import io.confluent.kafka.schemaregistry.avro.AvroSchema;
- import io.confluent.kafka.schemaregistry.json.JsonSchema;
- import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
- import java.time.Duration;
- import java.util.Map;
- import java.util.Objects;
- import java.util.UUID;
- import java.util.function.Consumer;
- import lombok.SneakyThrows;
- import org.apache.kafka.clients.admin.NewTopic;
- import org.apache.kafka.common.TopicPartition;
- import org.junit.jupiter.api.BeforeEach;
- import org.junit.jupiter.api.Test;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.test.context.ContextConfiguration;
- import reactor.test.StepVerifier;
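- /**
-  * Integration tests that send a message through {@link MessagesService} and read it back,
-  * covering schemaless, Avro, Protobuf and JSON Schema keys and values,
-  * including null keys/values and payloads that do not match the registered schema.
-  */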
- @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
- public class SendAndReadTests extends AbstractBaseTest {
- private static final AvroSchema AVRO_SCHEMA_1 = new AvroSchema(
- "{"
- + " \"type\": \"record\","
- + " \"name\": \"TestAvroRecord1\","
- + " \"fields\": ["
- + " {"
- + " \"name\": \"field1\","
- + " \"type\": \"string\""
- + " },"
- + " {"
- + " \"name\": \"field2\","
- + " \"type\": \"int\""
- + " }"
- + " ]"
- + "}"
- );
- private static final AvroSchema AVRO_SCHEMA_2 = new AvroSchema(
- "{"
- + " \"type\": \"record\","
- + " \"name\": \"TestAvroRecord2\","
- + " \"fields\": ["
- + " {"
- + " \"name\": \"f1\","
- + " \"type\": \"int\""
- + " },"
- + " {"
- + " \"name\": \"f2\","
- + " \"type\": \"string\""
- + " }"
- + " ]"
- + "}"
- );
- private static final AvroSchema AVRO_SCHEMA_PRIMITIVE_STRING =
- new AvroSchema("{ \"type\": \"string\" }");
- private static final AvroSchema AVRO_SCHEMA_PRIMITIVE_INT =
- new AvroSchema("{ \"type\": \"int\" }");
- private static final String AVRO_SCHEMA_1_JSON_RECORD
- = "{ \"field1\":\"testStr\", \"field2\": 123 }";
- private static final String AVRO_SCHEMA_2_JSON_RECORD = "{ \"f1\": 111, \"f2\": \"testStr\" }";
- private static final ProtobufSchema PROTOBUF_SCHEMA = new ProtobufSchema(
- "syntax = \"proto3\";\n"
- + "package com.provectus;\n"
- + "\n"
- + "message TestProtoRecord {\n"
- + " string f1 = 1;\n"
- + " int32 f2 = 2;\n"
- + "}\n"
- + "\n"
- );
- private static final String PROTOBUF_SCHEMA_JSON_RECORD
- = "{ \"f1\" : \"test str\", \"f2\" : 123 }";
- private static final JsonSchema JSON_SCHEMA = new JsonSchema(
- "{ "
- + " \"$schema\": \"http://json-schema.org/draft-07/schema#\", "
- + " \"$id\": \"http://example.com/myURI.schema.json\", "
- + " \"title\": \"TestRecord\","
- + " \"type\": \"object\","
- + " \"additionalProperties\": false,"
- + " \"properties\": {"
- + " \"f1\": {"
- + " \"type\": \"integer\""
- + " },"
- + " \"f2\": {"
- + " \"type\": \"string\""
- + " },"
- // this is an important special case: KafkaJsonSchemaSerializer contains code
- // that checks for fields with this name, so it has to be worked around
- + " \"schema\": {"
- + " \"type\": \"string\""
- + " }"
- + " }"
- + "}"
- );
- private static final String JSON_SCHEMA_RECORD
- = "{ \"f1\": 12, \"f2\": \"testJsonSchema1\", \"schema\": \"some txt\" }";
- private KafkaCluster targetCluster;
- @Autowired
- private MessagesService messagesService;
- @Autowired
- private ClustersStorage clustersStorage;
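- // triggered manually after topics/schemas are created so the cluster state is refreshed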
- @Autowired
- private ClustersMetricsScheduler clustersMetricsScheduler;
- @BeforeEach
- void init() {
- targetCluster = clustersStorage.getClusterByName(LOCAL).get();
- }
- @Test
- void noSchemaStringKeyStringValue() {
- new SendAndReadSpec()
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key("testKey")
- .content("testValue")
- )
- .doAssert(polled -> {
- assertThat(polled.getKey()).isEqualTo("testKey");
- assertThat(polled.getContent()).isEqualTo("testValue");
- });
- }
- @Test
- void noSchemaJsonKeyJsonValue() {
- new SendAndReadSpec()
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key("{ \"f1\": 111, \"f2\": \"testStr1\" }")
- .content("{ \"f1\": 222, \"f2\": \"testStr2\" }")
- )
- .doAssert(polled -> {
- assertThat(polled.getKey()).isEqualTo("{ \"f1\": 111, \"f2\": \"testStr1\" }");
- assertThat(polled.getContent()).isEqualTo("{ \"f1\": 222, \"f2\": \"testStr2\" }");
- });
- }
- @Test
- void keyIsIntValueIsDoubleShouldBeSerializedAsStrings() {
- new SendAndReadSpec()
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key("123")
- .content("234.56")
- )
- .doAssert(polled -> {
- assertThat(polled.getKey()).isEqualTo("123");
- assertThat(polled.getContent()).isEqualTo("234.56");
- });
- }
- @Test
- void noSchemaKeyIsNull() {
- new SendAndReadSpec()
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key(null)
- .content("testValue")
- )
- .doAssert(polled -> {
- assertThat(polled.getKey()).isNull();
- assertThat(polled.getContent()).isEqualTo("testValue");
- });
- }
- @Test
- void noSchemaValueIsNull() {
- new SendAndReadSpec()
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key("testKey")
- .content(null)
- )
- .doAssert(polled -> {
- assertThat(polled.getKey()).isEqualTo("testKey");
- assertThat(polled.getContent()).isNull();
- });
- }
- @Test
- void primitiveAvroSchemas() {
- new SendAndReadSpec()
- .withKeySchema(AVRO_SCHEMA_PRIMITIVE_STRING)
- .withValueSchema(AVRO_SCHEMA_PRIMITIVE_INT)
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key("\"some string\"")
- .content("123")
- )
- .doAssert(polled -> {
- assertThat(polled.getKey()).isEqualTo("\"some string\"");
- assertThat(polled.getContent()).isEqualTo("123");
- });
- }
- @Test
- void nonNullableKvWithAvroSchema() {
- new SendAndReadSpec()
- .withKeySchema(AVRO_SCHEMA_1)
- .withValueSchema(AVRO_SCHEMA_2)
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key(AVRO_SCHEMA_1_JSON_RECORD)
- .content(AVRO_SCHEMA_2_JSON_RECORD)
- )
- .doAssert(polled -> {
- assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
- assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD);
- });
- }
- @Test
- void keyWithNoSchemaValueWithAvroSchema() {
- new SendAndReadSpec()
- .withValueSchema(AVRO_SCHEMA_1)
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key("testKey")
- .content(AVRO_SCHEMA_1_JSON_RECORD)
- )
- .doAssert(polled -> {
- assertThat(polled.getKey()).isEqualTo("testKey");
- assertJsonEqual(polled.getContent(), AVRO_SCHEMA_1_JSON_RECORD);
- });
- }
- @Test
- void keyWithAvroSchemaValueWithNoSchema() {
- new SendAndReadSpec()
- .withKeySchema(AVRO_SCHEMA_1)
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key(AVRO_SCHEMA_1_JSON_RECORD)
- .content("testVal")
- )
- .doAssert(polled -> {
- assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
- assertThat(polled.getContent()).isEqualTo("testVal");
- });
- }
- @Test
- void keyWithNoSchemaValueWithProtoSchema() {
- new SendAndReadSpec()
- .withValueSchema(PROTOBUF_SCHEMA)
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key("testKey")
- .content(PROTOBUF_SCHEMA_JSON_RECORD)
- )
- .doAssert(polled -> {
- assertThat(polled.getKey()).isEqualTo("testKey");
- assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD);
- });
- }
- @Test
- void keyWithAvroSchemaValueWithAvroSchemaKeyIsNull() {
- new SendAndReadSpec()
- .withKeySchema(AVRO_SCHEMA_1)
- .withValueSchema(AVRO_SCHEMA_2)
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key(null)
- .content(AVRO_SCHEMA_2_JSON_RECORD)
- )
- .doAssert(polled -> {
- assertThat(polled.getKey()).isNull();
- assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD);
- });
- }
- @Test
- void valueWithAvroSchemaShouldThrowExceptionArgIsNotValidJsonObject() {
- new SendAndReadSpec()
- .withValueSchema(AVRO_SCHEMA_2)
- .withMsgToSend(
- new CreateTopicMessageDTO()
- // f2 has type object instead of string
- .content("{ \"f1\": 111, \"f2\": {} }")
- )
- .assertSendThrowsException();
- }
- @Test
- void keyWithAvroSchemaValueWithAvroSchemaValueIsNull() {
- new SendAndReadSpec()
- .withKeySchema(AVRO_SCHEMA_1)
- .withValueSchema(AVRO_SCHEMA_2)
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key(AVRO_SCHEMA_1_JSON_RECORD)
- .content(null)
- )
- .doAssert(polled -> {
- assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
- assertThat(polled.getContent()).isNull();
- });
- }
- @Test
- void keyWithAvroSchemaValueWithProtoSchema() {
- new SendAndReadSpec()
- .withKeySchema(AVRO_SCHEMA_1)
- .withValueSchema(PROTOBUF_SCHEMA)
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key(AVRO_SCHEMA_1_JSON_RECORD)
- .content(PROTOBUF_SCHEMA_JSON_RECORD)
- )
- .doAssert(polled -> {
- assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
- assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD);
- });
- }
- @Test
- void valueWithProtoSchemaShouldThrowExceptionArgIsNotValidJsonObject() {
- new SendAndReadSpec()
- .withValueSchema(PROTOBUF_SCHEMA)
- .withMsgToSend(
- new CreateTopicMessageDTO()
- // f2 field has type object instead of int
- .content("{ \"f1\" : \"test str\", \"f2\" : {} }"))
- .assertSendThrowsException();
- }
- @Test
- void keyWithProtoSchemaValueWithJsonSchema() {
- new SendAndReadSpec()
- .withKeySchema(PROTOBUF_SCHEMA)
- .withValueSchema(JSON_SCHEMA)
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key(PROTOBUF_SCHEMA_JSON_RECORD)
- .content(JSON_SCHEMA_RECORD)
- )
- .doAssert(polled -> {
- assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD);
- assertJsonEqual(polled.getContent(), JSON_SCHEMA_RECORD);
- });
- }
- @Test
- void keyWithJsonSchemaValueWithJsonSchemaValueIsNull() {
- new SendAndReadSpec()
- .withKeySchema(JSON_SCHEMA)
- .withValueSchema(JSON_SCHEMA)
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key(JSON_SCHEMA_RECORD)
- )
- .doAssert(polled -> {
- assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD);
- assertThat(polled.getContent()).isNull();
- });
- }
- @Test
- void valueWithJsonSchemaThrowsExceptionIfArgIsNotValidJsonObject() {
- new SendAndReadSpec()
- .withValueSchema(JSON_SCHEMA)
- .withMsgToSend(
- new CreateTopicMessageDTO()
- // 'f2' field has type object instead of string
- .content("{ \"f1\": 12, \"f2\": {}, \"schema\": \"some txt\" }")
- )
- .assertSendThrowsException();
- }
- @Test
- void topicMessageMetadataAvro() {
- new SendAndReadSpec()
- .withKeySchema(AVRO_SCHEMA_1)
- .withValueSchema(AVRO_SCHEMA_2)
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key(AVRO_SCHEMA_1_JSON_RECORD)
- .content(AVRO_SCHEMA_2_JSON_RECORD)
- )
- .doAssert(polled -> {
- assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
- assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD);
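- // 15 bytes = 5-byte Confluent wire-format prefix (magic byte + schema id) + Avro-encoded record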
- assertThat(polled.getKeySize()).isEqualTo(15L);
- assertThat(polled.getValueSize()).isEqualTo(15L);
- assertThat(polled.getKeyFormat()).isEqualTo(MessageFormatDTO.AVRO);
- assertThat(polled.getValueFormat()).isEqualTo(MessageFormatDTO.AVRO);
- assertThat(polled.getKeySchemaId()).isNotEmpty();
- assertThat(polled.getValueSchemaId()).isNotEmpty();
- });
- }
- @Test
- void topicMessageMetadataProtobuf() {
- new SendAndReadSpec()
- .withKeySchema(PROTOBUF_SCHEMA)
- .withValueSchema(PROTOBUF_SCHEMA)
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key(PROTOBUF_SCHEMA_JSON_RECORD)
- .content(PROTOBUF_SCHEMA_JSON_RECORD)
- )
- .doAssert(polled -> {
- assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD);
- assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD);
- assertThat(polled.getKeySize()).isEqualTo(18L);
- assertThat(polled.getValueSize()).isEqualTo(18L);
- assertThat(polled.getKeyFormat()).isEqualTo(MessageFormatDTO.PROTOBUF);
- assertThat(polled.getValueFormat()).isEqualTo(MessageFormatDTO.PROTOBUF);
- assertThat(polled.getKeySchemaId()).isNotEmpty();
- assertThat(polled.getValueSchemaId()).isNotEmpty();
- });
- }
- @Test
- void topicMessageMetadataJson() {
- new SendAndReadSpec()
- .withKeySchema(JSON_SCHEMA)
- .withValueSchema(JSON_SCHEMA)
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key(JSON_SCHEMA_RECORD)
- .content(JSON_SCHEMA_RECORD)
- .headers(Map.of("header1", "value1"))
- )
- .doAssert(polled -> {
- assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD);
- assertJsonEqual(polled.getContent(), JSON_SCHEMA_RECORD);
- assertThat(polled.getKeyFormat()).isEqualTo(MessageFormatDTO.JSON);
- assertThat(polled.getValueFormat()).isEqualTo(MessageFormatDTO.JSON);
- assertThat(polled.getKeySchemaId()).isNotEmpty();
- assertThat(polled.getValueSchemaId()).isNotEmpty();
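- // 57 bytes = 5-byte schema-registry prefix + compact JSON payload;
- // headersSize sums header key and value bytes ("header1" + "value1" = 13)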
- assertThat(polled.getKeySize()).isEqualTo(57L);
- assertThat(polled.getValueSize()).isEqualTo(57L);
- assertThat(polled.getHeadersSize()).isEqualTo(13L);
- });
- }
- @Test
- void noKeyAndNoContentPresentTest() {
- new SendAndReadSpec()
- .withMsgToSend(
- new CreateTopicMessageDTO()
- .key(null)
- .content(null)
- )
- .doAssert(polled -> {
- assertThat(polled.getKey()).isNull();
- assertThat(polled.getContent()).isNull();
- });
- }
- @SneakyThrows
- private void assertJsonEqual(String actual, String expected) {
- var mapper = new ObjectMapper();
- assertThat(mapper.readTree(actual)).isEqualTo(mapper.readTree(expected));
- }
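- /**
-  * Fluent test helper: optionally registers key/value schemas under the
-  * "&lt;topic&gt;-key" / "&lt;topic&gt;-value" subjects, creates a fresh single-partition topic,
-  * sends {@code msgToSend} through {@link MessagesService}, and then either asserts
-  * on the polled message or expects the send to fail.
-  */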
- class SendAndReadSpec {
- CreateTopicMessageDTO msgToSend;
- ParsedSchema keySchema;
- ParsedSchema valueSchema;
- public SendAndReadSpec withMsgToSend(CreateTopicMessageDTO msg) {
- this.msgToSend = msg;
- return this;
- }
- public SendAndReadSpec withKeySchema(ParsedSchema keySchema) {
- this.keySchema = keySchema;
- return this;
- }
- public SendAndReadSpec withValueSchema(ParsedSchema valueSchema) {
- this.valueSchema = valueSchema;
- return this;
- }
- @SneakyThrows
- private String createTopicAndCreateSchemas() {
- Objects.requireNonNull(msgToSend);
- String topic = UUID.randomUUID().toString();
- createTopic(new NewTopic(topic, 1, (short) 1));
- if (keySchema != null) {
- schemaRegistry.schemaRegistryClient().register(topic + "-key", keySchema);
- }
- if (valueSchema != null) {
- schemaRegistry.schemaRegistryClient().register(topic + "-value", valueSchema);
- }
- // refresh cluster metrics so the newly created topic & schemas become visible
- clustersMetricsScheduler.updateMetrics();
- return topic;
- }
- public void assertSendThrowsException() {
- String topic = createTopicAndCreateSchemas();
- try {
- StepVerifier.create(
- messagesService.sendMessage(targetCluster, topic, msgToSend)
- ).expectError().verify();
- } finally {
- deleteTopic(topic);
- }
- }
- @SneakyThrows
- public void doAssert(Consumer<TopicMessageDTO> msgAssert) {
- String topic = createTopicAndCreateSchemas();
- try {
- messagesService.sendMessage(targetCluster, topic, msgToSend).block();
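- // read the message back from the beginning of the single partition (limit = 1) and unwrap the MESSAGE event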
- TopicMessageDTO polled = messagesService.loadMessages(
- targetCluster,
- topic,
- new ConsumerPosition(
- SeekTypeDTO.BEGINNING,
- Map.of(new TopicPartition(topic, 0), 0L),
- SeekDirectionDTO.FORWARD
- ),
- null,
- null,
- 1
- ).filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
- .map(TopicMessageEventDTO::getMessage)
- .blockLast(Duration.ofSeconds(5000));
- assertThat(polled).isNotNull();
- assertThat(polled.getPartition()).isEqualTo(0);
- assertThat(polled.getOffset()).isNotNull();
- msgAssert.accept(polled);
- } finally {
- deleteTopic(topic);
- }
- }
- }
- }