SendAndReadTests.java 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. package com.provectus.kafka.ui.service;
  2. import static org.assertj.core.api.Assertions.assertThat;
  3. import static org.assertj.core.api.Assertions.assertThatThrownBy;
  4. import com.fasterxml.jackson.databind.ObjectMapper;
  5. import com.provectus.kafka.ui.AbstractBaseTest;
  6. import com.provectus.kafka.ui.model.ConsumerPosition;
  7. import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
  8. import com.provectus.kafka.ui.model.KafkaCluster;
  9. import com.provectus.kafka.ui.model.MessageFormatDTO;
  10. import com.provectus.kafka.ui.model.SeekDirectionDTO;
  11. import com.provectus.kafka.ui.model.SeekTypeDTO;
  12. import com.provectus.kafka.ui.model.TopicMessageDTO;
  13. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  14. import io.confluent.kafka.schemaregistry.ParsedSchema;
  15. import io.confluent.kafka.schemaregistry.avro.AvroSchema;
  16. import io.confluent.kafka.schemaregistry.json.JsonSchema;
  17. import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
  18. import java.time.Duration;
  19. import java.util.Map;
  20. import java.util.Objects;
  21. import java.util.UUID;
  22. import java.util.function.Consumer;
  23. import lombok.SneakyThrows;
  24. import org.apache.kafka.clients.admin.NewTopic;
  25. import org.apache.kafka.common.TopicPartition;
  26. import org.junit.jupiter.api.BeforeEach;
  27. import org.junit.jupiter.api.Test;
  28. import org.springframework.beans.factory.annotation.Autowired;
  29. import org.springframework.test.context.ContextConfiguration;
  30. @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
  31. public class SendAndReadTests extends AbstractBaseTest {
  32. private static final AvroSchema AVRO_SCHEMA_1 = new AvroSchema(
  33. "{"
  34. + " \"type\": \"record\","
  35. + " \"name\": \"TestAvroRecord1\","
  36. + " \"fields\": ["
  37. + " {"
  38. + " \"name\": \"field1\","
  39. + " \"type\": \"string\""
  40. + " },"
  41. + " {"
  42. + " \"name\": \"field2\","
  43. + " \"type\": \"int\""
  44. + " }"
  45. + " ]"
  46. + "}"
  47. );
  48. private static final AvroSchema AVRO_SCHEMA_2 = new AvroSchema(
  49. "{"
  50. + " \"type\": \"record\","
  51. + " \"name\": \"TestAvroRecord2\","
  52. + " \"fields\": ["
  53. + " {"
  54. + " \"name\": \"f1\","
  55. + " \"type\": \"int\""
  56. + " },"
  57. + " {"
  58. + " \"name\": \"f2\","
  59. + " \"type\": \"string\""
  60. + " }"
  61. + " ]"
  62. + "}"
  63. );
  64. private static final AvroSchema AVRO_SCHEMA_PRIMITIVE_STRING =
  65. new AvroSchema("{ \"type\": \"string\" }");
  66. private static final AvroSchema AVRO_SCHEMA_PRIMITIVE_INT =
  67. new AvroSchema("{ \"type\": \"int\" }");
  68. private static final String AVRO_SCHEMA_1_JSON_RECORD
  69. = "{ \"field1\":\"testStr\", \"field2\": 123 }";
  70. private static final String AVRO_SCHEMA_2_JSON_RECORD = "{ \"f1\": 111, \"f2\": \"testStr\" }";
  71. private static final ProtobufSchema PROTOBUF_SCHEMA = new ProtobufSchema(
  72. "syntax = \"proto3\";\n"
  73. + "package com.provectus;\n"
  74. + "\n"
  75. + "message TestProtoRecord {\n"
  76. + " string f1 = 1;\n"
  77. + " int32 f2 = 2;\n"
  78. + "}\n"
  79. + "\n"
  80. );
  81. private static final String PROTOBUF_SCHEMA_JSON_RECORD
  82. = "{ \"f1\" : \"test str\", \"f2\" : 123 }";
  83. private static final JsonSchema JSON_SCHEMA = new JsonSchema(
  84. "{ "
  85. + " \"$schema\": \"http://json-schema.org/draft-07/schema#\", "
  86. + " \"$id\": \"http://example.com/myURI.schema.json\", "
  87. + " \"title\": \"TestRecord\","
  88. + " \"type\": \"object\","
  89. + " \"additionalProperties\": false,"
  90. + " \"properties\": {"
  91. + " \"f1\": {"
  92. + " \"type\": \"integer\""
  93. + " },"
  94. + " \"f2\": {"
  95. + " \"type\": \"string\""
  96. + " },"
  97. // it is important special case since there is code in KafkaJsonSchemaSerializer
  98. // that checks fields with this name (it should be worked around)
  99. + " \"schema\": {"
  100. + " \"type\": \"string\""
  101. + " }"
  102. + " }"
  103. + "}"
  104. );
  105. private static final String JSON_SCHEMA_RECORD
  106. = "{ \"f1\": 12, \"f2\": \"testJsonSchema1\", \"schema\": \"some txt\" }";
  107. private KafkaCluster targetCluster;
  108. @Autowired
  109. private MessagesService messagesService;
  110. @Autowired
  111. private ClustersStorage clustersStorage;
  112. @Autowired
  113. private ClustersMetricsScheduler clustersMetricsScheduler;
  114. @BeforeEach
  115. void init() {
  116. targetCluster = clustersStorage.getClusterByName(LOCAL).get();
  117. }
  118. @Test
  119. void noSchemaStringKeyStringValue() {
  120. new SendAndReadSpec()
  121. .withMsgToSend(
  122. new CreateTopicMessageDTO()
  123. .key("testKey")
  124. .content("testValue")
  125. )
  126. .doAssert(polled -> {
  127. assertThat(polled.getKey()).isEqualTo("testKey");
  128. assertThat(polled.getContent()).isEqualTo("testValue");
  129. });
  130. }
  131. @Test
  132. void noSchemaJsonKeyJsonValue() {
  133. new SendAndReadSpec()
  134. .withMsgToSend(
  135. new CreateTopicMessageDTO()
  136. .key("{ \"f1\": 111, \"f2\": \"testStr1\" }")
  137. .content("{ \"f1\": 222, \"f2\": \"testStr2\" }")
  138. )
  139. .doAssert(polled -> {
  140. assertThat(polled.getKey()).isEqualTo("{ \"f1\": 111, \"f2\": \"testStr1\" }");
  141. assertThat(polled.getContent()).isEqualTo("{ \"f1\": 222, \"f2\": \"testStr2\" }");
  142. });
  143. }
  144. @Test
  145. void keyIsIntValueIsDoubleShouldBeSerializedAsStrings() {
  146. new SendAndReadSpec()
  147. .withMsgToSend(
  148. new CreateTopicMessageDTO()
  149. .key("123")
  150. .content("234.56")
  151. )
  152. .doAssert(polled -> {
  153. assertThat(polled.getKey()).isEqualTo("123");
  154. assertThat(polled.getContent()).isEqualTo("234.56");
  155. });
  156. }
  157. @Test
  158. void noSchemaKeyIsNull() {
  159. new SendAndReadSpec()
  160. .withMsgToSend(
  161. new CreateTopicMessageDTO()
  162. .key(null)
  163. .content("testValue")
  164. )
  165. .doAssert(polled -> {
  166. assertThat(polled.getKey()).isNull();
  167. assertThat(polled.getContent()).isEqualTo("testValue");
  168. });
  169. }
  170. @Test
  171. void noSchemaValueIsNull() {
  172. new SendAndReadSpec()
  173. .withMsgToSend(
  174. new CreateTopicMessageDTO()
  175. .key("testKey")
  176. .content(null)
  177. )
  178. .doAssert(polled -> {
  179. assertThat(polled.getKey()).isEqualTo("testKey");
  180. assertThat(polled.getContent()).isNull();
  181. });
  182. }
  183. @Test
  184. void primitiveAvroSchemas() {
  185. new SendAndReadSpec()
  186. .withKeySchema(AVRO_SCHEMA_PRIMITIVE_STRING)
  187. .withValueSchema(AVRO_SCHEMA_PRIMITIVE_INT)
  188. .withMsgToSend(
  189. new CreateTopicMessageDTO()
  190. .key("\"some string\"")
  191. .content("123")
  192. )
  193. .doAssert(polled -> {
  194. assertThat(polled.getKey()).isEqualTo("\"some string\"");
  195. assertThat(polled.getContent()).isEqualTo("123");
  196. });
  197. }
  198. @Test
  199. void nonNullableKvWithAvroSchema() {
  200. new SendAndReadSpec()
  201. .withKeySchema(AVRO_SCHEMA_1)
  202. .withValueSchema(AVRO_SCHEMA_2)
  203. .withMsgToSend(
  204. new CreateTopicMessageDTO()
  205. .key(AVRO_SCHEMA_1_JSON_RECORD)
  206. .content(AVRO_SCHEMA_2_JSON_RECORD)
  207. )
  208. .doAssert(polled -> {
  209. assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
  210. assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD);
  211. });
  212. }
  213. @Test
  214. void keyWithNoSchemaValueWithAvroSchema() {
  215. new SendAndReadSpec()
  216. .withValueSchema(AVRO_SCHEMA_1)
  217. .withMsgToSend(
  218. new CreateTopicMessageDTO()
  219. .key("testKey")
  220. .content(AVRO_SCHEMA_1_JSON_RECORD)
  221. )
  222. .doAssert(polled -> {
  223. assertThat(polled.getKey()).isEqualTo("testKey");
  224. assertJsonEqual(polled.getContent(), AVRO_SCHEMA_1_JSON_RECORD);
  225. });
  226. }
  227. @Test
  228. void keyWithAvroSchemaValueWithNoSchema() {
  229. new SendAndReadSpec()
  230. .withKeySchema(AVRO_SCHEMA_1)
  231. .withMsgToSend(
  232. new CreateTopicMessageDTO()
  233. .key(AVRO_SCHEMA_1_JSON_RECORD)
  234. .content("testVal")
  235. )
  236. .doAssert(polled -> {
  237. assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
  238. assertThat(polled.getContent()).isEqualTo("testVal");
  239. });
  240. }
  241. @Test
  242. void keyWithNoSchemaValueWithProtoSchema() {
  243. new SendAndReadSpec()
  244. .withValueSchema(PROTOBUF_SCHEMA)
  245. .withMsgToSend(
  246. new CreateTopicMessageDTO()
  247. .key("testKey")
  248. .content(PROTOBUF_SCHEMA_JSON_RECORD)
  249. )
  250. .doAssert(polled -> {
  251. assertThat(polled.getKey()).isEqualTo("testKey");
  252. assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD);
  253. });
  254. }
  255. @Test
  256. void keyWithAvroSchemaValueWithAvroSchemaKeyIsNull() {
  257. new SendAndReadSpec()
  258. .withKeySchema(AVRO_SCHEMA_1)
  259. .withValueSchema(AVRO_SCHEMA_2)
  260. .withMsgToSend(
  261. new CreateTopicMessageDTO()
  262. .key(null)
  263. .content(AVRO_SCHEMA_2_JSON_RECORD)
  264. )
  265. .doAssert(polled -> {
  266. assertThat(polled.getKey()).isNull();
  267. assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD);
  268. });
  269. }
  270. @Test
  271. void valueWithAvroSchemaShouldThrowExceptionArgIsNotValidJsonObject() {
  272. new SendAndReadSpec()
  273. .withValueSchema(AVRO_SCHEMA_2)
  274. .withMsgToSend(
  275. new CreateTopicMessageDTO()
  276. // f2 has type object instead of string
  277. .content("{ \"f1\": 111, \"f2\": {} }")
  278. )
  279. .assertSendThrowsException();
  280. }
  281. @Test
  282. void keyWithAvroSchemaValueWithAvroSchemaValueIsNull() {
  283. new SendAndReadSpec()
  284. .withKeySchema(AVRO_SCHEMA_1)
  285. .withValueSchema(AVRO_SCHEMA_2)
  286. .withMsgToSend(
  287. new CreateTopicMessageDTO()
  288. .key(AVRO_SCHEMA_1_JSON_RECORD)
  289. .content(null)
  290. )
  291. .doAssert(polled -> {
  292. assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
  293. assertThat(polled.getContent()).isNull();
  294. });
  295. }
  296. @Test
  297. void keyWithAvroSchemaValueWithProtoSchema() {
  298. new SendAndReadSpec()
  299. .withKeySchema(AVRO_SCHEMA_1)
  300. .withValueSchema(PROTOBUF_SCHEMA)
  301. .withMsgToSend(
  302. new CreateTopicMessageDTO()
  303. .key(AVRO_SCHEMA_1_JSON_RECORD)
  304. .content(PROTOBUF_SCHEMA_JSON_RECORD)
  305. )
  306. .doAssert(polled -> {
  307. assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
  308. assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD);
  309. });
  310. }
  311. @Test
  312. void valueWithProtoSchemaShouldThrowExceptionArgIsNotValidJsonObject() {
  313. new SendAndReadSpec()
  314. .withValueSchema(PROTOBUF_SCHEMA)
  315. .withMsgToSend(
  316. new CreateTopicMessageDTO()
  317. // f2 field has type object instead of int
  318. .content("{ \"f1\" : \"test str\", \"f2\" : {} }"))
  319. .assertSendThrowsException();
  320. }
  321. @Test
  322. void keyWithProtoSchemaValueWithJsonSchema() {
  323. new SendAndReadSpec()
  324. .withKeySchema(PROTOBUF_SCHEMA)
  325. .withValueSchema(JSON_SCHEMA)
  326. .withMsgToSend(
  327. new CreateTopicMessageDTO()
  328. .key(PROTOBUF_SCHEMA_JSON_RECORD)
  329. .content(JSON_SCHEMA_RECORD)
  330. )
  331. .doAssert(polled -> {
  332. assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD);
  333. assertJsonEqual(polled.getContent(), JSON_SCHEMA_RECORD);
  334. });
  335. }
  336. @Test
  337. void keyWithJsonValueWithJsonSchemaKeyValueIsNull() {
  338. new SendAndReadSpec()
  339. .withKeySchema(JSON_SCHEMA)
  340. .withValueSchema(JSON_SCHEMA)
  341. .withMsgToSend(
  342. new CreateTopicMessageDTO()
  343. .key(JSON_SCHEMA_RECORD)
  344. )
  345. .doAssert(polled -> {
  346. assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD);
  347. assertThat(polled.getContent()).isNull();
  348. });
  349. }
  350. @Test
  351. void valueWithJsonSchemaThrowsExceptionIfArgIsNotValidJsonObject() {
  352. new SendAndReadSpec()
  353. .withValueSchema(JSON_SCHEMA)
  354. .withMsgToSend(
  355. new CreateTopicMessageDTO()
  356. // 'f2' field has has type object instead of string
  357. .content("{ \"f1\": 12, \"f2\": {}, \"schema\": \"some txt\" }")
  358. )
  359. .assertSendThrowsException();
  360. }
  361. @Test
  362. void topicMessageMetadataAvro() {
  363. new SendAndReadSpec()
  364. .withKeySchema(AVRO_SCHEMA_1)
  365. .withValueSchema(AVRO_SCHEMA_2)
  366. .withMsgToSend(
  367. new CreateTopicMessageDTO()
  368. .key(AVRO_SCHEMA_1_JSON_RECORD)
  369. .content(AVRO_SCHEMA_2_JSON_RECORD)
  370. )
  371. .doAssert(polled -> {
  372. assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
  373. assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD);
  374. assertThat(polled.getKeySize()).isEqualTo(15L);
  375. assertThat(polled.getValueSize()).isEqualTo(15L);
  376. assertThat(polled.getKeyFormat()).isEqualTo(MessageFormatDTO.AVRO);
  377. assertThat(polled.getValueFormat()).isEqualTo(MessageFormatDTO.AVRO);
  378. assertThat(polled.getKeySchemaId()).isNotEmpty();
  379. assertThat(polled.getValueSchemaId()).isNotEmpty();
  380. });
  381. }
  382. @Test
  383. void topicMessageMetadataProtobuf() {
  384. new SendAndReadSpec()
  385. .withKeySchema(PROTOBUF_SCHEMA)
  386. .withValueSchema(PROTOBUF_SCHEMA)
  387. .withMsgToSend(
  388. new CreateTopicMessageDTO()
  389. .key(PROTOBUF_SCHEMA_JSON_RECORD)
  390. .content(PROTOBUF_SCHEMA_JSON_RECORD)
  391. )
  392. .doAssert(polled -> {
  393. assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD);
  394. assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD);
  395. assertThat(polled.getKeySize()).isEqualTo(18L);
  396. assertThat(polled.getValueSize()).isEqualTo(18L);
  397. assertThat(polled.getKeyFormat()).isEqualTo(MessageFormatDTO.PROTOBUF);
  398. assertThat(polled.getValueFormat()).isEqualTo(MessageFormatDTO.PROTOBUF);
  399. assertThat(polled.getKeySchemaId()).isNotEmpty();
  400. assertThat(polled.getValueSchemaId()).isNotEmpty();
  401. });
  402. }
  403. @Test
  404. void topicMessageMetadataJson() {
  405. new SendAndReadSpec()
  406. .withKeySchema(JSON_SCHEMA)
  407. .withValueSchema(JSON_SCHEMA)
  408. .withMsgToSend(
  409. new CreateTopicMessageDTO()
  410. .key(JSON_SCHEMA_RECORD)
  411. .content(JSON_SCHEMA_RECORD)
  412. .headers(Map.of("header1", "value1"))
  413. )
  414. .doAssert(polled -> {
  415. assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD);
  416. assertJsonEqual(polled.getContent(), JSON_SCHEMA_RECORD);
  417. assertThat(polled.getKeyFormat()).isEqualTo(MessageFormatDTO.JSON);
  418. assertThat(polled.getValueFormat()).isEqualTo(MessageFormatDTO.JSON);
  419. assertThat(polled.getKeySchemaId()).isNotEmpty();
  420. assertThat(polled.getValueSchemaId()).isNotEmpty();
  421. assertThat(polled.getKeySize()).isEqualTo(57L);
  422. assertThat(polled.getValueSize()).isEqualTo(57L);
  423. assertThat(polled.getHeadersSize()).isEqualTo(13L);
  424. });
  425. }
  426. @SneakyThrows
  427. private void assertJsonEqual(String actual, String expected) {
  428. var mapper = new ObjectMapper();
  429. assertThat(mapper.readTree(actual)).isEqualTo(mapper.readTree(expected));
  430. }
  431. class SendAndReadSpec {
  432. CreateTopicMessageDTO msgToSend;
  433. ParsedSchema keySchema;
  434. ParsedSchema valueSchema;
  435. public SendAndReadSpec withMsgToSend(CreateTopicMessageDTO msg) {
  436. this.msgToSend = msg;
  437. return this;
  438. }
  439. public SendAndReadSpec withKeySchema(ParsedSchema keyScheam) {
  440. this.keySchema = keyScheam;
  441. return this;
  442. }
  443. public SendAndReadSpec withValueSchema(ParsedSchema valueSchema) {
  444. this.valueSchema = valueSchema;
  445. return this;
  446. }
  447. @SneakyThrows
  448. private String createTopicAndCreateSchemas() {
  449. Objects.requireNonNull(msgToSend);
  450. String topic = UUID.randomUUID().toString();
  451. createTopic(new NewTopic(topic, 1, (short) 1));
  452. if (keySchema != null) {
  453. schemaRegistry.schemaRegistryClient().register(topic + "-key", keySchema);
  454. }
  455. if (valueSchema != null) {
  456. schemaRegistry.schemaRegistryClient().register(topic + "-value", valueSchema);
  457. }
  458. // need to update to see new topic & schemas
  459. clustersMetricsScheduler.updateMetrics();
  460. return topic;
  461. }
  462. public void assertSendThrowsException() {
  463. String topic = createTopicAndCreateSchemas();
  464. try {
  465. assertThatThrownBy(() ->
  466. messagesService.sendMessage(targetCluster, topic, msgToSend).block());
  467. } finally {
  468. deleteTopic(topic);
  469. }
  470. }
  471. @SneakyThrows
  472. public void doAssert(Consumer<TopicMessageDTO> msgAssert) {
  473. String topic = createTopicAndCreateSchemas();
  474. try {
  475. messagesService.sendMessage(targetCluster, topic, msgToSend).block();
  476. TopicMessageDTO polled = messagesService.loadMessages(
  477. targetCluster,
  478. topic,
  479. new ConsumerPosition(
  480. SeekTypeDTO.BEGINNING,
  481. Map.of(new TopicPartition(topic, 0), 0L),
  482. SeekDirectionDTO.FORWARD
  483. ),
  484. null,
  485. 1
  486. ).filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
  487. .map(TopicMessageEventDTO::getMessage)
  488. .blockLast(Duration.ofSeconds(5000));
  489. assertThat(polled).isNotNull();
  490. assertThat(polled.getPartition()).isEqualTo(0);
  491. assertThat(polled.getOffset()).isNotNull();
  492. msgAssert.accept(polled);
  493. } finally {
  494. deleteTopic(topic);
  495. }
  496. }
  497. }
  498. }