123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415 |
- package com.provectus.kafka.ui;
- import static java.util.function.Predicate.not;
- import static org.junit.jupiter.api.Assertions.assertEquals;
- import com.provectus.kafka.ui.model.ConnectorDTO;
- import com.provectus.kafka.ui.model.ConnectorPluginConfigDTO;
- import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponseDTO;
- import com.provectus.kafka.ui.model.ConnectorPluginConfigValueDTO;
- import com.provectus.kafka.ui.model.ConnectorPluginDTO;
- import com.provectus.kafka.ui.model.ConnectorStateDTO;
- import com.provectus.kafka.ui.model.ConnectorStatusDTO;
- import com.provectus.kafka.ui.model.ConnectorTypeDTO;
- import com.provectus.kafka.ui.model.NewConnectorDTO;
- import com.provectus.kafka.ui.model.TaskIdDTO;
- import java.util.List;
- import java.util.Map;
- import java.util.UUID;
- import lombok.extern.slf4j.Slf4j;
- import org.junit.jupiter.api.AfterEach;
- import org.junit.jupiter.api.BeforeEach;
- import org.junit.jupiter.api.Test;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
- import org.springframework.core.ParameterizedTypeReference;
- import org.springframework.test.context.ContextConfiguration;
- import org.springframework.test.web.reactive.server.WebTestClient;
- @ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class})
- @Slf4j
- @AutoConfigureWebTestClient(timeout = "60000")
- public class KafkaConnectServiceTests extends AbstractBaseTest {
- private final String connectName = "kafka-connect";
- private final String connectorName = UUID.randomUUID().toString();
- private final Map<String, Object> config = Map.of(
- "name", connectorName,
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "1",
- "topics", "output-topic",
- "file", "/tmp/test",
- "test.password", "******"
- );
- @Autowired
- private WebTestClient webTestClient;
- @BeforeEach
- public void setUp() {
- webTestClient.post()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
- .bodyValue(new NewConnectorDTO()
- .name(connectorName)
- .config(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "1",
- "topics", "output-topic",
- "file", "/tmp/test",
- "test.password", "test-credentials"
- ))
- )
- .exchange()
- .expectStatus().isOk();
- }
- @AfterEach
- public void tearDown() {
- webTestClient.delete()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", LOCAL,
- connectName, connectorName)
- .exchange()
- .expectStatus().isOk();
- }
- @Test
- public void shouldListAllConnectors() {
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connectors", LOCAL)
- .exchange()
- .expectStatus().isOk()
- .expectBody()
- .jsonPath(String.format("$[?(@.name == '%s')]", connectorName))
- .exists();
- }
- @Test
- public void shouldFilterByNameConnectors() {
- webTestClient.get()
- .uri(
- "/api/clusters/{clusterName}/connectors?search={search}",
- LOCAL,
- connectorName.split("-")[1])
- .exchange()
- .expectStatus().isOk()
- .expectBody()
- .jsonPath(String.format("$[?(@.name == '%s')]", connectorName))
- .exists();
- }
- @Test
- public void shouldFilterByStatusConnectors() {
- webTestClient.get()
- .uri(
- "/api/clusters/{clusterName}/connectors?search={search}",
- LOCAL,
- "running")
- .exchange()
- .expectStatus().isOk()
- .expectBody()
- .jsonPath(String.format("$[?(@.name == '%s')]", connectorName))
- .exists();
- }
- @Test
- public void shouldFilterByTypeConnectors() {
- webTestClient.get()
- .uri(
- "/api/clusters/{clusterName}/connectors?search={search}",
- LOCAL,
- "sink")
- .exchange()
- .expectStatus().isOk()
- .expectBody()
- .jsonPath(String.format("$[?(@.name == '%s')]", connectorName))
- .exists();
- }
- @Test
- public void shouldNotFilterConnectors() {
- webTestClient.get()
- .uri(
- "/api/clusters/{clusterName}/connectors?search={search}",
- LOCAL,
- "something-else")
- .exchange()
- .expectStatus().isOk()
- .expectBody()
- .jsonPath(String.format("$[?(@.name == '%s')]", connectorName))
- .doesNotExist();
- }
- @Test
- public void shouldListConnectors() {
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
- .exchange()
- .expectStatus().isOk()
- .expectBody()
- .jsonPath(String.format("$[?(@ == '%s')]", connectorName))
- .exists();
- }
- @Test
- public void shouldReturnNotFoundForNonExistingCluster() {
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", "nonExistingCluster",
- connectName)
- .exchange()
- .expectStatus().isNotFound();
- }
- @Test
- public void shouldReturnNotFoundForNonExistingConnectName() {
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL,
- "nonExistingConnect")
- .exchange()
- .expectStatus().isNotFound();
- }
- @Test
- public void shouldRetrieveConnector() {
- ConnectorDTO expected = (ConnectorDTO) new ConnectorDTO()
- .connect(connectName)
- .status(new ConnectorStatusDTO()
- .state(ConnectorStateDTO.RUNNING)
- .workerId("kafka-connect:8083"))
- .tasks(List.of(new TaskIdDTO()
- .connector(connectorName)
- .task(0)))
- .type(ConnectorTypeDTO.SINK)
- .name(connectorName)
- .config(config);
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", LOCAL,
- connectName, connectorName)
- .exchange()
- .expectStatus().isOk()
- .expectBody(ConnectorDTO.class)
- .value(connector -> assertEquals(expected, connector));
- }
- @Test
- public void shouldUpdateConfig() {
- webTestClient.put()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
- LOCAL, connectName, connectorName)
- .bodyValue(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "1",
- "topics", "another-topic",
- "file", "/tmp/new"
- )
- )
- .exchange()
- .expectStatus().isOk();
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
- LOCAL, connectName, connectorName)
- .exchange()
- .expectStatus().isOk()
- .expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
- })
- .isEqualTo(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "1",
- "topics", "another-topic",
- "file", "/tmp/new",
- "name", connectorName
- ));
- }
- @Test
- public void shouldReturn400WhenConnectReturns400ForInvalidConfigCreate() {
- var connectorName = UUID.randomUUID().toString();
- webTestClient.post()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
- .bodyValue(Map.of(
- "name", connectorName,
- "config", Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "invalid number",
- "topics", "another-topic",
- "file", "/tmp/test"
- ))
- )
- .exchange()
- .expectStatus().isBadRequest();
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
- .exchange()
- .expectStatus().isOk()
- .expectBody()
- .jsonPath(String.format("$[?(@ == '%s')]", connectorName))
- .doesNotExist();
- }
- @Test
- public void shouldReturn400WhenConnectReturns500ForInvalidConfigCreate() {
- var connectorName = UUID.randomUUID().toString();
- webTestClient.post()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
- .bodyValue(Map.of(
- "name", connectorName,
- "config", Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector"
- ))
- )
- .exchange()
- .expectStatus().isBadRequest();
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
- .exchange()
- .expectStatus().isOk()
- .expectBody()
- .jsonPath(String.format("$[?(@ == '%s')]", connectorName))
- .doesNotExist();
- }
- @Test
- public void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() {
- webTestClient.put()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
- LOCAL, connectName, connectorName)
- .bodyValue(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "invalid number",
- "topics", "another-topic",
- "file", "/tmp/test"
- )
- )
- .exchange()
- .expectStatus().isBadRequest();
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
- LOCAL, connectName, connectorName)
- .exchange()
- .expectStatus().isOk()
- .expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
- })
- .isEqualTo(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "1",
- "topics", "output-topic",
- "file", "/tmp/test",
- "name", connectorName,
- "test.password", "******"
- ));
- }
- @Test
- public void shouldReturn400WhenConnectReturns500ForInvalidConfigUpdate() {
- webTestClient.put()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
- LOCAL, connectName, connectorName)
- .bodyValue(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector"
- )
- )
- .exchange()
- .expectStatus().isBadRequest();
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
- LOCAL, connectName, connectorName)
- .exchange()
- .expectStatus().isOk()
- .expectBody(new ParameterizedTypeReference<Map<String, Object>>() {
- })
- .isEqualTo(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "1",
- "topics", "output-topic",
- "file", "/tmp/test",
- "test.password", "******",
- "name", connectorName
- ));
- }
- @Test
- public void shouldRetrieveConnectorPlugins() {
- webTestClient.get()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/plugins", LOCAL, connectName)
- .exchange()
- .expectStatus().isOk()
- .expectBodyList(ConnectorPluginDTO.class)
- .value(plugins -> assertEquals(14, plugins.size()));
- }
- @Test
- public void shouldSuccessfullyValidateConnectorPluginConfiguration() {
- var pluginName = "FileStreamSinkConnector";
- var path =
- "/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate";
- webTestClient.put()
- .uri(path, LOCAL, connectName, pluginName)
- .bodyValue(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "1",
- "topics", "output-topic",
- "file", "/tmp/test",
- "name", connectorName
- )
- )
- .exchange()
- .expectStatus().isOk()
- .expectBody(ConnectorPluginConfigValidationResponseDTO.class)
- .value(response -> assertEquals(0, response.getErrorCount()));
- }
- @Test
- public void shouldValidateAndReturnErrorsOfConnectorPluginConfiguration() {
- var pluginName = "FileStreamSinkConnector";
- var path =
- "/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate";
- webTestClient.put()
- .uri(path, LOCAL, connectName, pluginName)
- .bodyValue(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "0",
- "topics", "output-topic",
- "file", "/tmp/test",
- "name", connectorName
- )
- )
- .exchange()
- .expectStatus().isOk()
- .expectBody(ConnectorPluginConfigValidationResponseDTO.class)
- .value(response -> {
- assertEquals(1, response.getErrorCount());
- var error = response.getConfigs().stream()
- .map(ConnectorPluginConfigDTO::getValue)
- .map(ConnectorPluginConfigValueDTO::getErrors)
- .filter(not(List::isEmpty))
- .findFirst().get();
- assertEquals(
- "Invalid value 0 for configuration tasks.max: Value must be at least 1",
- error.get(0)
- );
- });
- }
- @Test
- public void shouldReturn400WhenTryingToCreateConnectorWithExistingName() {
- webTestClient.post()
- .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName)
- .bodyValue(new NewConnectorDTO()
- .name(connectorName)
- .config(Map.of(
- "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
- "tasks.max", "1",
- "topics", "output-topic",
- "file", "/tmp/test"
- ))
- )
- .exchange()
- .expectStatus()
- .isBadRequest();
- }
- }
|