MessagesServiceTest.java 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.AbstractIntegrationTest;
  3. import com.provectus.kafka.ui.exception.TopicNotFoundException;
  4. import com.provectus.kafka.ui.model.ConsumerPosition;
  5. import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
  6. import com.provectus.kafka.ui.model.KafkaCluster;
  7. import com.provectus.kafka.ui.model.SeekDirectionDTO;
  8. import com.provectus.kafka.ui.model.SeekTypeDTO;
  9. import com.provectus.kafka.ui.model.TopicMessageDTO;
  10. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  11. import com.provectus.kafka.ui.producer.KafkaTestProducer;
  12. import com.provectus.kafka.ui.serdes.builtin.StringSerde;
  13. import java.util.List;
  14. import java.util.UUID;
  15. import org.apache.kafka.clients.admin.NewTopic;
  16. import org.junit.jupiter.api.BeforeEach;
  17. import org.junit.jupiter.api.Test;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. import reactor.core.publisher.Flux;
  20. import reactor.test.StepVerifier;
  21. class MessagesServiceTest extends AbstractIntegrationTest {
  22. private static final String MASKED_TOPICS_PREFIX = "masking-test-";
  23. private static final String NON_EXISTING_TOPIC = UUID.randomUUID().toString();
  24. @Autowired
  25. MessagesService messagesService;
  26. KafkaCluster cluster;
  27. @BeforeEach
  28. void init() {
  29. cluster = applicationContext
  30. .getBean(ClustersStorage.class)
  31. .getClusterByName(LOCAL)
  32. .get();
  33. }
  34. @Test
  35. void deleteTopicMessagesReturnsExceptionWhenTopicNotFound() {
  36. StepVerifier.create(messagesService.deleteTopicMessages(cluster, NON_EXISTING_TOPIC, List.of()))
  37. .expectError(TopicNotFoundException.class)
  38. .verify();
  39. }
  40. @Test
  41. void sendMessageReturnsExceptionWhenTopicNotFound() {
  42. StepVerifier.create(messagesService.sendMessage(cluster, NON_EXISTING_TOPIC, new CreateTopicMessageDTO()))
  43. .expectError(TopicNotFoundException.class)
  44. .verify();
  45. }
  46. @Test
  47. void loadMessagesReturnsExceptionWhenTopicNotFound() {
  48. StepVerifier.create(messagesService
  49. .loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1, null, "String", "String"))
  50. .expectError(TopicNotFoundException.class)
  51. .verify();
  52. }
  53. @Test
  54. void maskingAppliedOnConfiguredClusters() throws Exception {
  55. String testTopic = MASKED_TOPICS_PREFIX + UUID.randomUUID();
  56. try (var producer = KafkaTestProducer.forKafka(kafka)) {
  57. createTopic(new NewTopic(testTopic, 1, (short) 1));
  58. producer.send(testTopic, "message1");
  59. producer.send(testTopic, "message2").get();
  60. Flux<TopicMessageDTO> msgsFlux = messagesService.loadMessages(
  61. cluster,
  62. testTopic,
  63. new ConsumerPosition(SeekTypeDTO.BEGINNING, testTopic, null),
  64. null,
  65. null,
  66. 100,
  67. SeekDirectionDTO.FORWARD,
  68. StringSerde.name(),
  69. StringSerde.name()
  70. ).filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
  71. .map(TopicMessageEventDTO::getMessage);
  72. // both messages should be masked
  73. StepVerifier.create(msgsFlux)
  74. .expectNextMatches(msg -> msg.getContent().equals("***"))
  75. .expectNextMatches(msg -> msg.getContent().equals("***"))
  76. .verifyComplete();
  77. } finally {
  78. deleteTopic(testTopic);
  79. }
  80. }
  81. }