From 377fa830c6bc52551251c502ef80ea89811e9021 Mon Sep 17 00:00:00 2001 From: Ramazan Yapparov Date: Fri, 26 Feb 2021 15:48:04 +0300 Subject: [PATCH] #163 Kafka connect crud (#182) * - added kafka connect to docker-compose files - added kafka connect property to application * - added /connectors endpoint - added /connectors/{name} endpoint * - added /connectors/{name}/(restart/pause/resume) endpoints - updated connector config model - added FileStream connectors configs * - added /connectors post endpoint * - added all other kafka connect endpoints - reverted config file * - moved kafka connect related endpoints to separate tag/controller * - added kafka connect container - added simple test for kafka connect service - added assertj dependency * - added tests for KafkaConnectService * - added more tests - moved kafkaConnect rest client calls to separate class - added validation * - removed additional validation - removed Client class - removed client test and replaced with proper integration test - added retries * - cleanup * moved to using generated kafka connect client * updated rest schema * added new maven clean plugin configuration so it deletes generated sources in kafka-ui-react-app/src directory * changed restart/pause/resume action schema * - added test - refactoring * added more error handling * added more tests for error handling * fixed schema registry tests * changed /connect to /connects * - fixed mutating connector object during getConnector request - added new retrying kafka connect client with common retry logic - fixed dependency scope * - removed unnecessary `.cache()` * - reverted testcontainers initialization code * - added missing UNASSIGNED status to response enum * - fixed configurations - fixed testcontainers lifecycle management * fixed application-local.yml --- docker/kafka-clusters-only.yaml | 27 + docker/kafka-ui.yaml | 31 + kafka-ui-api/pom.xml | 18 +- .../cluster/client/KafkaConnectClients.java | 15 + .../client/RetryingKafkaConnectClient.java | 87 +++ .../ui/cluster/config/ClustersProperties.java | 7 + .../RebalanceInProgressException.java | 15 + .../exception/ValidationException.java | 14 + .../ui/cluster/mapper/ClusterMapper.java | 1 + .../ui/cluster/mapper/KafkaConnectMapper.java | 24 + .../kafka/ui/cluster/model/KafkaCluster.java | 3 + .../ui/cluster/model/KafkaConnectCluster.java | 11 + .../cluster/service/KafkaConnectService.java | 183 ++++++ .../ui/rest/KafkaConnectRestController.java | 92 +++ .../src/main/resources/application-local.yml | 7 +- .../provectus/kafka/ui/AbstractBaseTest.java | 53 +- .../kafka/ui/KafkaConnectContainer.java | 42 ++ .../kafka/ui/KafkaConnectServiceTests.java | 304 ++++++++++ .../kafka/ui/SchemaRegistryServiceTests.java | 4 +- kafka-ui-contract/pom.xml | 48 +- .../resources/swagger/kafka-connect-api.yaml | 501 ++++++++++++++++ .../main/resources/swagger/kafka-ui-api.yaml | 556 ++++++++++++++++++ pom.xml | 5 + 23 files changed, 2027 insertions(+), 21 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/KafkaConnectClients.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/RetryingKafkaConnectClient.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/RebalanceInProgressException.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/ValidationException.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/KafkaConnectMapper.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaConnectCluster.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/KafkaConnectService.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/KafkaConnectRestController.java create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectContainer.java create mode 100644 kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java create mode 100644 kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml diff --git a/docker/kafka-clusters-only.yaml b/docker/kafka-clusters-only.yaml index 91b74cd327..6911054ceb 100644 --- a/docker/kafka-clusters-only.yaml +++ b/docker/kafka-clusters-only.yaml @@ -88,6 +88,32 @@ services: ports: - 8081:8081 + kafka-connect0: + image: confluentinc/cp-kafka-connect:5.1.0 + ports: + - 8083:8083 + depends_on: + - kafka0 + - schemaregistry0 + environment: + CONNECT_BOOTSTRAP_SERVERS: kafka0:29092 + CONNECT_GROUP_ID: compose-connect-group + CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_STATUS_STORAGE_TOPIC: _connect_status + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter + CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 + CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter + CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 + CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0 + CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" + + kafka-init-topics: image: confluentinc/cp-kafka:5.1.0 volumes: @@ -98,4 +124,5 @@ services: cub kafka-ready -b kafka1:29092 1 30 && \ kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \ kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \ + kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181 && \ kafka-console-producer --broker-list kafka1:29092 -topic second.users < /data/message.json'" diff --git a/docker/kafka-ui.yaml b/docker/kafka-ui.yaml index 780785920d..9740be15d1 100644 --- a/docker/kafka-ui.yaml +++ b/docker/kafka-ui.yaml @@ -13,17 +13,22 @@ services: - kafka0 - kafka1 - schemaregistry0 + - kafka-connect0 environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092 KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181 KAFKA_CLUSTERS_0_JMXPORT: 9997 KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085 + KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first + KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083 KAFKA_CLUSTERS_1_NAME: secondLocal KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092 KAFKA_CLUSTERS_1_ZOOKEEPER: zookeeper1:2181 KAFKA_CLUSTERS_1_JMXPORT: 9998 KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry0:8085 + KAFKA_CLUSTERS_1_KAFKACONNECT_0_NAME: first + KAFKA_CLUSTERS_1_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083 zookeeper0: image: confluentinc/cp-zookeeper:5.1.0 @@ -91,6 +96,31 @@ services: SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas + kafka-connect0: + image: confluentinc/cp-kafka-connect:5.1.0 + ports: + - 8083:8083 + depends_on: + - kafka0 + - schemaregistry0 + environment: + CONNECT_BOOTSTRAP_SERVERS: kafka0:29092 + CONNECT_GROUP_ID: compose-connect-group + CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_STATUS_STORAGE_TOPIC: _connect_status + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter + CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 + CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter + CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 + CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0 + CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" + kafka-init-topics: image: confluentinc/cp-kafka:5.1.0 volumes: @@ -101,4 +131,5 @@ services: cub kafka-ready -b kafka1:29092 1 30 && \ kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \ kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \ + kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181 && \ kafka-console-producer --broker-list kafka1:29092 -topic second.users < /data/message.json'" diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml index b5924a3ed1..3df0f29bb9 100644 --- a/kafka-ui-api/pom.xml +++ b/kafka-ui-api/pom.xml @@ -103,6 +103,10 @@ org.springframework.boot spring-boot-starter-log4j2 + + io.projectreactor.addons + reactor-extra + org.springframework.boot @@ -164,8 +168,8 @@ maven-compiler-plugin ${maven-compiler-plugin.version} - 13 - 13 + ${maven.compiler.source} + ${maven.compiler.target} org.mapstruct @@ -185,6 +189,16 @@ + + org.apache.maven.plugins + maven-surefire-plugin + ${maven-surefire-plugin.version} + + + --illegal-access=permit + + + diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/KafkaConnectClients.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/KafkaConnectClients.java new file mode 100644 index 0000000000..5c9ed96969 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/KafkaConnectClients.java @@ -0,0 +1,15 @@ +package com.provectus.kafka.ui.cluster.client; + +import com.provectus.kafka.ui.connect.api.ConnectApi; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public final class KafkaConnectClients { + + private static final Map CACHE = new ConcurrentHashMap<>(); + + public static ConnectApi withBaseUrl(String basePath) { + return CACHE.computeIfAbsent(basePath, RetryingKafkaConnectClient::new); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/RetryingKafkaConnectClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/RetryingKafkaConnectClient.java new file mode 100644 index 0000000000..a11564e84a --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/client/RetryingKafkaConnectClient.java @@ -0,0 +1,87 @@ +package com.provectus.kafka.ui.cluster.client; + +import com.provectus.kafka.ui.cluster.exception.RebalanceInProgressException; +import com.provectus.kafka.ui.cluster.exception.ValidationException; +import com.provectus.kafka.ui.connect.ApiClient; +import com.provectus.kafka.ui.connect.api.ConnectApi; +import com.provectus.kafka.ui.connect.model.Connector; +import com.provectus.kafka.ui.connect.model.NewConnector; +import lombok.extern.log4j.Log4j2; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.util.MultiValueMap; +import org.springframework.web.client.RestClientException; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.retry.Retry; + +import java.util.List; +import java.util.Map; + +@Log4j2 +public class RetryingKafkaConnectClient extends ConnectApi { + private static final int MAX_RETRIES = 5; + + public RetryingKafkaConnectClient(String basePath) { + super(new RetryingApiClient().setBasePath(basePath)); + } + + @Override + public Mono createConnector(NewConnector newConnector) throws RestClientException { + return withBadRequestErrorHandling( + super.createConnector(newConnector) + ); + } + + @Override + public Mono setConnectorConfig(String connectorName, Map requestBody) throws RestClientException { + return withBadRequestErrorHandling( + super.setConnectorConfig(connectorName, requestBody) + ); + } + + private static class RetryingApiClient extends ApiClient { + @Override + public Mono invokeAPI(String path, HttpMethod method, Map pathParams, MultiValueMap queryParams, Object body, HttpHeaders headerParams, MultiValueMap cookieParams, MultiValueMap formParams, List accept, MediaType contentType, String[] authNames, ParameterizedTypeReference returnType) throws RestClientException { + return withRetryOnConflict( + super.invokeAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams, formParams, accept, contentType, authNames, returnType) + ); + } + + @Override + public Flux invokeFluxAPI(String path, HttpMethod method, Map pathParams, MultiValueMap queryParams, Object body, HttpHeaders headerParams, MultiValueMap cookieParams, MultiValueMap formParams, List accept, MediaType contentType, String[] authNames, ParameterizedTypeReference returnType) throws RestClientException { + return withRetryOnConflict( + super.invokeFluxAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams, formParams, accept, contentType, authNames, returnType) + ); + } + } + + private static Mono withRetryOnConflict(Mono publisher) { + return publisher.retryWhen( + Retry.onlyIf(e -> e.exception() instanceof WebClientResponseException.Conflict) + .retryMax(MAX_RETRIES) + ) + .onErrorResume(WebClientResponseException.Conflict.class, e -> Mono.error(new RebalanceInProgressException())) + .doOnError(log::error); + } + + private static Flux withRetryOnConflict(Flux publisher) { + return publisher.retryWhen( + Retry.onlyIf(e -> e.exception() instanceof WebClientResponseException.Conflict) + .retryMax(MAX_RETRIES) + ) + .onErrorResume(WebClientResponseException.Conflict.class, e -> Mono.error(new RebalanceInProgressException())) + .doOnError(log::error); + } + + private static Mono withBadRequestErrorHandling(Mono publisher) { + return publisher + .onErrorResume(WebClientResponseException.BadRequest.class, e -> + Mono.error(new ValidationException("Invalid configuration"))) + .onErrorResume(WebClientResponseException.InternalServerError.class, e -> + Mono.error(new ValidationException("Invalid configuration"))); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java index 093bfefef8..9db628c4f4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java @@ -24,7 +24,14 @@ public class ClustersProperties { String schemaNameTemplate = "%s-value"; String protobufFile; String protobufMessageName; + List kafkaConnect; int jmxPort; Properties properties; } + + @Data + public static class ConnectCluster { + String name; + String address; + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/RebalanceInProgressException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/RebalanceInProgressException.java new file mode 100644 index 0000000000..a6fe7c4865 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/RebalanceInProgressException.java @@ -0,0 +1,15 @@ +package com.provectus.kafka.ui.cluster.exception; + +import org.springframework.http.HttpStatus; + +public class RebalanceInProgressException extends CustomBaseException { + + public RebalanceInProgressException() { + super("Rebalance is in progress."); + } + + @Override + public HttpStatus getResponseStatusCode() { + return HttpStatus.CONFLICT; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/ValidationException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/ValidationException.java new file mode 100644 index 0000000000..ff2e9b72af --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/exception/ValidationException.java @@ -0,0 +1,14 @@ +package com.provectus.kafka.ui.cluster.exception; + +import org.springframework.http.HttpStatus; + +public class ValidationException extends CustomBaseException { + public ValidationException(String message) { + super(message); + } + + @Override + public HttpStatus getResponseStatusCode() { + return HttpStatus.BAD_REQUEST; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java index 91cadf3c26..a9e48bd627 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java @@ -36,6 +36,7 @@ public interface ClusterMapper { TopicDetails toTopicDetails(InternalTopic topic); TopicConfig toTopicConfig(InternalTopicConfig topic); Replica toReplica(InternalReplica replica); + Connect toKafkaConnect(KafkaConnectCluster connect); @Mapping(target = "isCompatible", source = "compatible") CompatibilityCheckResponse toCompatibilityCheckResponse(InternalCompatibilityCheck dto); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/KafkaConnectMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/KafkaConnectMapper.java new file mode 100644 index 0000000000..4010e102dd --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/KafkaConnectMapper.java @@ -0,0 +1,24 @@ +package com.provectus.kafka.ui.cluster.mapper; + +import com.provectus.kafka.ui.connect.model.ConnectorStatusConnector; +import com.provectus.kafka.ui.connect.model.ConnectorTask; +import com.provectus.kafka.ui.connect.model.NewConnector; +import com.provectus.kafka.ui.model.*; +import org.mapstruct.Mapper; + +@Mapper(componentModel = "spring") +public interface KafkaConnectMapper { + NewConnector toClient(com.provectus.kafka.ui.model.NewConnector newConnector); + + Connector fromClient(com.provectus.kafka.ui.connect.model.Connector connector); + + ConnectorStatus fromClient(ConnectorStatusConnector connectorStatus); + + Task fromClient(ConnectorTask connectorTask); + + TaskStatus fromClient(com.provectus.kafka.ui.connect.model.TaskStatus taskStatus); + + ConnectorPlugin fromClient(com.provectus.kafka.ui.connect.model.ConnectorPlugin connectorPlugin); + + ConnectorPluginConfigValidationResponse fromClient(com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse connectorPluginConfigValidationResponse); +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java index a4a833fddf..cf6e109627 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java @@ -1,6 +1,8 @@ package com.provectus.kafka.ui.cluster.model; import com.provectus.kafka.ui.model.ServerStatus; + +import java.util.List; import java.util.Properties; import lombok.Builder; import lombok.Data; @@ -16,6 +18,7 @@ public class KafkaCluster { private final String bootstrapServers; private final String zookeeper; private final String schemaRegistry; + private final List kafkaConnect; private final String schemaNameTemplate; private final ServerStatus status; private final ServerStatus zookeeperStatus; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaConnectCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaConnectCluster.java new file mode 100644 index 0000000000..352a8369ac --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaConnectCluster.java @@ -0,0 +1,11 @@ +package com.provectus.kafka.ui.cluster.model; + +import lombok.Builder; +import lombok.Data; + +@Data +@Builder(toBuilder = true) +public class KafkaConnectCluster { + private final String name; + private final String address; +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/KafkaConnectService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/KafkaConnectService.java new file mode 100644 index 0000000000..9362103ffa --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/KafkaConnectService.java @@ -0,0 +1,183 @@ +package com.provectus.kafka.ui.cluster.service; + +import com.provectus.kafka.ui.cluster.client.KafkaConnectClients; +import com.provectus.kafka.ui.cluster.exception.NotFoundException; +import com.provectus.kafka.ui.cluster.mapper.ClusterMapper; +import com.provectus.kafka.ui.cluster.mapper.KafkaConnectMapper; +import com.provectus.kafka.ui.cluster.model.ClustersStorage; +import com.provectus.kafka.ui.cluster.model.KafkaCluster; +import com.provectus.kafka.ui.cluster.model.KafkaConnectCluster; +import com.provectus.kafka.ui.model.*; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Collection; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +@Service +@Log4j2 +@RequiredArgsConstructor +public class KafkaConnectService { + private final ClustersStorage clustersStorage; + private final ClusterMapper clusterMapper; + private final KafkaConnectMapper kafkaConnectMapper; + + public Mono> getConnects(String clusterName) { + return Mono.just( + Flux.fromIterable(clustersStorage.getClusterByName(clusterName) + .map(KafkaCluster::getKafkaConnect).stream() + .flatMap(Collection::stream) + .map(clusterMapper::toKafkaConnect) + .collect(Collectors.toList()) + ) + ); + } + + + public Flux getConnectors(String clusterName, String connectName) { + return getConnectAddress(clusterName, connectName) + .flatMapMany(connect -> + KafkaConnectClients.withBaseUrl(connect).getConnectors() + .doOnError(log::error) + ); + } + + public Mono createConnector(String clusterName, String connectName, Mono connector) { + return getConnectAddress(clusterName, connectName) + .flatMap(connect -> + connector + .map(kafkaConnectMapper::toClient) + .flatMap(c -> + KafkaConnectClients.withBaseUrl(connect).createConnector(c) + ) + .flatMap(c -> getConnector(clusterName, connectName, c.getName())) + ); + } + + public Mono getConnector(String clusterName, String connectName, String connectorName) { + return getConnectAddress(clusterName, connectName) + .flatMap(connect -> + KafkaConnectClients.withBaseUrl(connect).getConnector(connectorName) + .map(kafkaConnectMapper::fromClient) + .flatMap(connector -> + KafkaConnectClients.withBaseUrl(connect).getConnectorStatus(connector.getName()) + .map(connectorStatus -> { + var status = connectorStatus.getConnector(); + connector.status(kafkaConnectMapper.fromClient(status)); + return (Connector) new Connector() + .status(kafkaConnectMapper.fromClient(status)) + .type(connector.getType()) + .tasks(connector.getTasks()) + .name(connector.getName()) + .config(connector.getConfig()); + }) + ) + ); + } + + public Mono> getConnectorConfig(String clusterName, String connectName, String connectorName) { + return getConnectAddress(clusterName, connectName) + .flatMap(connect -> + KafkaConnectClients.withBaseUrl(connect).getConnectorConfig(connectorName) + ); + } + + public Mono setConnectorConfig(String clusterName, String connectName, String connectorName, Mono requestBody) { + return getConnectAddress(clusterName, connectName) + .flatMap(connect -> + requestBody.flatMap(body -> + KafkaConnectClients.withBaseUrl(connect).setConnectorConfig(connectorName, (Map) body) + ) + .map(kafkaConnectMapper::fromClient) + ); + } + + public Mono deleteConnector(String clusterName, String connectName, String connectorName) { + return getConnectAddress(clusterName, connectName) + .flatMap(connect -> + KafkaConnectClients.withBaseUrl(connect).deleteConnector(connectorName) + ); + } + + public Mono updateConnectorState(String clusterName, String connectName, String connectorName, ConnectorAction action) { + Function> kafkaClientCall; + switch (action) { + case RESTART: + kafkaClientCall = connect -> KafkaConnectClients.withBaseUrl(connect).restartConnector(connectorName); + break; + case PAUSE: + kafkaClientCall = connect -> KafkaConnectClients.withBaseUrl(connect).pauseConnector(connectorName); + break; + case RESUME: + kafkaClientCall = connect -> KafkaConnectClients.withBaseUrl(connect).resumeConnector(connectorName); + break; + default: + throw new IllegalStateException("Unexpected value: " + action); + } + return getConnectAddress(clusterName, connectName) + .flatMap(kafkaClientCall); + } + + public Flux getConnectorTasks(String clusterName, String connectName, String connectorName) { + return getConnectAddress(clusterName, connectName) + .flatMapMany(connect -> + KafkaConnectClients.withBaseUrl(connect).getConnectorTasks(connectorName) + .map(kafkaConnectMapper::fromClient) + .flatMap(task -> + KafkaConnectClients.withBaseUrl(connect).getConnectorTaskStatus(connectorName, task.getId().getTask()) + .map(kafkaConnectMapper::fromClient) + .map(task::status) + ) + ); + } + + public Mono restartConnectorTask(String clusterName, String connectName, String connectorName, Integer taskId) { + return getConnectAddress(clusterName, connectName) + .flatMap(connect -> + KafkaConnectClients.withBaseUrl(connect).restartConnectorTask(connectorName, taskId) + ); + } + + public Mono> getConnectorPlugins(String clusterName, String connectName) { + return Mono.just(getConnectAddress(clusterName, connectName) + .flatMapMany(connect -> + KafkaConnectClients.withBaseUrl(connect).getConnectorPlugins() + .map(kafkaConnectMapper::fromClient) + )); + } + + public Mono validateConnectorPluginConfig(String clusterName, String connectName, String pluginName, Mono requestBody) { + return getConnectAddress(clusterName, connectName) + .flatMap(connect -> + requestBody.flatMap(body -> + KafkaConnectClients.withBaseUrl(connect).validateConnectorPluginConfig(pluginName, (Map) body) + ) + .map(kafkaConnectMapper::fromClient) + ); + } + + private Mono getCluster(String clusterName) { + return clustersStorage.getClusterByName(clusterName) + .map(Mono::just) + .orElse(Mono.error(new NotFoundException("No such cluster"))); + } + + private Mono getConnectAddress(String clusterName, String connectName) { + return getCluster(clusterName) + .map(kafkaCluster -> + kafkaCluster.getKafkaConnect().stream() + .filter(connect -> connect.getName().equals(connectName)) + .findFirst() + .map(KafkaConnectCluster::getAddress) + ) + .flatMap(connect -> connect + .map(Mono::just) + .orElse(Mono.error(new NotFoundException("No such connect cluster"))) + ); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/KafkaConnectRestController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/KafkaConnectRestController.java new file mode 100644 index 0000000000..5024e4aeae --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/KafkaConnectRestController.java @@ -0,0 +1,92 @@ +package com.provectus.kafka.ui.rest; + +import com.provectus.kafka.ui.api.ApiClustersConnectsApi; +import com.provectus.kafka.ui.cluster.service.KafkaConnectService; +import com.provectus.kafka.ui.model.*; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import javax.validation.Valid; +import java.util.Map; + +@RestController +@RequiredArgsConstructor +@Log4j2 +public class KafkaConnectRestController implements ApiClustersConnectsApi { + private final KafkaConnectService kafkaConnectService; + + @Override + public Mono>> getConnects(String clusterName, ServerWebExchange exchange) { + return kafkaConnectService.getConnects(clusterName).map(ResponseEntity::ok); + } + + @Override + public Mono>> getConnectors(String clusterName, String connectName, ServerWebExchange exchange) { + Flux connectors = kafkaConnectService.getConnectors(clusterName, connectName); + return Mono.just(ResponseEntity.ok(connectors)); + } + + @Override + public Mono> createConnector(String clusterName, String connectName, @Valid Mono connector, ServerWebExchange exchange) { + return kafkaConnectService.createConnector(clusterName, connectName, connector) + .map(ResponseEntity::ok); + } + + @Override + public Mono> getConnector(String clusterName, String connectName, String connectorName, ServerWebExchange exchange) { + return kafkaConnectService.getConnector(clusterName, connectName, connectorName) + .map(ResponseEntity::ok); + } + + @Override + public Mono> deleteConnector(String clusterName, String connectName, String connectorName, ServerWebExchange exchange) { + return kafkaConnectService.deleteConnector(clusterName, connectName, connectorName) + .map(ResponseEntity::ok); + } + + @Override + public Mono>> getConnectorConfig(String clusterName, String connectName, String connectorName, ServerWebExchange exchange) { + return kafkaConnectService.getConnectorConfig(clusterName, connectName, connectorName) + .map(ResponseEntity::ok); + } + + @Override + public Mono> setConnectorConfig(String clusterName, String connectName, String connectorName, @Valid Mono requestBody, ServerWebExchange exchange) { + return kafkaConnectService.setConnectorConfig(clusterName, connectName, connectorName, requestBody) + .map(ResponseEntity::ok); + } + + @Override + public Mono> updateConnectorState(String clusterName, String connectName, String connectorName, ConnectorAction action, ServerWebExchange exchange) { + return kafkaConnectService.updateConnectorState(clusterName, connectName, connectorName, action) + .map(ResponseEntity::ok); + } + + @Override + public Mono>> getConnectorTasks(String clusterName, String connectName, String connectorName, ServerWebExchange exchange) { + return Mono.just(ResponseEntity.ok(kafkaConnectService.getConnectorTasks(clusterName, connectName, connectorName))); + } + + @Override + public Mono> restartConnectorTask(String clusterName, String connectName, String connectorName, Integer taskId, ServerWebExchange exchange) { + return kafkaConnectService.restartConnectorTask(clusterName, connectName, connectorName, taskId) + .map(ResponseEntity::ok); + } + + @Override + public Mono>> getConnectorPlugins(String clusterName, String connectName, ServerWebExchange exchange) { + return kafkaConnectService.getConnectorPlugins(clusterName, connectName) + .map(ResponseEntity::ok); + } + + @Override + public Mono> validateConnectorPluginConfig(String clusterName, String connectName, String pluginName, @Valid Mono requestBody, ServerWebExchange exchange) { + return kafkaConnectService.validateConnectorPluginConfig(clusterName, connectName, pluginName, requestBody) + .map(ResponseEntity::ok); + } +} diff --git a/kafka-ui-api/src/main/resources/application-local.yml b/kafka-ui-api/src/main/resources/application-local.yml index c90b425b37..8a72bdf4f9 100644 --- a/kafka-ui-api/src/main/resources/application-local.yml +++ b/kafka-ui-api/src/main/resources/application-local.yml @@ -5,13 +5,18 @@ kafka: bootstrapServers: localhost:9093 zookeeper: localhost:2181 schemaRegistry: http://localhost:8081 -# schemaNameTemplate: "%s-value" + kafkaConnect: + - name: first + address: http://localhost:8083 jmxPort: 9997 - name: secondLocal bootstrapServers: localhost:9093 zookeeper: localhost:2182 schemaRegistry: http://localhost:8081 + kafkaConnect: + - name: first + address: http://localhost:8083 jmxPort: 9998 admin-client-timeout: 5000 zookeeper: diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java index 576dd53a44..3552975bc3 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractBaseTest.java @@ -1,35 +1,60 @@ package com.provectus.kafka.ui; -import org.junit.runner.RunWith; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.ApplicationListener; import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.event.ContextClosedEvent; import org.springframework.test.context.ActiveProfiles; -import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.context.junit.jupiter.SpringExtension; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerImageName; -@RunWith(SpringRunner.class) +import java.time.Duration; + +@ExtendWith(SpringExtension.class) @SpringBootTest @ActiveProfiles("test") -@Testcontainers public abstract class AbstractBaseTest { - @Container - public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1")) - .withNetwork(Network.SHARED); - @Container - public static SchemaRegistryContainer schemaRegistry = new SchemaRegistryContainer("5.2.1") - .withKafka(kafka) - .dependsOn(kafka); public static class Initializer implements ApplicationContextInitializer { + public final KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1")) + .withNetwork(Network.SHARED); + + public final SchemaRegistryContainer schemaRegistry = new SchemaRegistryContainer("5.2.1") + .withKafka(kafka) + .dependsOn(kafka); + + public final KafkaConnectContainer kafkaConnect = new KafkaConnectContainer("5.2.1") + .withKafka(kafka) + .waitingFor( + Wait.forLogMessage(".*Finished starting connectors and tasks.*", 1) + ) + .dependsOn(kafka) + .dependsOn(schemaRegistry) + .withStartupTimeout(Duration.ofMinutes(15)); + @Override - public void initialize(ConfigurableApplicationContext context) { + public void initialize(@NotNull ConfigurableApplicationContext context) { + kafka.start(); + schemaRegistry.start(); + kafkaConnect.start(); + System.setProperty("kafka.clusters.0.name", "local"); + System.setProperty("kafka.clusters.0.bootstrapServers", kafka.getBootstrapServers()); System.setProperty("kafka.clusters.0.schemaRegistry", schemaRegistry.getTarget()); + System.setProperty("kafka.clusters.0.kafkaConnect.0.name", "kafka-connect"); + System.setProperty("kafka.clusters.0.kafkaConnect.0.address", kafkaConnect.getTarget()); + + context.addApplicationListener((ApplicationListener) event -> { + kafkaConnect.close(); + schemaRegistry.close(); + kafka.close(); + }); } } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectContainer.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectContainer.java new file mode 100644 index 0000000000..7926ca7c7e --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectContainer.java @@ -0,0 +1,42 @@ +package com.provectus.kafka.ui; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; + +public class KafkaConnectContainer extends GenericContainer { + private static final int CONNECT_PORT = 8083; + + public KafkaConnectContainer(String version) { + super("confluentinc/cp-kafka-connect:" + version); + } + + + public KafkaConnectContainer withKafka(KafkaContainer kafka) { + String bootstrapServers = kafka.getNetworkAliases().get(0) + ":9092"; + return withKafka(kafka.getNetwork(), bootstrapServers); + } + + public KafkaConnectContainer withKafka(Network network, String bootstrapServers) { + withNetwork(network); + withEnv("CONNECT_BOOTSTRAP_SERVERS", "PLAINTEXT://" + bootstrapServers); + withEnv("CONNECT_GROUP_ID", "connect-group"); + withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "_connect_configs"); + withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1"); + withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "_connect_offset"); + withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1"); + withEnv("CONNECT_STATUS_STORAGE_TOPIC", "_connect_status"); + withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1"); + withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter"); + withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.storage.StringConverter"); + withEnv("CONNECT_INTERNAL_KEY_CONVERTER", "org.apache.kafka.connect.json.JsonConverter"); + withEnv("CONNECT_INTERNAL_VALUE_CONVERTER", "org.apache.kafka.connect.json.JsonConverter"); + withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "kafka-connect"); + withEnv("CONNECT_PLUGIN_PATH", "/usr/share/java,/usr/share/confluent-hub-components"); + return self(); + } + + public String getTarget() { + return "http://" + getContainerIpAddress() + ":" + getMappedPort(CONNECT_PORT); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java new file mode 100644 index 0000000000..0f4ae978d7 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConnectServiceTests.java @@ -0,0 +1,304 @@ +package com.provectus.kafka.ui; + +import com.provectus.kafka.ui.model.*; +import lombok.extern.log4j.Log4j2; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.web.reactive.server.WebTestClient; + +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static java.util.function.Predicate.not; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@ContextConfiguration(initializers = {AbstractBaseTest.Initializer.class}) +@Log4j2 +@AutoConfigureWebTestClient(timeout = "60000") +public class KafkaConnectServiceTests extends AbstractBaseTest { + private final String clusterName = "local"; + private final String connectName = "kafka-connect"; + private final String connectorName = UUID.randomUUID().toString(); + private final Map config = Map.of( + "name", connectorName, + "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector", + "tasks.max", "1", + "topics", "output-topic", + "file", "/tmp/test" + ); + + @Autowired + private WebTestClient webTestClient; + + + @BeforeEach + public void setUp() { + webTestClient.post() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors", clusterName, connectName) + .bodyValue(new NewConnector() + .name(connectorName) + .config(Map.of( + "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector", + "tasks.max", "1", + "topics", "output-topic", + "file", "/tmp/test" + )) + ) + .exchange() + .expectStatus().isOk(); + } + + @AfterEach + public void tearDown() { + webTestClient.delete() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", clusterName, connectName, connectorName) + .exchange() + .expectStatus().isOk(); + } + + @Test + public void shouldListConnectors() { + webTestClient.get() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors", clusterName, connectName) + .exchange() + .expectStatus().isOk() + .expectBody() + .jsonPath(String.format("$[?(@ == '%s')]", connectorName)) + .exists(); + } + + @Test + public void shouldReturnNotFoundForNonExistingCluster() { + webTestClient.get() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors", "nonExistingCluster", connectName) + .exchange() + .expectStatus().isNotFound(); + } + + @Test + public void shouldReturnNotFoundForNonExistingConnectName() { + webTestClient.get() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors", clusterName, "nonExistingConnect") + .exchange() + .expectStatus().isNotFound(); + } + + @Test + public void shouldRetrieveConnector() { + Connector expected = (Connector) new Connector() + .status(new ConnectorStatus() + .state(ConnectorStatus.StateEnum.RUNNING) + .workerId("kafka-connect:8083")) + .tasks(List.of(new TaskId() + .connector(connectorName) + .task(0))) + .type(Connector.TypeEnum.SINK) + .name(connectorName) + .config(config); + webTestClient.get() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", clusterName, connectName, connectorName) + .exchange() + .expectStatus().isOk() + .expectBody(Connector.class) + .value(connector -> assertEquals(expected, connector)); + } + + @Test + public void shouldUpdateConfig() { + webTestClient.put() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", clusterName, connectName, connectorName) + .bodyValue(Map.of( + "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector", + "tasks.max", "1", + "topics", "another-topic", + "file", "/tmp/new" + ) + ) + .exchange() + .expectStatus().isOk(); + + webTestClient.get() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", clusterName, connectName, connectorName) + .exchange() + .expectStatus().isOk() + .expectBody(new ParameterizedTypeReference>() { + }) + .isEqualTo(Map.of( + "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector", + "tasks.max", "1", + "topics", "another-topic", + "file", "/tmp/new", + "name", connectorName + )); + } + + @Test + public void shouldReturn400WhenConnectReturns400ForInvalidConfigCreate() { + var connectorName = UUID.randomUUID().toString(); + webTestClient.post() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors", clusterName, connectName) + .bodyValue(Map.of( + "name", connectorName, + "config", Map.of( + "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector", + "tasks.max", "invalid number", + "topics", "another-topic", + "file", "/tmp/test" + )) + ) + .exchange() + .expectStatus().isBadRequest(); + + webTestClient.get() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors", clusterName, connectName) + .exchange() + .expectStatus().isOk() + .expectBody() + .jsonPath(String.format("$[?(@ == '%s')]", connectorName)) + .doesNotExist(); + } + + @Test + public void shouldReturn400WhenConnectReturns500ForInvalidConfigCreate() { + var connectorName = UUID.randomUUID().toString(); + webTestClient.post() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors", clusterName, connectName) + .bodyValue(Map.of( + "name", connectorName, + "config", Map.of( + "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector" + )) + ) + .exchange() + .expectStatus().isBadRequest(); + + webTestClient.get() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors", clusterName, connectName) + .exchange() + .expectStatus().isOk() + .expectBody() + .jsonPath(String.format("$[?(@ == '%s')]", connectorName)) + .doesNotExist(); + } + + + @Test + public void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() { + webTestClient.put() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", clusterName, connectName, connectorName) + .bodyValue(Map.of( + "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector", + "tasks.max", "invalid number", + "topics", "another-topic", + "file", "/tmp/test" + ) + ) + .exchange() + .expectStatus().isBadRequest(); + + webTestClient.get() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", clusterName, connectName, connectorName) + .exchange() + .expectStatus().isOk() + .expectBody(new ParameterizedTypeReference>() { + }) + .isEqualTo(Map.of( + "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector", + "tasks.max", "1", + "topics", "output-topic", + "file", "/tmp/test", + "name", connectorName + )); + } + + @Test + public void shouldReturn400WhenConnectReturns500ForInvalidConfigUpdate() { + webTestClient.put() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", clusterName, connectName, connectorName) + .bodyValue(Map.of( + "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector" + ) + ) + .exchange() + .expectStatus().isBadRequest(); + + webTestClient.get() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", clusterName, connectName, connectorName) + .exchange() + .expectStatus().isOk() + .expectBody(new ParameterizedTypeReference>() { + }) + .isEqualTo(Map.of( + "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector", + "tasks.max", "1", + "topics", "output-topic", + "file", "/tmp/test", + "name", connectorName + )); + } + + @Test + public void shouldRetrieveConnectorPlugins() { + webTestClient.get() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/plugins", clusterName, connectName) + .exchange() + .expectStatus().isOk() + .expectBodyList(ConnectorPlugin.class) + .value(plugins -> assertEquals(13, plugins.size())); + } + + @Test + public void shouldSuccessfullyValidateConnectorPluginConfiguration() { + var pluginName = "FileStreamSinkConnector"; + webTestClient.put() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate", clusterName, connectName, pluginName) + .bodyValue(Map.of( + "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector", + "tasks.max", "1", + "topics", "output-topic", + "file", "/tmp/test", + "name", connectorName + ) + ) + .exchange() + .expectStatus().isOk() + .expectBody(ConnectorPluginConfigValidationResponse.class) + .value(response -> assertEquals(0, response.getErrorCount())); + } + + @Test + public void shouldValidateAndReturnErrorsOfConnectorPluginConfiguration() { + var pluginName = "FileStreamSinkConnector"; + webTestClient.put() + .uri("http://localhost:8080/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate", clusterName, connectName, pluginName) + .bodyValue(Map.of( + "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector", + "tasks.max", "0", + "topics", "output-topic", + "file", "/tmp/test", + "name", connectorName + ) + ) + .exchange() + .expectStatus().isOk() + .expectBody(ConnectorPluginConfigValidationResponse.class) + .value(response -> { + assertEquals(1, response.getErrorCount()); + var error = response.getConfigs().stream() + .map(ConnectorPluginConfig::getValue) + .map(ConnectorPluginConfigValue::getErrors) + .filter(not(List::isEmpty)) + .findFirst().get(); + assertEquals( + "Invalid value 0 for configuration tasks.max: Value must be at least 1", + error.get(0) + ); + }); + } +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java index 9b459683bf..65005671d1 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/SchemaRegistryServiceTests.java @@ -41,7 +41,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest { } @Test - void shouldReturn404WhenGetLatestSchemaByNonExistingSubject() { + public void shouldReturn404WhenGetLatestSchemaByNonExistingSubject() { String unknownSchema = "unknown-schema"; webTestClient .get() @@ -51,7 +51,7 @@ class SchemaRegistryServiceTests extends AbstractBaseTest { } @Test - void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() { + public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() { webTestClient .get() .uri("http://localhost:8080/api/clusters/local/schemas/compatibility") diff --git a/kafka-ui-contract/pom.xml b/kafka-ui-contract/pom.xml index 1a26d0863e..9ddc5dadb3 100644 --- a/kafka-ui-contract/pom.xml +++ b/kafka-ui-contract/pom.xml @@ -33,6 +33,12 @@ jackson-databind-nullable ${jackson-databind-nullable.version} + + com.google.code.findbugs + jsr305 + 3.0.2 + provided + @@ -91,12 +97,50 @@ + + generate-connect-client + + generate + + + ${project.basedir}/src/main/resources/swagger/kafka-connect-api.yaml + + ${project.build.directory}/generated-sources/kafka-connect-client + java + false + false + + + com.provectus.kafka.ui.connect.model + com.provectus.kafka.ui.connect.api + kafka-connect-client + + true + webclient + + true + java8 + + + + + org.apache.maven.plugins + maven-clean-plugin + ${maven-clean-plugin.version} + + + + ${basedir}/${frontend-generated-sources-directory} + + + + org.apache.maven.plugins maven-resources-plugin - 3.1.0 + ${maven-resources-plugin.version} copy-resource-one @@ -106,7 +150,7 @@ - ${basedir}/..//kafka-ui-react-app/src/generated-sources + ${basedir}/${frontend-generated-sources-directory} ${project.build.directory}/generated-sources/frontend/ diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml new file mode 100644 index 0000000000..0581c5f3bc --- /dev/null +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml @@ -0,0 +1,501 @@ +openapi: 3.0.0 +info: + description: Api Documentation + version: 0.1.0 + title: Api Documentation + termsOfService: urn:tos + contact: {} + license: + name: Apache 2.0 + url: http://www.apache.org/licenses/LICENSE-2.0 +tags: + - name: /connect +servers: + - url: /localhost + +paths: + /connectors: + get: + tags: + - /connect + summary: get all connectors from Kafka Connect service + operationId: getConnectors + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + type: string + post: + tags: + - /connect + summary: create new connector + operationId: createConnector + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/NewConnector' + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/Connector' + 400: + description: Bad request + 409: + description: rebalance is in progress + 500: + description: Internal server error + + /connectors/{connectorName}: + get: + tags: + - /connect + summary: get information about the connector + operationId: getConnector + parameters: + - name: connectorName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/Connector' + delete: + tags: + - /connect + summary: delete connector + operationId: deleteConnector + parameters: + - name: connectorName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + 409: + description: rebalance is in progress + + /connectors/{connectorName}/config: + get: + tags: + - /connect + summary: get connector configuration + operationId: getConnectorConfig + parameters: + - name: connectorName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ConnectorConfig' + put: + tags: + - /connect + summary: update or create connector with provided config + operationId: setConnectorConfig + parameters: + - name: connectorName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ConnectorConfig' + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/Connector' + 400: + description: Bad request + 409: + description: rebalance is in progress + 500: + description: Internal server error + + + /connectors/{connectorName}/status: + get: + tags: + - /connect + summary: get connector status + operationId: getConnectorStatus + parameters: + - name: connectorName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ConnectorStatus' + + /connectors/{connectorName}/restart: + post: + tags: + - /connect + summary: restart the connector + operationId: restartConnector + parameters: + - name: connectorName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + 409: + description: rebalance is in progress + + /connectors/{connectorName}/pause: + put: + tags: + - /connect + summary: pause the connector + operationId: pauseConnector + parameters: + - name: connectorName + in: path + required: true + schema: + type: string + responses: + 202: + description: Accepted + + /connectors/{connectorName}/resume: + put: + tags: + - /connect + summary: resume the connector + operationId: resumeConnector + parameters: + - name: connectorName + in: path + required: true + schema: + type: string + responses: + 202: + description: Accepted + + /connectors/{connectorName}/tasks: + get: + tags: + - /connect + summary: get connector tasks + operationId: getConnectorTasks + parameters: + - name: connectorName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/ConnectorTask' + + /connectors/{connectorName}/tasks/{taskId}/status: + get: + tags: + - /connect + summary: get connector task status + operationId: getConnectorTaskStatus + parameters: + - name: connectorName + in: path + required: true + schema: + type: string + - name: taskId + in: path + required: true + schema: + type: integer + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/TaskStatus' + + /connectors/{connectorName}/tasks/{taskId}/restart: + post: + tags: + - /connect + summary: restart connector task + operationId: restartConnectorTask + parameters: + - name: connectorName + in: path + required: true + schema: + type: string + - name: taskId + in: path + required: true + schema: + type: integer + responses: + 200: + description: OK + + /connector-plugins: + get: + tags: + - /connect + summary: get connector plugins + operationId: getConnectorPlugins + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/ConnectorPlugin' + + /connector-plugins/{pluginName}/config/validate: + put: + tags: + - /connect + summary: validate connector plugin configuration + operationId: validateConnectorPluginConfig + parameters: + - name: pluginName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ConnectorConfig' + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ConnectorPluginConfigValidationResponse' + +components: + schemas: + ConnectorConfig: + type: object + additionalProperties: + type: object + + Task: + type: object + properties: + connector: + type: string + task: + type: integer + + ConnectorTask: + type: object + properties: + id: + $ref: '#/components/schemas/Task' + config: + $ref: '#/components/schemas/ConnectorConfig' + + NewConnector: + type: object + properties: + name: + type: string + config: + $ref: '#/components/schemas/ConnectorConfig' + required: + - name + - config + + Connector: + allOf: + - $ref: '#/components/schemas/NewConnector' + - type: object + properties: + tasks: + type: array + items: + $ref: '#/components/schemas/Task' + type: + type: string + enum: + - source + - sink + + TaskStatus: + type: object + properties: + id: + type: integer + state: + type: string + enum: + - RUNNING + - FAILED + - PAUSED + - UNASSIGNED + worker_id: + type: string + + ConnectorStatus: + type: object + properties: + name: + type: string + connector: + type: object + properties: + state: + type: string + enum: + - RUNNING + - FAILED + - PAUSED + - UNASSIGNED + worker_id: + type: string + tasks: + type: array + items: + $ref: '#/components/schemas/TaskStatus' + + ConnectorPlugin: + type: object + properties: + class: + type: string + + ConnectorPluginConfigDefinition: + type: object + properties: + name: + type: string + type: + type: string + enum: + - BOOLEAN + - CLASS + - DOUBLE + - INT + - LIST + - LONG + - PASSWORD + - SHORT + - STRING + required: + type: boolean + default_value: + type: string + importance: + type: string + enum: + - LOW + - MEDIUM + - HIGH + documentation: + type: string + group: + type: string + width: + type: string + enum: + - SHORT + - MEDIUM + - LONG + - NONE + display_name: + type: string + dependents: + type: array + items: + type: string + order: + type: integer + + ConnectorPluginConfigValue: + type: object + properties: + name: + type: string + value: + type: string + recommended_values: + type: array + items: + type: string + errors: + type: array + items: + type: string + visible: + type: boolean + + ConnectorPluginConfig: + type: object + properties: + definition: + $ref: '#/components/schemas/ConnectorPluginConfigDefinition' + value: + $ref: '#/components/schemas/ConnectorPluginConfigValue' + + ConnectorPluginConfigValidationResponse: + type: object + properties: + name: + type: string + error_count: + type: integer + groups: + type: array + items: + type: string + configs: + type: array + items: + $ref: '#/components/schemas/ConnectorPluginConfig' + diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 51139ebd13..1a24dae624 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -10,6 +10,7 @@ info: url: http://www.apache.org/licenses/LICENSE-2.0 tags: - name: /api/clusters + - name: /api/clusters/connects servers: - url: /localhost @@ -641,6 +642,364 @@ paths: 404: description: Not Found + /api/clusters/{clusterName}/connects: + get: + tags: + - /api/clusters/connects + summary: get all kafka connect instances + operationId: getConnects + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/Connect' + + /api/clusters/{clusterName}/connects/{connectName}/connectors: + get: + tags: + - /api/clusters/connects + summary: get all connectors from Kafka Connect service + operationId: getConnectors + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: connectName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + type: string + post: + tags: + - /api/clusters/connects + summary: create new connector + operationId: createConnector + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: connectName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/NewConnector' + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/Connector' + 409: + description: rebalance is in progress + + /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}: + get: + tags: + - /api/clusters/connects + summary: get information about the connector + operationId: getConnector + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: connectName + in: path + required: true + schema: + type: string + - name: connectorName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/Connector' + delete: + tags: + - /api/clusters/connects + summary: delete connector + operationId: deleteConnector + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: connectName + in: path + required: true + schema: + type: string + - name: connectorName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + 409: + description: rebalance is in progress + + /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/action/{action}: + post: + tags: + - /api/clusters/connects + summary: update connector state (restart, pause or resume) + operationId: updateConnectorState + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: connectName + in: path + required: true + schema: + type: string + - name: connectorName + in: path + required: true + schema: + type: string + - name: action + in: path + required: true + schema: + $ref: '#/components/schemas/ConnectorAction' + responses: + 200: + description: OK + 409: + description: rebalance is in progress + + /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config: + get: + tags: + - /api/clusters/connects + summary: get connector configuration + operationId: getConnectorConfig + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: connectName + in: path + required: true + schema: + type: string + - name: connectorName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ConnectorConfig' + put: + tags: + - /api/clusters/connects + summary: update or create connector with provided config + operationId: setConnectorConfig + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: connectName + in: path + required: true + schema: + type: string + - name: connectorName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ConnectorConfig' + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/Connector' + 409: + description: rebalance is in progress + + /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/tasks: + get: + tags: + - /api/clusters/connects + summary: get connector tasks + operationId: getConnectorTasks + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: connectName + in: path + required: true + schema: + type: string + - name: connectorName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/Task' + + /api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/tasks/{taskId}/action/restart: + post: + tags: + - /api/clusters/connects + summary: restart connector task + operationId: restartConnectorTask + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: connectName + in: path + required: true + schema: + type: string + - name: connectorName + in: path + required: true + schema: + type: string + - name: taskId + in: path + required: true + schema: + type: integer + responses: + 200: + description: OK + + /api/clusters/{clusterName}/connects/{connectName}/plugins: + get: + tags: + - /api/clusters/connects + summary: get connector plugins + operationId: getConnectorPlugins + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: connectName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/ConnectorPlugin' + + /api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate: + put: + tags: + - /api/clusters/connects + summary: validate connector plugin configuration + operationId: validateConnectorPluginConfig + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: connectName + in: path + required: true + schema: + type: string + - name: pluginName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ConnectorConfig' + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ConnectorPluginConfigValidationResponse' + components: schemas: Cluster: @@ -1020,3 +1379,200 @@ components: type: boolean required: - isCompatible + + Connect: + type: object + properties: + name: + type: string + address: + type: string + + ConnectorConfig: + type: object + additionalProperties: + type: object + + TaskId: + type: object + properties: + connector: + type: string + task: + type: integer + + Task: + type: object + properties: + id: + $ref: '#/components/schemas/TaskId' + status: + $ref: '#/components/schemas/TaskStatus' + config: + $ref: '#/components/schemas/ConnectorConfig' + + NewConnector: + type: object + properties: + name: + type: string + config: + $ref: '#/components/schemas/ConnectorConfig' + required: + - name + - config + + Connector: + allOf: + - $ref: '#/components/schemas/NewConnector' + - type: object + properties: + tasks: + type: array + items: + $ref: '#/components/schemas/TaskId' + type: + type: string + enum: + - source + - sink + status: + $ref: '#/components/schemas/ConnectorStatus' + + + TaskStatus: + type: object + properties: + id: + type: integer + state: + type: string + enum: + - RUNNING + - FAILED + - PAUSED + - UNASSIGNED + worker_id: + type: string + trace: + type: string + + ConnectorStatus: + type: object + properties: + state: + type: string + enum: + - RUNNING + - FAILED + - PAUSED + - UNASSIGNED + worker_id: + type: string + + ConnectorAction: + type: string + enum: + - restart + - pause + - resume + + TaskAction: + type: string + enum: + - restart + + ConnectorPlugin: + type: object + properties: + class: + type: string + + ConnectorPluginConfigDefinition: + type: object + properties: + name: + type: string + type: + type: string + enum: + - BOOLEAN + - CLASS + - DOUBLE + - INT + - LIST + - LONG + - PASSWORD + - SHORT + - STRING + required: + type: boolean + default_value: + type: string + importance: + type: string + enum: + - LOW + - MEDIUM + - HIGH + documentation: + type: string + group: + type: string + width: + type: string + enum: + - SHORT + - MEDIUM + - LONG + - NONE + display_name: + type: string + dependents: + type: array + items: + type: string + order: + type: integer + + ConnectorPluginConfigValue: + type: object + properties: + name: + type: string + value: + type: string + recommended_values: + type: array + items: + type: string + errors: + type: array + items: + type: string + visible: + type: boolean + + ConnectorPluginConfig: + type: object + properties: + definition: + $ref: '#/components/schemas/ConnectorPluginConfigDefinition' + value: + $ref: '#/components/schemas/ConnectorPluginConfigValue' + + ConnectorPluginConfigValidationResponse: + type: object + properties: + name: + type: string + error_count: + type: integer + groups: + type: array + items: + type: string + configs: + type: array + items: + $ref: '#/components/schemas/ConnectorPluginConfig' diff --git a/pom.xml b/pom.xml index 45a00f5aa3..62106bc7a2 100644 --- a/pom.xml +++ b/pom.xml @@ -24,6 +24,9 @@ 1.4.10 1.8.0 3.5.1 + 3.1.0 + 3.1.0 + 2.22.0 4.3.0 1.6.0 1.2.32 @@ -33,6 +36,8 @@ 2.2 1.15.1 5.4.0 + + ..//kafka-ui-react-app/src/generated-sources