diff --git a/README.md b/README.md index d2a35ebc31..33795ed2bc 100644 --- a/README.md +++ b/README.md @@ -170,6 +170,8 @@ For example, if you want to use an environment variable to set the `name` parame |`KAFKA_CLUSTERS_0_DISABLELOGDIRSCOLLECTION` |Disable collecting segments information. It should be true for confluent cloud. Default: false |`KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME` |Given name for the Kafka Connect cluster |`KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS` |Address of the Kafka Connect service endpoint +|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_USERNAME`| Kafka Connect cluster's basic authentication username +|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_PASSWORD`| Kafka Connect cluster's basic authentication password |`KAFKA_CLUSTERS_0_JMXSSL` |Enable SSL for JMX? `true` or `false`. For advanced setup, see `kafka-ui-jmx-secured.yml` |`KAFKA_CLUSTERS_0_JMXUSERNAME` |Username for JMX authentication |`KAFKA_CLUSTERS_0_JMXPASSWORD` |Password for JMX authentication diff --git a/documentation/compose/jaas/kafka_connect.jaas b/documentation/compose/jaas/kafka_connect.jaas new file mode 100644 index 0000000000..98b4f06282 --- /dev/null +++ b/documentation/compose/jaas/kafka_connect.jaas @@ -0,0 +1,4 @@ +KafkaConnect { + org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required + file="/conf/kafka_connect.password"; +}; diff --git a/documentation/compose/jaas/kafka_connect.password b/documentation/compose/jaas/kafka_connect.password new file mode 100644 index 0000000000..aac9c81470 --- /dev/null +++ b/documentation/compose/jaas/kafka_connect.password @@ -0,0 +1 @@ +admin: admin-secret diff --git a/documentation/compose/kafka-ui-connectors-auth.yaml b/documentation/compose/kafka-ui-connectors-auth.yaml new file mode 100644 index 0000000000..a736791120 --- /dev/null +++ b/documentation/compose/kafka-ui-connectors-auth.yaml @@ -0,0 +1,150 @@ +--- +version: '2' +services: + + kafka-ui: + container_name: kafka-ui + image: provectuslabs/kafka-ui:latest + ports: + - 8080:8080 + depends_on: + - zookeeper0 + - zookeeper1 + - kafka0 + - kafka1 + - schemaregistry0 + - kafka-connect0 + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092 + KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181 + KAFKA_CLUSTERS_0_JMXPORT: 9997 + KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085 + KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first + KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083 + KAFKA_CLUSTERS_0_KAFKACONNECT_0_USERNAME: admin + KAFKA_CLUSTERS_0_KAFKACONNECT_0_PASSWORD: admin-secret + KAFKA_CLUSTERS_0_KSQLDBSERVER: http://ksqldb:8088 + + zookeeper0: + image: confluentinc/cp-zookeeper:5.2.4 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - 2181:2181 + + kafka0: + image: confluentinc/cp-kafka:5.3.1 + depends_on: + - zookeeper0 + ports: + - 9092:9092 + - 9997:9997 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + JMX_PORT: 9997 + KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997 + + schemaregistry0: + image: confluentinc/cp-schema-registry:5.5.0 + ports: + - 8085:8085 + depends_on: + - zookeeper0 + - kafka0 + environment: + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092 + SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2181 + SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT + SCHEMA_REGISTRY_HOST_NAME: schemaregistry0 + SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085 + + SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" + SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO + SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas + + + kafka-connect0: + build: + context: ./kafka-connect + args: + image: confluentinc/cp-kafka-connect:6.0.1 + ports: + - 8083:8083 + depends_on: + - kafka0 + - schemaregistry0 + volumes: + - ./jaas:/conf + environment: + CONNECT_BOOTSTRAP_SERVERS: kafka0:29092 + CONNECT_GROUP_ID: compose-connect-group + CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_STATUS_STORAGE_TOPIC: _connect_status + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter + CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 + CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter + CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 + CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0 + CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" + CONNECT_REST_EXTENSION_CLASSES: "org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension" + KAFKA_OPTS: "-Djava.security.auth.login.config=/conf/kafka_connect.jaas" + +# AWS_ACCESS_KEY_ID: "" +# AWS_SECRET_ACCESS_KEY: "" + + kafka-init-topics: + image: confluentinc/cp-kafka:5.3.1 + volumes: + - ./message.json:/data/message.json + depends_on: + - kafka1 + command: "bash -c 'echo Waiting for Kafka to be ready... && \ + cub kafka-ready -b kafka1:29092 1 30 && \ + kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \ + kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \ + kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181 && \ + kafka-console-producer --broker-list kafka1:29092 -topic second.users < /data/message.json'" + + create-connectors: + image: ellerbrock/alpine-bash-curl-ssl + depends_on: + - postgres-db + - kafka-connect0 + volumes: + - ./connectors:/connectors + command: bash -c '/connectors/start.sh' + + ksqldb: + image: confluentinc/ksqldb-server:0.18.0 + depends_on: + - kafka0 + - kafka-connect0 + - schemaregistry0 + ports: + - 8088:8088 + environment: + KSQL_CUB_KAFKA_TIMEOUT: 120 + KSQL_LISTENERS: http://0.0.0.0:8088 + KSQL_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092 + KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true" + KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true" + KSQL_KSQL_CONNECT_URL: http://kafka-connect0:8083 + KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 + KSQL_KSQL_SERVICE_ID: my_ksql_1 + KSQL_KSQL_HIDDEN_TOPICS: '^_.*' + KSQL_CACHE_MAX_BYTES_BUFFERING: 0 diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClients.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClients.java index 2dd2ed585f..de0c9054ae 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClients.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KafkaConnectClients.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.client; import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi; +import com.provectus.kafka.ui.model.KafkaConnectCluster; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -12,7 +13,7 @@ public final class KafkaConnectClients { private static final Map CACHE = new ConcurrentHashMap<>(); - public static KafkaConnectClientApi withBaseUrl(String basePath) { - return CACHE.computeIfAbsent(basePath, RetryingKafkaConnectClient::new); + public static KafkaConnectClientApi withKafkaConnectConfig(KafkaConnectCluster config) { + return CACHE.computeIfAbsent(config.getAddress(), s -> new RetryingKafkaConnectClient(config)); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java index 9cdca46a3b..7071661373 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/RetryingKafkaConnectClient.java @@ -6,6 +6,7 @@ import com.provectus.kafka.ui.connect.model.Connector; import com.provectus.kafka.ui.connect.model.NewConnector; import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException; import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.model.KafkaConnectCluster; import java.time.Duration; import java.util.List; import java.util.Map; @@ -26,8 +27,8 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi { private static final int MAX_RETRIES = 5; private static final Duration RETRIES_DELAY = Duration.ofMillis(200); - public RetryingKafkaConnectClient(String basePath) { - super(new RetryingApiClient().setBasePath(basePath)); + public RetryingKafkaConnectClient(KafkaConnectCluster config) { + super(new RetryingApiClient(config)); } private static Retry conflictCodeRetry() { @@ -71,6 +72,14 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi { } private static class RetryingApiClient extends ApiClient { + + public RetryingApiClient(KafkaConnectCluster config) { + super(); + setBasePath(config.getAddress()); + setUsername(config.getUserName()); + setPassword(config.getPassword()); + } + @Override public Mono invokeAPI(String path, HttpMethod method, Map pathParams, MultiValueMap queryParams, Object body, diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index e101669666..0d83e143c5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -47,6 +47,8 @@ public class ClustersProperties { public static class ConnectCluster { String name; String address; + String userName; + String password; } @Data diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaConnectCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaConnectCluster.java index 1e858f0be3..6131f3fa9e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaConnectCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaConnectCluster.java @@ -11,4 +11,6 @@ import lombok.Data; public class KafkaConnectCluster { private final String name; private final String address; + private final String userName; + private final String password; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java index 393a219d8e..21680d3dd9 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaConnectService.java @@ -326,9 +326,8 @@ public class KafkaConnectService { private Mono withConnectClient(KafkaCluster cluster, String connectName) { return Mono.justOrEmpty(cluster.getKafkaConnect().stream() .filter(connect -> connect.getName().equals(connectName)) - .findFirst() - .map(KafkaConnectCluster::getAddress)) + .findFirst()) .switchIfEmpty(Mono.error(ConnectNotFoundException::new)) - .map(KafkaConnectClients::withBaseUrl); + .map(KafkaConnectClients::withKafkaConnectConfig); } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java index 51386c3380..5de2b93baa 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/AbstractIntegrationTest.java @@ -64,6 +64,8 @@ public abstract class AbstractIntegrationTest { System.setProperty("kafka.clusters.0.schemaRegistry", String.format("http://localhost:%1$s,http://localhost:%1$s,%2$s", SocketUtils.findAvailableTcpPort(), schemaRegistry.getUrl())); System.setProperty("kafka.clusters.0.kafkaConnect.0.name", "kafka-connect"); + System.setProperty("kafka.clusters.0.kafkaConnect.0.userName", "kafka-connect"); + System.setProperty("kafka.clusters.0.kafkaConnect.0.password", "kafka-connect"); System.setProperty("kafka.clusters.0.kafkaConnect.0.address", kafkaConnect.getTarget()); System.setProperty("kafka.clusters.1.name", SECOND_LOCAL); diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml index bd96bf0c46..7f19689755 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-connect-api.yaml @@ -362,6 +362,10 @@ paths: $ref: '#/components/schemas/ConnectorPluginConfigValidationResponse' components: + securitySchemes: + basicAuth: + type: http + scheme: basic schemas: ConnectorConfig: type: object @@ -555,3 +559,7 @@ components: items: type: string + +security: + - basicAuth: [] +