MessagesServiceTest.java 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package com.provectus.kafka.ui.service;
  2. import static com.provectus.kafka.ui.service.MessagesService.execSmartFilterTest;
  3. import static org.assertj.core.api.Assertions.assertThat;
  4. import com.provectus.kafka.ui.AbstractIntegrationTest;
  5. import com.provectus.kafka.ui.exception.TopicNotFoundException;
  6. import com.provectus.kafka.ui.model.ConsumerPosition;
  7. import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
  8. import com.provectus.kafka.ui.model.KafkaCluster;
  9. import com.provectus.kafka.ui.model.SeekDirectionDTO;
  10. import com.provectus.kafka.ui.model.SeekTypeDTO;
  11. import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
  12. import com.provectus.kafka.ui.model.TopicMessageDTO;
  13. import com.provectus.kafka.ui.model.TopicMessageEventDTO;
  14. import com.provectus.kafka.ui.producer.KafkaTestProducer;
  15. import com.provectus.kafka.ui.serdes.builtin.StringSerde;
  16. import java.util.List;
  17. import java.util.Map;
  18. import java.util.UUID;
  19. import org.apache.kafka.clients.admin.NewTopic;
  20. import org.junit.jupiter.api.BeforeEach;
  21. import org.junit.jupiter.api.Test;
  22. import org.springframework.beans.factory.annotation.Autowired;
  23. import reactor.core.publisher.Flux;
  24. import reactor.test.StepVerifier;
  25. class MessagesServiceTest extends AbstractIntegrationTest {
  26. private static final String MASKED_TOPICS_PREFIX = "masking-test-";
  27. private static final String NON_EXISTING_TOPIC = UUID.randomUUID().toString();
  28. @Autowired
  29. MessagesService messagesService;
  30. KafkaCluster cluster;
  31. @BeforeEach
  32. void init() {
  33. cluster = applicationContext
  34. .getBean(ClustersStorage.class)
  35. .getClusterByName(LOCAL)
  36. .get();
  37. }
  38. @Test
  39. void deleteTopicMessagesReturnsExceptionWhenTopicNotFound() {
  40. StepVerifier.create(messagesService.deleteTopicMessages(cluster, NON_EXISTING_TOPIC, List.of()))
  41. .expectError(TopicNotFoundException.class)
  42. .verify();
  43. }
  44. @Test
  45. void sendMessageReturnsExceptionWhenTopicNotFound() {
  46. StepVerifier.create(messagesService.sendMessage(cluster, NON_EXISTING_TOPIC, new CreateTopicMessageDTO()))
  47. .expectError(TopicNotFoundException.class)
  48. .verify();
  49. }
  50. @Test
  51. void loadMessagesReturnsExceptionWhenTopicNotFound() {
  52. StepVerifier.create(messagesService
  53. .loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1, null, "String", "String"))
  54. .expectError(TopicNotFoundException.class)
  55. .verify();
  56. }
  57. @Test
  58. void maskingAppliedOnConfiguredClusters() throws Exception {
  59. String testTopic = MASKED_TOPICS_PREFIX + UUID.randomUUID();
  60. try (var producer = KafkaTestProducer.forKafka(kafka)) {
  61. createTopic(new NewTopic(testTopic, 1, (short) 1));
  62. producer.send(testTopic, "message1");
  63. producer.send(testTopic, "message2").get();
  64. Flux<TopicMessageDTO> msgsFlux = messagesService.loadMessages(
  65. cluster,
  66. testTopic,
  67. new ConsumerPosition(SeekTypeDTO.BEGINNING, testTopic, null),
  68. null,
  69. null,
  70. 100,
  71. SeekDirectionDTO.FORWARD,
  72. StringSerde.name(),
  73. StringSerde.name()
  74. ).filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
  75. .map(TopicMessageEventDTO::getMessage);
  76. // both messages should be masked
  77. StepVerifier.create(msgsFlux)
  78. .expectNextMatches(msg -> msg.getContent().equals("***"))
  79. .expectNextMatches(msg -> msg.getContent().equals("***"))
  80. .verifyComplete();
  81. } finally {
  82. deleteTopic(testTopic);
  83. }
  84. }
  85. @Test
  86. void execSmartFilterTestReturnsExecutionResult() {
  87. var executionParams = new SmartFilterTestExecutionDTO()
  88. .filterCode("key != null && val != null && headers != null && timestampMs != null && offset != null")
  89. .key("key")
  90. .value("val")
  91. .headers(Map.of("h1", "hv1"))
  92. .offset(12345L)
  93. .timestampMs(System.currentTimeMillis())
  94. .partition(1);
  95. assertThat(execSmartFilterTest(executionParams).getResult()).isTrue();
  96. executionParams.setFilterCode("return false");
  97. assertThat(execSmartFilterTest(executionParams).getResult()).isFalse();
  98. }
  99. @Test
  100. void execSmartFilterTestReturnsErrorOnFilterApplyError() {
  101. var result = execSmartFilterTest(
  102. new SmartFilterTestExecutionDTO()
  103. .filterCode("return 1/0")
  104. );
  105. assertThat(result.getResult()).isNull();
  106. assertThat(result.getError()).containsIgnoringCase("execution error");
  107. }
  108. @Test
  109. void execSmartFilterTestReturnsErrorOnFilterCompilationError() {
  110. var result = execSmartFilterTest(
  111. new SmartFilterTestExecutionDTO()
  112. .filterCode("this is invalid groovy syntax = 1")
  113. );
  114. assertThat(result.getResult()).isNull();
  115. assertThat(result.getError()).containsIgnoringCase("Compilation error");
  116. }
  117. }