SendAndReadTests.java 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. package com.provectus.kafka.ui.service;
  2. import static org.assertj.core.api.Assertions.assertThat;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.provectus.kafka.ui.AbstractIntegrationTest;
  5. import com.provectus.kafka.ui.model.ConsumerPosition;
  6. import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
  7. import com.provectus.kafka.ui.model.KafkaCluster;
  8. import com.provectus.kafka.ui.model.PollingModeDTO;
  9. import com.provectus.kafka.ui.model.TopicMessageDTO;
  10. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  11. import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
  12. import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
  13. import com.provectus.kafka.ui.serdes.builtin.StringSerde;
  14. import com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerde;
  15. import io.confluent.kafka.schemaregistry.ParsedSchema;
  16. import io.confluent.kafka.schemaregistry.avro.AvroSchema;
  17. import io.confluent.kafka.schemaregistry.json.JsonSchema;
  18. import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
  19. import java.time.Duration;
  20. import java.util.List;
  21. import java.util.Map;
  22. import java.util.Objects;
  23. import java.util.UUID;
  24. import java.util.function.Consumer;
  25. import lombok.SneakyThrows;
  26. import org.apache.kafka.clients.admin.NewTopic;
  27. import org.apache.kafka.common.TopicPartition;
  28. import org.junit.jupiter.api.BeforeEach;
  29. import org.junit.jupiter.api.Test;
  30. import org.springframework.beans.factory.annotation.Autowired;
  31. import reactor.test.StepVerifier;
  32. public class SendAndReadTests extends AbstractIntegrationTest {
  /**
   * Avro record schema {@code TestAvroRecord1} with a string {@code field1} and an int
   * {@code field2}.
   */
  private static final AvroSchema AVRO_SCHEMA_1 = new AvroSchema(
      "{"
          + " \"type\": \"record\","
          + " \"name\": \"TestAvroRecord1\","
          + " \"fields\": ["
          + " {"
          + " \"name\": \"field1\","
          + " \"type\": \"string\""
          + " },"
          + " {"
          + " \"name\": \"field2\","
          + " \"type\": \"int\""
          + " }"
          + " ]"
          + "}"
  );

  /**
   * Avro record schema {@code TestAvroRecord2} with an int {@code f1} and a string {@code f2}.
   */
  private static final AvroSchema AVRO_SCHEMA_2 = new AvroSchema(
      "{"
          + " \"type\": \"record\","
          + " \"name\": \"TestAvroRecord2\","
          + " \"fields\": ["
          + " {"
          + " \"name\": \"f1\","
          + " \"type\": \"int\""
          + " },"
          + " {"
          + " \"name\": \"f2\","
          + " \"type\": \"string\""
          + " }"
          + " ]"
          + "}"
  );

  /** Avro schema consisting of a bare primitive {@code string}. */
  private static final AvroSchema AVRO_SCHEMA_PRIMITIVE_STRING =
      new AvroSchema("{ \"type\": \"string\" }");

  /** Avro schema consisting of a bare primitive {@code int}. */
  private static final AvroSchema AVRO_SCHEMA_PRIMITIVE_INT =
      new AvroSchema("{ \"type\": \"int\" }");

  /** JSON text of a record whose fields match {@link #AVRO_SCHEMA_1}. */
  private static final String AVRO_SCHEMA_1_JSON_RECORD
      = "{ \"field1\":\"testStr\", \"field2\": 123 }";

  /** JSON text of a record whose fields match {@link #AVRO_SCHEMA_2}. */
  private static final String AVRO_SCHEMA_2_JSON_RECORD = "{ \"f1\": 111, \"f2\": \"testStr\" }";
  /**
   * proto3 schema declaring message {@code TestProtoRecord} (package {@code com.provectus})
   * with a string {@code f1} and an int32 {@code f2}.
   */
  private static final ProtobufSchema PROTOBUF_SCHEMA = new ProtobufSchema(
      "syntax = \"proto3\";\n"
          + "package com.provectus;\n"
          + "\n"
          + "message TestProtoRecord {\n"
          + " string f1 = 1;\n"
          + " int32 f2 = 2;\n"
          + "}\n"
          + "\n"
  );

  /** JSON text of a message whose fields match {@link #PROTOBUF_SCHEMA}. */
  private static final String PROTOBUF_SCHEMA_JSON_RECORD
      = "{ \"f1\" : \"test str\", \"f2\" : 123 }";
  /**
   * Draft-07 JSON schema titled {@code TestRecord}: an object with an integer {@code f1}, a string
   * {@code f2} and a string property literally named {@code schema}; additional properties are
   * not allowed.
   */
  private static final JsonSchema JSON_SCHEMA = new JsonSchema(
      "{ "
          + " \"$schema\": \"http://json-schema.org/draft-07/schema#\", "
          + " \"$id\": \"http://example.com/myURI.schema.json\", "
          + " \"title\": \"TestRecord\","
          + " \"type\": \"object\","
          + " \"additionalProperties\": false,"
          + " \"properties\": {"
          + " \"f1\": {"
          + " \"type\": \"integer\""
          + " },"
          + " \"f2\": {"
          + " \"type\": \"string\""
          + " },"
          // This is an important special case: KafkaJsonSchemaSerializer contains code
          // that checks for fields with this name (it should be worked around).
          + " \"schema\": {"
          + " \"type\": \"string\""
          + " }"
          + " }"
          + "}"
  );

  /** JSON text of a record matching {@link #JSON_SCHEMA}, including the {@code schema} field. */
  private static final String JSON_SCHEMA_RECORD
      = "{ \"f1\": 12, \"f2\": \"testJsonSchema1\", \"schema\": \"some txt\" }";
  /** Cluster every test sends to and reads from; assigned in {@code init()} before each test. */
  private KafkaCluster targetCluster;

  /** Service under test: used to send messages and to load them back. */
  @Autowired
  private MessagesService messagesService;

  /** Source from which {@code init()} looks up the target cluster by name. */
  @Autowired
  private ClustersStorage clustersStorage;
  113. @BeforeEach
  114. void init() {
  115. targetCluster = clustersStorage.getClusterByName(LOCAL).get();
  116. }
  117. @Test
  118. void noSchemaStringKeyStringValue() {
  119. new SendAndReadSpec()
  120. .withMsgToSend(
  121. new CreateTopicMessageDTO()
  122. .key("testKey")
  123. .keySerde(StringSerde.name())
  124. .content("testValue")
  125. .valueSerde(StringSerde.name())
  126. )
  127. .doAssert(polled -> {
  128. assertThat(polled.getKey()).isEqualTo("testKey");
  129. assertThat(polled.getContent()).isEqualTo("testValue");
  130. });
  131. }
  132. @Test
  133. void keyIsIntValueIsLong() {
  134. new SendAndReadSpec()
  135. .withMsgToSend(
  136. new CreateTopicMessageDTO()
  137. .key("123")
  138. .keySerde(Int32Serde.name())
  139. .content("21474836470")
  140. .valueSerde(Int64Serde.name())
  141. )
  142. .doAssert(polled -> {
  143. assertThat(polled.getKey()).isEqualTo("123");
  144. assertThat(polled.getContent()).isEqualTo("21474836470");
  145. });
  146. }
  147. @Test
  148. void keyIsNull() {
  149. new SendAndReadSpec()
  150. .withMsgToSend(
  151. new CreateTopicMessageDTO()
  152. .key(null)
  153. .keySerde(StringSerde.name())
  154. .content("testValue")
  155. .valueSerde(StringSerde.name())
  156. )
  157. .doAssert(polled -> {
  158. assertThat(polled.getKey()).isNull();
  159. assertThat(polled.getContent()).isEqualTo("testValue");
  160. });
  161. }
  162. @Test
  163. void valueIsNull() {
  164. new SendAndReadSpec()
  165. .withMsgToSend(
  166. new CreateTopicMessageDTO()
  167. .key("testKey")
  168. .keySerde(StringSerde.name())
  169. .content(null)
  170. .valueSerde(StringSerde.name())
  171. )
  172. .doAssert(polled -> {
  173. assertThat(polled.getKey()).isEqualTo("testKey");
  174. assertThat(polled.getContent()).isNull();
  175. });
  176. }
  177. @Test
  178. void primitiveAvroSchemas() {
  179. new SendAndReadSpec()
  180. .withKeySchema(AVRO_SCHEMA_PRIMITIVE_STRING)
  181. .withValueSchema(AVRO_SCHEMA_PRIMITIVE_INT)
  182. .withMsgToSend(
  183. new CreateTopicMessageDTO()
  184. .key("\"some string\"")
  185. .keySerde(SchemaRegistrySerde.name())
  186. .content("123")
  187. .valueSerde(SchemaRegistrySerde.name())
  188. )
  189. .doAssert(polled -> {
  190. assertThat(polled.getKey()).isEqualTo("\"some string\"");
  191. assertThat(polled.getContent()).isEqualTo("123");
  192. });
  193. }
  194. @Test
  195. void recordAvroSchema() {
  196. new SendAndReadSpec()
  197. .withKeySchema(AVRO_SCHEMA_1)
  198. .withValueSchema(AVRO_SCHEMA_2)
  199. .withMsgToSend(
  200. new CreateTopicMessageDTO()
  201. .key(AVRO_SCHEMA_1_JSON_RECORD)
  202. .keySerde(SchemaRegistrySerde.name())
  203. .content(AVRO_SCHEMA_2_JSON_RECORD)
  204. .valueSerde(SchemaRegistrySerde.name())
  205. )
  206. .doAssert(polled -> {
  207. assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
  208. assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD);
  209. });
  210. }
  211. @Test
  212. void keyWithNoSchemaValueWithProtoSchema() {
  213. new SendAndReadSpec()
  214. .withValueSchema(PROTOBUF_SCHEMA)
  215. .withMsgToSend(
  216. new CreateTopicMessageDTO()
  217. .key("testKey")
  218. .keySerde(StringSerde.name())
  219. .content(PROTOBUF_SCHEMA_JSON_RECORD)
  220. .valueSerde(SchemaRegistrySerde.name())
  221. )
  222. .doAssert(polled -> {
  223. assertThat(polled.getKey()).isEqualTo("testKey");
  224. assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD);
  225. });
  226. }
  227. @Test
  228. void keyWithAvroSchemaValueWithAvroSchemaKeyIsNull() {
  229. new SendAndReadSpec()
  230. .withKeySchema(AVRO_SCHEMA_1)
  231. .withValueSchema(AVRO_SCHEMA_2)
  232. .withMsgToSend(
  233. new CreateTopicMessageDTO()
  234. .key(null)
  235. .keySerde(SchemaRegistrySerde.name())
  236. .content(AVRO_SCHEMA_2_JSON_RECORD)
  237. .valueSerde(SchemaRegistrySerde.name())
  238. )
  239. .doAssert(polled -> {
  240. assertThat(polled.getKey()).isNull();
  241. assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD);
  242. });
  243. }
  244. @Test
  245. void valueWithAvroSchemaShouldThrowExceptionIfArgIsNotValidJsonObject() {
  246. new SendAndReadSpec()
  247. .withValueSchema(AVRO_SCHEMA_2)
  248. .withMsgToSend(
  249. new CreateTopicMessageDTO()
  250. .keySerde(StringSerde.name())
  251. // f2 has type int instead of string
  252. .content("{ \"f1\": 111, \"f2\": 123 }")
  253. .valueSerde(SchemaRegistrySerde.name())
  254. )
  255. .assertSendThrowsException();
  256. }
  257. @Test
  258. void keyWithAvroSchemaValueWithProtoSchema() {
  259. new SendAndReadSpec()
  260. .withKeySchema(AVRO_SCHEMA_1)
  261. .withValueSchema(PROTOBUF_SCHEMA)
  262. .withMsgToSend(
  263. new CreateTopicMessageDTO()
  264. .key(AVRO_SCHEMA_1_JSON_RECORD)
  265. .keySerde(SchemaRegistrySerde.name())
  266. .content(PROTOBUF_SCHEMA_JSON_RECORD)
  267. .valueSerde(SchemaRegistrySerde.name())
  268. )
  269. .doAssert(polled -> {
  270. assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
  271. assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD);
  272. });
  273. }
  274. @Test
  275. void valueWithProtoSchemaShouldThrowExceptionArgIsNotValidJsonObject() {
  276. new SendAndReadSpec()
  277. .withValueSchema(PROTOBUF_SCHEMA)
  278. .withMsgToSend(
  279. new CreateTopicMessageDTO()
  280. .key(null)
  281. .keySerde(StringSerde.name())
  282. // f2 field has type object instead of int
  283. .content("{ \"f1\" : \"test str\", \"f2\" : {} }")
  284. .valueSerde(SchemaRegistrySerde.name())
  285. )
  286. .assertSendThrowsException();
  287. }
  288. @Test
  289. void keyWithProtoSchemaValueWithJsonSchema() {
  290. new SendAndReadSpec()
  291. .withKeySchema(PROTOBUF_SCHEMA)
  292. .withValueSchema(JSON_SCHEMA)
  293. .withMsgToSend(
  294. new CreateTopicMessageDTO()
  295. .key(PROTOBUF_SCHEMA_JSON_RECORD)
  296. .keySerde(SchemaRegistrySerde.name())
  297. .content(JSON_SCHEMA_RECORD)
  298. .valueSerde(SchemaRegistrySerde.name())
  299. )
  300. .doAssert(polled -> {
  301. assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD);
  302. assertJsonEqual(polled.getContent(), JSON_SCHEMA_RECORD);
  303. });
  304. }
  305. @Test
  306. void valueWithJsonSchemaThrowsExceptionIfArgIsNotValidJsonObject() {
  307. new SendAndReadSpec()
  308. .withValueSchema(JSON_SCHEMA)
  309. .withMsgToSend(
  310. new CreateTopicMessageDTO()
  311. .key(null)
  312. .keySerde(StringSerde.name())
  313. // 'f2' field has has type object instead of string
  314. .content("{ \"f1\": 12, \"f2\": {}, \"schema\": \"some txt\" }")
  315. .valueSerde(SchemaRegistrySerde.name())
  316. )
  317. .assertSendThrowsException();
  318. }
  319. @Test
  320. void topicMessageMetadataAvro() {
  321. new SendAndReadSpec()
  322. .withKeySchema(AVRO_SCHEMA_1)
  323. .withValueSchema(AVRO_SCHEMA_2)
  324. .withMsgToSend(
  325. new CreateTopicMessageDTO()
  326. .key(AVRO_SCHEMA_1_JSON_RECORD)
  327. .keySerde(SchemaRegistrySerde.name())
  328. .content(AVRO_SCHEMA_2_JSON_RECORD)
  329. .valueSerde(SchemaRegistrySerde.name())
  330. )
  331. .doAssert(polled -> {
  332. assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
  333. assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD);
  334. assertThat(polled.getKeySize()).isEqualTo(15L);
  335. assertThat(polled.getValueSize()).isEqualTo(15L);
  336. assertThat(polled.getKeyDeserializeProperties().get("schemaId")).isNotNull();
  337. assertThat(polled.getValueDeserializeProperties().get("schemaId")).isNotNull();
  338. assertThat(polled.getKeyDeserializeProperties().get("type")).isEqualTo("AVRO");
  339. assertThat(polled.getValueDeserializeProperties().get("schemaId")).isNotNull();
  340. assertThat(polled.getValueDeserializeProperties().get("type")).isEqualTo("AVRO");
  341. });
  342. }
  343. @Test
  344. void topicMessageMetadataProtobuf() {
  345. new SendAndReadSpec()
  346. .withKeySchema(PROTOBUF_SCHEMA)
  347. .withValueSchema(PROTOBUF_SCHEMA)
  348. .withMsgToSend(
  349. new CreateTopicMessageDTO()
  350. .key(PROTOBUF_SCHEMA_JSON_RECORD)
  351. .keySerde(SchemaRegistrySerde.name())
  352. .content(PROTOBUF_SCHEMA_JSON_RECORD)
  353. .valueSerde(SchemaRegistrySerde.name())
  354. )
  355. .doAssert(polled -> {
  356. assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD);
  357. assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD);
  358. assertThat(polled.getKeySize()).isEqualTo(18L);
  359. assertThat(polled.getValueSize()).isEqualTo(18L);
  360. assertThat(polled.getValueDeserializeProperties().get("schemaId")).isNotNull();
  361. assertThat(polled.getKeyDeserializeProperties().get("type")).isEqualTo("PROTOBUF");
  362. assertThat(polled.getValueDeserializeProperties().get("schemaId")).isNotNull();
  363. assertThat(polled.getValueDeserializeProperties().get("type")).isEqualTo("PROTOBUF");
  364. });
  365. }
  366. @Test
  367. void topicMessageMetadataJson() {
  368. new SendAndReadSpec()
  369. .withKeySchema(JSON_SCHEMA)
  370. .withValueSchema(JSON_SCHEMA)
  371. .withMsgToSend(
  372. new CreateTopicMessageDTO()
  373. .key(JSON_SCHEMA_RECORD)
  374. .keySerde(SchemaRegistrySerde.name())
  375. .content(JSON_SCHEMA_RECORD)
  376. .valueSerde(SchemaRegistrySerde.name())
  377. .headers(Map.of("header1", "value1"))
  378. )
  379. .doAssert(polled -> {
  380. assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD);
  381. assertJsonEqual(polled.getContent(), JSON_SCHEMA_RECORD);
  382. assertThat(polled.getKeySize()).isEqualTo(57L);
  383. assertThat(polled.getValueSize()).isEqualTo(57L);
  384. assertThat(polled.getHeadersSize()).isEqualTo(13L);
  385. assertThat(polled.getValueDeserializeProperties().get("schemaId")).isNotNull();
  386. assertThat(polled.getKeyDeserializeProperties().get("type")).isEqualTo("JSON");
  387. assertThat(polled.getValueDeserializeProperties().get("schemaId")).isNotNull();
  388. assertThat(polled.getValueDeserializeProperties().get("type")).isEqualTo("JSON");
  389. });
  390. }
  391. @Test
  392. void noKeyAndNoContentPresentTest() {
  393. new SendAndReadSpec()
  394. .withMsgToSend(
  395. new CreateTopicMessageDTO()
  396. .key(null)
  397. .keySerde(StringSerde.name()) // any serde
  398. .content(null)
  399. .valueSerde(StringSerde.name()) // any serde
  400. )
  401. .doAssert(polled -> {
  402. assertThat(polled.getKey()).isNull();
  403. assertThat(polled.getContent()).isNull();
  404. });
  405. }
  406. @SneakyThrows
  407. private void assertJsonEqual(String actual, String expected) {
  408. var mapper = new ObjectMapper();
  409. assertThat(mapper.readTree(actual)).isEqualTo(mapper.readTree(expected));
  410. }
  411. class SendAndReadSpec {
  412. CreateTopicMessageDTO msgToSend;
  413. ParsedSchema keySchema;
  414. ParsedSchema valueSchema;
  415. public SendAndReadSpec withMsgToSend(CreateTopicMessageDTO msg) {
  416. this.msgToSend = msg;
  417. return this;
  418. }
  419. public SendAndReadSpec withKeySchema(ParsedSchema keyScheam) {
  420. this.keySchema = keyScheam;
  421. return this;
  422. }
  423. public SendAndReadSpec withValueSchema(ParsedSchema valueSchema) {
  424. this.valueSchema = valueSchema;
  425. return this;
  426. }
  427. @SneakyThrows
  428. private String createTopicAndCreateSchemas() {
  429. Objects.requireNonNull(msgToSend);
  430. String topic = UUID.randomUUID().toString();
  431. createTopic(new NewTopic(topic, 1, (short) 1));
  432. if (keySchema != null) {
  433. schemaRegistry.schemaRegistryClient().register(topic + "-key", keySchema);
  434. }
  435. if (valueSchema != null) {
  436. schemaRegistry.schemaRegistryClient().register(topic + "-value", valueSchema);
  437. }
  438. return topic;
  439. }
  440. public void assertSendThrowsException() {
  441. String topic = createTopicAndCreateSchemas();
  442. try {
  443. StepVerifier.create(
  444. messagesService.sendMessage(targetCluster, topic, msgToSend)
  445. ).expectError().verify();
  446. } finally {
  447. deleteTopic(topic);
  448. }
  449. }
  450. @SneakyThrows
  451. public void doAssert(Consumer<TopicMessageDTO> msgAssert) {
  452. String topic = createTopicAndCreateSchemas();
  453. try {
  454. messagesService.sendMessage(targetCluster, topic, msgToSend).block();
  455. TopicMessageDTO polled = messagesService.loadMessagesV2(
  456. targetCluster,
  457. topic,
  458. new ConsumerPosition(PollingModeDTO.EARLIEST, topic, List.of(), null, null),
  459. null,
  460. null,
  461. 1,
  462. msgToSend.getKeySerde().get(),
  463. msgToSend.getValueSerde().get()
  464. ).filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
  465. .map(TopicMessageEventDTO::getMessage)
  466. .blockLast(Duration.ofSeconds(5000));
  467. assertThat(polled).isNotNull();
  468. assertThat(polled.getPartition()).isEqualTo(0);
  469. assertThat(polled.getOffset()).isNotNull();
  470. msgAssert.accept(polled);
  471. } finally {
  472. deleteTopic(topic);
  473. }
  474. }
  475. }
  476. }