MessagesServiceTest.java 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package com.provectus.kafka.ui.service;
  2. import static org.assertj.core.api.Assertions.assertThat;
  3. import com.provectus.kafka.ui.AbstractIntegrationTest;
  4. import com.provectus.kafka.ui.exception.TopicNotFoundException;
  5. import com.provectus.kafka.ui.model.ConsumerPosition;
  6. import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
  7. import com.provectus.kafka.ui.model.KafkaCluster;
  8. import com.provectus.kafka.ui.model.PollingModeDTO;
  9. import com.provectus.kafka.ui.model.TopicMessageDTO;
  10. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  11. import com.provectus.kafka.ui.producer.KafkaTestProducer;
  12. import com.provectus.kafka.ui.serdes.builtin.StringSerde;
  13. import java.util.HashSet;
  14. import java.util.List;
  15. import java.util.Set;
  16. import java.util.UUID;
  17. import java.util.concurrent.atomic.AtomicReference;
  18. import org.apache.kafka.clients.admin.NewTopic;
  19. import org.junit.jupiter.api.AfterEach;
  20. import org.junit.jupiter.api.BeforeEach;
  21. import org.junit.jupiter.api.Test;
  22. import org.junit.jupiter.params.ParameterizedTest;
  23. import org.junit.jupiter.params.provider.CsvSource;
  24. import org.springframework.beans.factory.annotation.Autowired;
  25. import reactor.core.publisher.Flux;
  26. import reactor.test.StepVerifier;
  27. class MessagesServiceTest extends AbstractIntegrationTest {
  28. private static final String MASKED_TOPICS_PREFIX = "masking-test-";
  29. private static final String NON_EXISTING_TOPIC = UUID.randomUUID().toString();
  30. @Autowired
  31. MessagesService messagesService;
  32. KafkaCluster cluster;
  33. Set<String> createdTopics = new HashSet<>();
  34. @BeforeEach
  35. void init() {
  36. cluster = applicationContext
  37. .getBean(ClustersStorage.class)
  38. .getClusterByName(LOCAL)
  39. .get();
  40. }
  41. @AfterEach
  42. void deleteCreatedTopics() {
  43. createdTopics.forEach(MessagesServiceTest::deleteTopic);
  44. }
  45. @Test
  46. void deleteTopicMessagesReturnsExceptionWhenTopicNotFound() {
  47. StepVerifier.create(messagesService.deleteTopicMessages(cluster, NON_EXISTING_TOPIC, List.of()))
  48. .expectError(TopicNotFoundException.class)
  49. .verify();
  50. }
  51. @Test
  52. void sendMessageReturnsExceptionWhenTopicNotFound() {
  53. StepVerifier.create(messagesService.sendMessage(cluster, NON_EXISTING_TOPIC, new CreateTopicMessageDTO()))
  54. .expectError(TopicNotFoundException.class)
  55. .verify();
  56. }
  57. @Test
  58. void loadMessagesReturnsExceptionWhenTopicNotFound() {
  59. StepVerifier.create(messagesService
  60. .loadMessages(cluster, NON_EXISTING_TOPIC,
  61. new ConsumerPosition(PollingModeDTO.TAILING, NON_EXISTING_TOPIC, List.of(), null, null),
  62. null, null, 1, "String", "String"))
  63. .expectError(TopicNotFoundException.class)
  64. .verify();
  65. }
  66. @Test
  67. void maskingAppliedOnConfiguredClusters() throws Exception {
  68. String testTopic = MASKED_TOPICS_PREFIX + UUID.randomUUID();
  69. createTopicWithCleanup(new NewTopic(testTopic, 1, (short) 1));
  70. try (var producer = KafkaTestProducer.forKafka(kafka)) {
  71. producer.send(testTopic, "message1");
  72. producer.send(testTopic, "message2").get();
  73. }
  74. Flux<TopicMessageDTO> msgsFlux = messagesService.loadMessages(
  75. cluster,
  76. testTopic,
  77. new ConsumerPosition(PollingModeDTO.EARLIEST, testTopic, List.of(), null, null),
  78. null,
  79. null,
  80. 100,
  81. StringSerde.name(),
  82. StringSerde.name()
  83. ).filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
  84. .map(TopicMessageEventDTO::getMessage);
  85. // both messages should be masked
  86. StepVerifier.create(msgsFlux)
  87. .expectNextMatches(msg -> msg.getContent().equals("***"))
  88. .expectNextMatches(msg -> msg.getContent().equals("***"))
  89. .verifyComplete();
  90. }
  91. @ParameterizedTest
  92. @CsvSource({"EARLIEST", "LATEST"})
  93. void cursorIsRegisteredAfterPollingIsDoneAndCanBeUsedForNextPagePolling(PollingModeDTO mode) {
  94. String testTopic = MessagesServiceTest.class.getSimpleName() + UUID.randomUUID();
  95. createTopicWithCleanup(new NewTopic(testTopic, 5, (short) 1));
  96. int msgsToGenerate = 100;
  97. int pageSize = (msgsToGenerate / 2) + 1;
  98. try (var producer = KafkaTestProducer.forKafka(kafka)) {
  99. for (int i = 0; i < msgsToGenerate; i++) {
  100. producer.send(testTopic, "message_" + i);
  101. }
  102. }
  103. var cursorIdCatcher = new AtomicReference<String>();
  104. Flux<String> msgsFlux = messagesService.loadMessages(
  105. cluster, testTopic,
  106. new ConsumerPosition(mode, testTopic, List.of(), null, null),
  107. null, null, pageSize, StringSerde.name(), StringSerde.name())
  108. .doOnNext(evt -> {
  109. if (evt.getType() == TopicMessageEventDTO.TypeEnum.DONE) {
  110. assertThat(evt.getCursor()).isNotNull();
  111. cursorIdCatcher.set(evt.getCursor().getId());
  112. }
  113. })
  114. .filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
  115. .map(evt -> evt.getMessage().getContent());
  116. StepVerifier.create(msgsFlux)
  117. .expectNextCount(pageSize)
  118. .verifyComplete();
  119. assertThat(cursorIdCatcher.get()).isNotNull();
  120. Flux<String> remainingMsgs = messagesService.loadMessages(cluster, testTopic, cursorIdCatcher.get())
  121. .doOnNext(evt -> {
  122. if (evt.getType() == TopicMessageEventDTO.TypeEnum.DONE) {
  123. assertThat(evt.getCursor()).isNull();
  124. }
  125. })
  126. .filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
  127. .map(evt -> evt.getMessage().getContent());
  128. StepVerifier.create(remainingMsgs)
  129. .expectNextCount(msgsToGenerate - pageSize)
  130. .verifyComplete();
  131. }
  132. private void createTopicWithCleanup(NewTopic newTopic) {
  133. createTopic(newTopic);
  134. createdTopics.add(newTopic.name());
  135. }
  136. }