// SendAndReadTests.java
  1. package com.provectus.kafka.ui.service;
  2. import static org.assertj.core.api.Assertions.assertThat;
  3. import static org.assertj.core.api.Assertions.assertThatThrownBy;
  4. import com.fasterxml.jackson.databind.ObjectMapper;
  5. import com.provectus.kafka.ui.AbstractBaseTest;
  6. import com.provectus.kafka.ui.model.ConsumerPosition;
  7. import com.provectus.kafka.ui.model.CreateTopicMessage;
  8. import com.provectus.kafka.ui.model.MessageFormat;
  9. import com.provectus.kafka.ui.model.SeekDirection;
  10. import com.provectus.kafka.ui.model.SeekType;
  11. import com.provectus.kafka.ui.model.TopicMessage;
  12. import com.provectus.kafka.ui.model.TopicMessageEvent;
  13. import io.confluent.kafka.schemaregistry.ParsedSchema;
  14. import io.confluent.kafka.schemaregistry.avro.AvroSchema;
  15. import io.confluent.kafka.schemaregistry.json.JsonSchema;
  16. import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
  17. import java.time.Duration;
  18. import java.util.Map;
  19. import java.util.Objects;
  20. import java.util.UUID;
  21. import java.util.function.Consumer;
  22. import lombok.SneakyThrows;
  23. import org.apache.kafka.clients.admin.NewTopic;
  24. import org.apache.kafka.common.TopicPartition;
  25. import org.junit.jupiter.api.Test;
  26. import org.springframework.beans.factory.annotation.Autowired;
  27. import org.springframework.test.context.ContextConfiguration;
  28. @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
  29. public class SendAndReadTests extends AbstractBaseTest {
  30. private static final AvroSchema AVRO_SCHEMA_1 = new AvroSchema(
  31. "{"
  32. + " \"type\": \"record\","
  33. + " \"name\": \"TestAvroRecord1\","
  34. + " \"fields\": ["
  35. + " {"
  36. + " \"name\": \"field1\","
  37. + " \"type\": \"string\""
  38. + " },"
  39. + " {"
  40. + " \"name\": \"field2\","
  41. + " \"type\": \"int\""
  42. + " }"
  43. + " ]"
  44. + "}"
  45. );
  46. private static final AvroSchema AVRO_SCHEMA_2 = new AvroSchema(
  47. "{"
  48. + " \"type\": \"record\","
  49. + " \"name\": \"TestAvroRecord2\","
  50. + " \"fields\": ["
  51. + " {"
  52. + " \"name\": \"f1\","
  53. + " \"type\": \"int\""
  54. + " },"
  55. + " {"
  56. + " \"name\": \"f2\","
  57. + " \"type\": \"string\""
  58. + " }"
  59. + " ]"
  60. + "}"
  61. );
  62. private static final String AVRO_SCHEMA_1_JSON_RECORD
  63. = "{ \"field1\":\"testStr\", \"field2\": 123 }";
  64. private static final String AVRO_SCHEMA_2_JSON_RECORD = "{ \"f1\": 111, \"f2\": \"testStr\" }";
  65. private static final ProtobufSchema PROTOBUF_SCHEMA = new ProtobufSchema(
  66. "syntax = \"proto3\";\n"
  67. + "package com.provectus;\n"
  68. + "\n"
  69. + "message TestProtoRecord {\n"
  70. + " string f1 = 1;\n"
  71. + " int32 f2 = 2;\n"
  72. + "}\n"
  73. + "\n"
  74. );
  75. private static final String PROTOBUF_SCHEMA_JSON_RECORD
  76. = "{ \"f1\" : \"test str\", \"f2\" : 123 }";
  77. private static final JsonSchema JSON_SCHEMA = new JsonSchema(
  78. "{ "
  79. + " \"$schema\": \"http://json-schema.org/draft-07/schema#\", "
  80. + " \"$id\": \"http://example.com/myURI.schema.json\", "
  81. + " \"title\": \"TestRecord\","
  82. + " \"type\": \"object\","
  83. + " \"additionalProperties\": false,"
  84. + " \"properties\": {"
  85. + " \"f1\": {"
  86. + " \"type\": \"integer\""
  87. + " },"
  88. + " \"f2\": {"
  89. + " \"type\": \"string\""
  90. + " },"
  91. // it is important special case since there is code in KafkaJsonSchemaSerializer
  92. // that checks fields with this name (it should be worked around)
  93. + " \"schema\": {"
  94. + " \"type\": \"string\""
  95. + " }"
  96. + " }"
  97. + "}"
  98. );
  99. private static final String JSON_SCHEMA_RECORD
  100. = "{ \"f1\": 12, \"f2\": \"testJsonSchema1\", \"schema\": \"some txt\" }";
  101. @Autowired
  102. private ClusterService clusterService;
  103. @Autowired
  104. private ClustersMetricsScheduler clustersMetricsScheduler;
  105. @Test
  106. void noSchemaStringKeyStringValue() {
  107. new SendAndReadSpec()
  108. .withMsgToSend(
  109. new CreateTopicMessage()
  110. .key("testKey")
  111. .content("testValue")
  112. )
  113. .doAssert(polled -> {
  114. assertThat(polled.getKey()).isEqualTo("testKey");
  115. assertThat(polled.getContent()).isEqualTo("testValue");
  116. });
  117. }
  118. @Test
  119. void noSchemaJsonKeyJsonValue() {
  120. new SendAndReadSpec()
  121. .withMsgToSend(
  122. new CreateTopicMessage()
  123. .key("{ \"f1\": 111, \"f2\": \"testStr1\" }")
  124. .content("{ \"f1\": 222, \"f2\": \"testStr2\" }")
  125. )
  126. .doAssert(polled -> {
  127. assertThat(polled.getKey()).isEqualTo("{ \"f1\": 111, \"f2\": \"testStr1\" }");
  128. assertThat(polled.getContent()).isEqualTo("{ \"f1\": 222, \"f2\": \"testStr2\" }");
  129. });
  130. }
  131. @Test
  132. void keyIsIntValueIsDoubleShouldBeSerializedAsStrings() {
  133. new SendAndReadSpec()
  134. .withMsgToSend(
  135. new CreateTopicMessage()
  136. .key("123")
  137. .content("234.56")
  138. )
  139. .doAssert(polled -> {
  140. assertThat(polled.getKey()).isEqualTo("123");
  141. assertThat(polled.getContent()).isEqualTo("234.56");
  142. });
  143. }
  144. @Test
  145. void noSchemaKeyIsNull() {
  146. new SendAndReadSpec()
  147. .withMsgToSend(
  148. new CreateTopicMessage()
  149. .key(null)
  150. .content("testValue")
  151. )
  152. .doAssert(polled -> {
  153. assertThat(polled.getKey()).isNull();
  154. assertThat(polled.getContent()).isEqualTo("testValue");
  155. });
  156. }
  157. @Test
  158. void noSchemaValueIsNull() {
  159. new SendAndReadSpec()
  160. .withMsgToSend(
  161. new CreateTopicMessage()
  162. .key("testKey")
  163. .content(null)
  164. )
  165. .doAssert(polled -> {
  166. assertThat(polled.getKey()).isEqualTo("testKey");
  167. assertThat(polled.getContent()).isNull();
  168. });
  169. }
  170. @Test
  171. void nonNullableKvWithAvroSchema() {
  172. new SendAndReadSpec()
  173. .withKeySchema(AVRO_SCHEMA_1)
  174. .withValueSchema(AVRO_SCHEMA_2)
  175. .withMsgToSend(
  176. new CreateTopicMessage()
  177. .key(AVRO_SCHEMA_1_JSON_RECORD)
  178. .content(AVRO_SCHEMA_2_JSON_RECORD)
  179. )
  180. .doAssert(polled -> {
  181. assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
  182. assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD);
  183. });
  184. }
  185. @Test
  186. void keyWithNoSchemaValueWithAvroSchema() {
  187. new SendAndReadSpec()
  188. .withValueSchema(AVRO_SCHEMA_1)
  189. .withMsgToSend(
  190. new CreateTopicMessage()
  191. .key("testKey")
  192. .content(AVRO_SCHEMA_1_JSON_RECORD)
  193. )
  194. .doAssert(polled -> {
  195. assertThat(polled.getKey()).isEqualTo("testKey");
  196. assertJsonEqual(polled.getContent(), AVRO_SCHEMA_1_JSON_RECORD);
  197. });
  198. }
  199. @Test
  200. void keyWithAvroSchemaValueWithNoSchema() {
  201. new SendAndReadSpec()
  202. .withKeySchema(AVRO_SCHEMA_1)
  203. .withMsgToSend(
  204. new CreateTopicMessage()
  205. .key(AVRO_SCHEMA_1_JSON_RECORD)
  206. .content("testVal")
  207. )
  208. .doAssert(polled -> {
  209. assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
  210. assertThat(polled.getContent()).isEqualTo("testVal");
  211. });
  212. }
  213. @Test
  214. void keyWithNoSchemaValueWithProtoSchema() {
  215. new SendAndReadSpec()
  216. .withValueSchema(PROTOBUF_SCHEMA)
  217. .withMsgToSend(
  218. new CreateTopicMessage()
  219. .key("testKey")
  220. .content(PROTOBUF_SCHEMA_JSON_RECORD)
  221. )
  222. .doAssert(polled -> {
  223. assertThat(polled.getKey()).isEqualTo("testKey");
  224. assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD);
  225. });
  226. }
  227. @Test
  228. void keyWithAvroSchemaValueWithAvroSchemaKeyIsNull() {
  229. new SendAndReadSpec()
  230. .withKeySchema(AVRO_SCHEMA_1)
  231. .withValueSchema(AVRO_SCHEMA_2)
  232. .withMsgToSend(
  233. new CreateTopicMessage()
  234. .key(null)
  235. .content(AVRO_SCHEMA_2_JSON_RECORD)
  236. )
  237. .doAssert(polled -> {
  238. assertThat(polled.getKey()).isNull();
  239. assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD);
  240. });
  241. }
  242. @Test
  243. void valueWithAvroSchemaShouldThrowExceptionArgIsNotValidJsonObject() {
  244. new SendAndReadSpec()
  245. .withValueSchema(AVRO_SCHEMA_2)
  246. .withMsgToSend(
  247. new CreateTopicMessage()
  248. // f2 has type object instead of string
  249. .content("{ \"f1\": 111, \"f2\": {} }")
  250. )
  251. .assertSendThrowsException();
  252. }
  253. @Test
  254. void keyWithAvroSchemaValueWithAvroSchemaValueIsNull() {
  255. new SendAndReadSpec()
  256. .withKeySchema(AVRO_SCHEMA_1)
  257. .withValueSchema(AVRO_SCHEMA_2)
  258. .withMsgToSend(
  259. new CreateTopicMessage()
  260. .key(AVRO_SCHEMA_1_JSON_RECORD)
  261. .content(null)
  262. )
  263. .doAssert(polled -> {
  264. assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
  265. assertThat(polled.getContent()).isNull();
  266. });
  267. }
  268. @Test
  269. void keyWithAvroSchemaValueWithProtoSchema() {
  270. new SendAndReadSpec()
  271. .withKeySchema(AVRO_SCHEMA_1)
  272. .withValueSchema(PROTOBUF_SCHEMA)
  273. .withMsgToSend(
  274. new CreateTopicMessage()
  275. .key(AVRO_SCHEMA_1_JSON_RECORD)
  276. .content(PROTOBUF_SCHEMA_JSON_RECORD)
  277. )
  278. .doAssert(polled -> {
  279. assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
  280. assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD);
  281. });
  282. }
  283. @Test
  284. void valueWithProtoSchemaShouldThrowExceptionArgIsNotValidJsonObject() {
  285. new SendAndReadSpec()
  286. .withValueSchema(PROTOBUF_SCHEMA)
  287. .withMsgToSend(
  288. new CreateTopicMessage()
  289. // f2 field has type object instead of int
  290. .content("{ \"f1\" : \"test str\", \"f2\" : {} }"))
  291. .assertSendThrowsException();
  292. }
  293. @Test
  294. void keyWithProtoSchemaValueWithJsonSchema() {
  295. new SendAndReadSpec()
  296. .withKeySchema(PROTOBUF_SCHEMA)
  297. .withValueSchema(JSON_SCHEMA)
  298. .withMsgToSend(
  299. new CreateTopicMessage()
  300. .key(PROTOBUF_SCHEMA_JSON_RECORD)
  301. .content(JSON_SCHEMA_RECORD)
  302. )
  303. .doAssert(polled -> {
  304. assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD);
  305. assertJsonEqual(polled.getContent(), JSON_SCHEMA_RECORD);
  306. });
  307. }
  308. @Test
  309. void keyWithJsonValueWithJsonSchemaKeyValueIsNull() {
  310. new SendAndReadSpec()
  311. .withKeySchema(JSON_SCHEMA)
  312. .withValueSchema(JSON_SCHEMA)
  313. .withMsgToSend(
  314. new CreateTopicMessage()
  315. .key(JSON_SCHEMA_RECORD)
  316. )
  317. .doAssert(polled -> {
  318. assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD);
  319. assertThat(polled.getContent()).isNull();
  320. });
  321. }
  322. @Test
  323. void valueWithJsonSchemaThrowsExceptionIfArgIsNotValidJsonObject() {
  324. new SendAndReadSpec()
  325. .withValueSchema(JSON_SCHEMA)
  326. .withMsgToSend(
  327. new CreateTopicMessage()
  328. // 'f2' field has has type object instead of string
  329. .content("{ \"f1\": 12, \"f2\": {}, \"schema\": \"some txt\" }")
  330. )
  331. .assertSendThrowsException();
  332. }
  333. @Test
  334. void topicMessageMetadataAvro() {
  335. new SendAndReadSpec()
  336. .withKeySchema(AVRO_SCHEMA_1)
  337. .withValueSchema(AVRO_SCHEMA_2)
  338. .withMsgToSend(
  339. new CreateTopicMessage()
  340. .key(AVRO_SCHEMA_1_JSON_RECORD)
  341. .content(AVRO_SCHEMA_2_JSON_RECORD)
  342. )
  343. .doAssert(polled -> {
  344. assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD);
  345. assertJsonEqual(polled.getContent(), AVRO_SCHEMA_2_JSON_RECORD);
  346. assertThat(polled.getKeySize()).isEqualTo(15L);
  347. assertThat(polled.getValueSize()).isEqualTo(15L);
  348. assertThat(polled.getKeyFormat()).isEqualTo(MessageFormat.AVRO);
  349. assertThat(polled.getValueFormat()).isEqualTo(MessageFormat.AVRO);
  350. assertThat(polled.getKeySchemaId()).isNotEmpty();
  351. assertThat(polled.getValueSchemaId()).isNotEmpty();
  352. });
  353. }
  354. @Test
  355. void topicMessageMetadataProtobuf() {
  356. new SendAndReadSpec()
  357. .withKeySchema(PROTOBUF_SCHEMA)
  358. .withValueSchema(PROTOBUF_SCHEMA)
  359. .withMsgToSend(
  360. new CreateTopicMessage()
  361. .key(PROTOBUF_SCHEMA_JSON_RECORD)
  362. .content(PROTOBUF_SCHEMA_JSON_RECORD)
  363. )
  364. .doAssert(polled -> {
  365. assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD);
  366. assertJsonEqual(polled.getContent(), PROTOBUF_SCHEMA_JSON_RECORD);
  367. assertThat(polled.getKeySize()).isEqualTo(18L);
  368. assertThat(polled.getValueSize()).isEqualTo(18L);
  369. assertThat(polled.getKeyFormat()).isEqualTo(MessageFormat.PROTOBUF);
  370. assertThat(polled.getValueFormat()).isEqualTo(MessageFormat.PROTOBUF);
  371. assertThat(polled.getKeySchemaId()).isNotEmpty();
  372. assertThat(polled.getValueSchemaId()).isNotEmpty();
  373. });
  374. }
  375. @Test
  376. void topicMessageMetadataJson() {
  377. new SendAndReadSpec()
  378. .withKeySchema(JSON_SCHEMA)
  379. .withValueSchema(JSON_SCHEMA)
  380. .withMsgToSend(
  381. new CreateTopicMessage()
  382. .key(JSON_SCHEMA_RECORD)
  383. .content(JSON_SCHEMA_RECORD)
  384. .headers(Map.of("header1", "value1"))
  385. )
  386. .doAssert(polled -> {
  387. assertJsonEqual(polled.getKey(), JSON_SCHEMA_RECORD);
  388. assertJsonEqual(polled.getContent(), JSON_SCHEMA_RECORD);
  389. assertThat(polled.getKeyFormat()).isEqualTo(MessageFormat.JSON);
  390. assertThat(polled.getValueFormat()).isEqualTo(MessageFormat.JSON);
  391. assertThat(polled.getKeySchemaId()).isNotEmpty();
  392. assertThat(polled.getValueSchemaId()).isNotEmpty();
  393. assertThat(polled.getKeySize()).isEqualTo(57L);
  394. assertThat(polled.getValueSize()).isEqualTo(57L);
  395. assertThat(polled.getHeadersSize()).isEqualTo(13L);
  396. });
  397. }
  398. @SneakyThrows
  399. private void assertJsonEqual(String actual, String expected) {
  400. var mapper = new ObjectMapper();
  401. assertThat(mapper.readTree(actual)).isEqualTo(mapper.readTree(expected));
  402. }
  403. class SendAndReadSpec {
  404. CreateTopicMessage msgToSend;
  405. ParsedSchema keySchema;
  406. ParsedSchema valueSchema;
  407. public SendAndReadSpec withMsgToSend(CreateTopicMessage msg) {
  408. this.msgToSend = msg;
  409. return this;
  410. }
  411. public SendAndReadSpec withKeySchema(ParsedSchema keyScheam) {
  412. this.keySchema = keyScheam;
  413. return this;
  414. }
  415. public SendAndReadSpec withValueSchema(ParsedSchema valueSchema) {
  416. this.valueSchema = valueSchema;
  417. return this;
  418. }
  419. @SneakyThrows
  420. private String createTopicAndCreateSchemas() {
  421. Objects.requireNonNull(msgToSend);
  422. String topic = UUID.randomUUID().toString();
  423. createTopic(new NewTopic(topic, 1, (short) 1));
  424. if (keySchema != null) {
  425. schemaRegistry.schemaRegistryClient().register(topic + "-key", keySchema);
  426. }
  427. if (valueSchema != null) {
  428. schemaRegistry.schemaRegistryClient().register(topic + "-value", valueSchema);
  429. }
  430. // need to update to see new topic & schemas
  431. clustersMetricsScheduler.updateMetrics();
  432. return topic;
  433. }
  434. public void assertSendThrowsException() {
  435. String topic = createTopicAndCreateSchemas();
  436. try {
  437. assertThatThrownBy(() -> clusterService.sendMessage(LOCAL, topic, msgToSend).block());
  438. } finally {
  439. deleteTopic(topic);
  440. }
  441. }
  442. @SneakyThrows
  443. public void doAssert(Consumer<TopicMessage> msgAssert) {
  444. String topic = createTopicAndCreateSchemas();
  445. try {
  446. clusterService.sendMessage(LOCAL, topic, msgToSend).block();
  447. TopicMessage polled = clusterService.getMessages(
  448. LOCAL,
  449. topic,
  450. new ConsumerPosition(
  451. SeekType.BEGINNING,
  452. Map.of(new TopicPartition(topic, 0), 0L),
  453. SeekDirection.FORWARD
  454. ),
  455. null,
  456. 1
  457. ).filter(e -> e.getType().equals(TopicMessageEvent.TypeEnum.MESSAGE))
  458. .map(TopicMessageEvent::getMessage)
  459. .blockLast(Duration.ofSeconds(5000));
  460. assertThat(polled).isNotNull();
  461. assertThat(polled.getPartition()).isEqualTo(0);
  462. assertThat(polled.getOffset()).isNotNull();
  463. msgAssert.accept(polled);
  464. } finally {
  465. deleteTopic(topic);
  466. }
  467. }
  468. }
  469. }