KafkaConnectContainer.java 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. package com.provectus.kafka.ui.container;
  2. import org.testcontainers.containers.GenericContainer;
  3. import org.testcontainers.containers.KafkaContainer;
  4. import org.testcontainers.containers.Network;
  5. import org.testcontainers.containers.wait.strategy.Wait;
  6. import java.time.Duration;
  7. public class KafkaConnectContainer extends GenericContainer<KafkaConnectContainer> {
  8. private static final int CONNECT_PORT = 8083;
  9. public KafkaConnectContainer(String version) {
  10. super("confluentinc/cp-kafka-connect:" + version);
  11. addExposedPort(CONNECT_PORT);
  12. waitStrategy = Wait.forHttp("/")
  13. .withStartupTimeout(Duration.ofMinutes(5));
  14. }
  15. public KafkaConnectContainer withKafka(KafkaContainer kafka) {
  16. String bootstrapServers = kafka.getNetworkAliases().get(0) + ":9092";
  17. return withKafka(kafka.getNetwork(), bootstrapServers);
  18. }
  19. public KafkaConnectContainer withKafka(Network network, String bootstrapServers) {
  20. withNetwork(network);
  21. withEnv("CONNECT_BOOTSTRAP_SERVERS", "PLAINTEXT://" + bootstrapServers);
  22. withEnv("CONNECT_GROUP_ID", "connect-group");
  23. withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "_connect_configs");
  24. withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1");
  25. withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "_connect_offset");
  26. withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1");
  27. withEnv("CONNECT_STATUS_STORAGE_TOPIC", "_connect_status");
  28. withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1");
  29. withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter");
  30. withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.storage.StringConverter");
  31. withEnv("CONNECT_INTERNAL_KEY_CONVERTER", "org.apache.kafka.connect.json.JsonConverter");
  32. withEnv("CONNECT_INTERNAL_VALUE_CONVERTER", "org.apache.kafka.connect.json.JsonConverter");
  33. withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "kafka-connect");
  34. withEnv("CONNECT_PLUGIN_PATH", "/usr/share/java,/usr/share/confluent-hub-components");
  35. return self();
  36. }
  37. public String getTarget() {
  38. return "http://" + getContainerIpAddress() + ":" + getMappedPort(CONNECT_PORT);
  39. }
  40. }