123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- package com.provectus.kafka.ui;
- import com.provectus.kafka.ui.model.*;
- import lombok.extern.log4j.Log4j2;
- import org.junit.jupiter.api.AfterEach;
- import org.junit.jupiter.api.BeforeEach;
- import org.junit.jupiter.api.Test;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
- import org.springframework.core.ParameterizedTypeReference;
- import org.springframework.test.context.ContextConfiguration;
- import org.springframework.test.web.reactive.server.WebTestClient;
- import java.util.List;
- import java.util.Map;
- import java.util.UUID;
- import static java.util.function.Predicate.not;
- import static org.junit.jupiter.api.Assertions.assertEquals;
- @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
- @Log4j2
- @AutoConfigureWebTestClient(timeout = "60000")
- public class KafkaConnectServiceTests extends AbstractBaseTest {
- private final String connectName = "kafka-connect";
- private final String connectorName = UUID.randomUUID().toString();
- private final Map<String, Object> config = Map.of(
- "name", connectorName,
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "1",
- "topics", "output-topic",
- "file", "/tmp/test"
- );
- @Autowired
- private WebTestClient webTestClient;
- @BeforeEach
- public void setUp() {
- webTestClient.post()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
- .bodyValue(new NewConnector()
- .name(connectorName)
- .config(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "1",
- "topics", "output-topic",
- "file", "/tmp/test"
- ))
- )
- .exchange()
- .expectStatus().isOk();
- }
- @AfterEach
- public void tearDown() {
- webTestClient.delete()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", LOCAL, connectName, connectorName)
- .exchange()
- .expectStatus().isOk();
- }
- @Test
- public void shouldListConnectors() {
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
- .exchange()
- .expectStatus().isOk()
- .expectBody()
- .jsonPath(String.format("$[?(@ == '%s')]", connectorName))
- .exists();
- }
- @Test
- public void shouldReturnNotFoundForNonExistingCluster() {
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", "nonExistingCluster", connectName)
- .exchange()
- .expectStatus().isNotFound();
- }
- @Test
- public void shouldReturnNotFoundForNonExistingConnectName() {
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, "nonExistingConnect")
- .exchange()
- .expectStatus().isNotFound();
- }
- @Test
- public void shouldRetrieveConnector() {
- Connector expected = (Connector) new Connector()
- .status(new ConnectorStatus()
- .state(ConnectorStatus.StateEnum.RUNNING)
- .workerId("kafka-connect:8083"))
- .tasks(List.of(new TaskId()
- .connector(connectorName)
- .task(0)))
- .type(Connector.TypeEnum.SINK)
- .name(connectorName)
- .config(config);
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", LOCAL, connectName, connectorName)
- .exchange()
- .expectStatus().isOk()
- .expectBody(Connector.class)
- .value(connector -> assertEquals(expected, connector));
- }
- @Test
- public void shouldUpdateConfig() {
- webTestClient.put()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
- .bodyValue(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "1",
- "topics", "another-topic",
- "file", "/tmp/new"
- )
- )
- .exchange()
- .expectStatus().isOk();
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
- .exchange()
- .expectStatus().isOk()
- .expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
- })
- .isEqualTo(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "1",
- "topics", "another-topic",
- "file", "/tmp/new",
- "name", connectorName
- ));
- }
- @Test
- public void shouldReturn400WhenConnectReturns400ForInvalidConfigCreate() {
- var connectorName = UUID.randomUUID().toString();
- webTestClient.post()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
- .bodyValue(Map.of(
- "name", connectorName,
- "config", Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "invalid number",
- "topics", "another-topic",
- "file", "/tmp/test"
- ))
- )
- .exchange()
- .expectStatus().isBadRequest();
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
- .exchange()
- .expectStatus().isOk()
- .expectBody()
- .jsonPath(String.format("$[?(@ == '%s')]", connectorName))
- .doesNotExist();
- }
- @Test
- public void shouldReturn400WhenConnectReturns500ForInvalidConfigCreate() {
- var connectorName = UUID.randomUUID().toString();
- webTestClient.post()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
- .bodyValue(Map.of(
- "name", connectorName,
- "config", Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector"
- ))
- )
- .exchange()
- .expectStatus().isBadRequest();
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
- .exchange()
- .expectStatus().isOk()
- .expectBody()
- .jsonPath(String.format("$[?(@ == '%s')]", connectorName))
- .doesNotExist();
- }
- @Test
- public void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() {
- webTestClient.put()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
- .bodyValue(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "invalid number",
- "topics", "another-topic",
- "file", "/tmp/test"
- )
- )
- .exchange()
- .expectStatus().isBadRequest();
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
- .exchange()
- .expectStatus().isOk()
- .expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
- })
- .isEqualTo(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "1",
- "topics", "output-topic",
- "file", "/tmp/test",
- "name", connectorName
- ));
- }
- @Test
- public void shouldReturn400WhenConnectReturns500ForInvalidConfigUpdate() {
- webTestClient.put()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
- .bodyValue(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector"
- )
- )
- .exchange()
- .expectStatus().isBadRequest();
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName)
- .exchange()
- .expectStatus().isOk()
- .expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
- })
- .isEqualTo(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "1",
- "topics", "output-topic",
- "file", "/tmp/test",
- "name", connectorName
- ));
- }
- @Test
- public void shouldRetrieveConnectorPlugins() {
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/plugins", LOCAL, connectName)
- .exchange()
- .expectStatus().isOk()
- .expectBodyList(ConnectorPlugin.class)
- .value(plugins -> assertEquals(13, plugins.size()));
- }
- @Test
- public void shouldSuccessfullyValidateConnectorPluginConfiguration() {
- var pluginName = "FileStreamSinkConnector";
- webTestClient.put()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate", LOCAL, connectName, pluginName)
- .bodyValue(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "1",
- "topics", "output-topic",
- "file", "/tmp/test",
- "name", connectorName
- )
- )
- .exchange()
- .expectStatus().isOk()
- .expectBody(ConnectorPluginConfigValidationResponse.class)
- .value(response -> assertEquals(0, response.getErrorCount()));
- }
- @Test
- public void shouldValidateAndReturnErrorsOfConnectorPluginConfiguration() {
- var pluginName = "FileStreamSinkConnector";
- webTestClient.put()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate", LOCAL, connectName, pluginName)
- .bodyValue(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "0",
- "topics", "output-topic",
- "file", "/tmp/test",
- "name", connectorName
- )
- )
- .exchange()
- .expectStatus().isOk()
- .expectBody(ConnectorPluginConfigValidationResponse.class)
- .value(response -> {
- assertEquals(1, response.getErrorCount());
- var error = response.getConfigs().stream()
- .map(ConnectorPluginConfig::getValue)
- .map(ConnectorPluginConfigValue::getErrors)
- .filter(not(List::isEmpty))
- .findFirst().get();
- assertEquals(
- "Invalid value 0 for configuration tasks.max: Value must be at least 1",
- error.get(0)
- );
- });
- }
- }
|