AbstractIntegrationTest.java 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package com.provectus.kafka.ui;
  2. import com.provectus.kafka.ui.container.KafkaConnectContainer;
  3. import com.provectus.kafka.ui.container.SchemaRegistryContainer;
  4. import java.nio.file.Path;
  5. import java.util.List;
  6. import java.util.Properties;
  7. import org.apache.kafka.clients.admin.AdminClient;
  8. import org.apache.kafka.clients.admin.AdminClientConfig;
  9. import org.apache.kafka.clients.admin.NewTopic;
  10. import org.jetbrains.annotations.NotNull;
  11. import org.junit.jupiter.api.function.ThrowingConsumer;
  12. import org.junit.jupiter.api.io.TempDir;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
  15. import org.springframework.boot.test.context.SpringBootTest;
  16. import org.springframework.context.ApplicationContextInitializer;
  17. import org.springframework.context.ConfigurableApplicationContext;
  18. import org.springframework.test.context.ActiveProfiles;
  19. import org.springframework.test.context.ContextConfiguration;
  20. import org.springframework.test.util.TestSocketUtils;
  21. import org.testcontainers.containers.KafkaContainer;
  22. import org.testcontainers.containers.Network;
  23. import org.testcontainers.utility.DockerImageName;
  24. @SpringBootTest
  25. @ActiveProfiles("test")
  26. @AutoConfigureWebTestClient(timeout = "60000")
  27. @ContextConfiguration(initializers = {AbstractIntegrationTest.Initializer.class})
  28. public abstract class AbstractIntegrationTest {
  29. public static final String LOCAL = "local";
  30. public static final String SECOND_LOCAL = "secondLocal";
  31. private static final String CONFLUENT_PLATFORM_VERSION = "7.2.1";
  32. public static final KafkaContainer kafka = new KafkaContainer(
  33. DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION))
  34. .withNetwork(Network.SHARED);
  35. public static final SchemaRegistryContainer schemaRegistry =
  36. new SchemaRegistryContainer(CONFLUENT_PLATFORM_VERSION)
  37. .withKafka(kafka)
  38. .dependsOn(kafka);
  39. public static final KafkaConnectContainer kafkaConnect =
  40. new KafkaConnectContainer(CONFLUENT_PLATFORM_VERSION)
  41. .withKafka(kafka)
  42. .dependsOn(kafka)
  43. .dependsOn(schemaRegistry);
  44. @TempDir
  45. public static Path tmpDir;
  46. static {
  47. kafka.start();
  48. schemaRegistry.start();
  49. kafkaConnect.start();
  50. }
  51. public static class Initializer
  52. implements ApplicationContextInitializer<ConfigurableApplicationContext> {
  53. @Override
  54. public void initialize(@NotNull ConfigurableApplicationContext context) {
  55. System.setProperty("kafka.clusters.0.name", LOCAL);
  56. System.setProperty("kafka.clusters.0.bootstrapServers", kafka.getBootstrapServers());
  57. // List unavailable hosts to verify failover
  58. System.setProperty("kafka.clusters.0.schemaRegistry", String.format("http://localhost:%1$s,http://localhost:%1$s,%2$s",
  59. TestSocketUtils.findAvailableTcpPort(), schemaRegistry.getUrl()));
  60. System.setProperty("kafka.clusters.0.kafkaConnect.0.name", "kafka-connect");
  61. System.setProperty("kafka.clusters.0.kafkaConnect.0.userName", "kafka-connect");
  62. System.setProperty("kafka.clusters.0.kafkaConnect.0.password", "kafka-connect");
  63. System.setProperty("kafka.clusters.0.kafkaConnect.0.address", kafkaConnect.getTarget());
  64. System.setProperty("kafka.clusters.0.masking.0.type", "REPLACE");
  65. System.setProperty("kafka.clusters.0.masking.0.replacement", "***");
  66. System.setProperty("kafka.clusters.0.masking.0.topicValuesPattern", "masking-test-.*");
  67. System.setProperty("kafka.clusters.1.name", SECOND_LOCAL);
  68. System.setProperty("kafka.clusters.1.readOnly", "true");
  69. System.setProperty("kafka.clusters.1.bootstrapServers", kafka.getBootstrapServers());
  70. System.setProperty("kafka.clusters.1.schemaRegistry", schemaRegistry.getUrl());
  71. System.setProperty("kafka.clusters.1.kafkaConnect.0.name", "kafka-connect");
  72. System.setProperty("kafka.clusters.1.kafkaConnect.0.address", kafkaConnect.getTarget());
  73. System.setProperty("dynamic.config.enabled", "true");
  74. System.setProperty("config.related.uploads.dir", tmpDir.toString());
  75. }
  76. }
  77. public static void createTopic(NewTopic topic) {
  78. withAdminClient(client -> client.createTopics(List.of(topic)).all().get());
  79. }
  80. public static void deleteTopic(String topic) {
  81. withAdminClient(client -> client.deleteTopics(List.of(topic)).all().get());
  82. }
  83. private static void withAdminClient(ThrowingConsumer<AdminClient> consumer) {
  84. Properties properties = new Properties();
  85. properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
  86. try (var client = AdminClient.create(properties)) {
  87. try {
  88. consumer.accept(client);
  89. } catch (Throwable throwable) {
  90. throw new RuntimeException(throwable);
  91. }
  92. }
  93. }
  94. @Autowired
  95. protected ConfigurableApplicationContext applicationContext;
  96. }