Kafka Connect: Implement basic auth (#1754)
* [TRIVIAL]: Enable basic auth for kafka connect cluster * Update README.md Co-authored-by: Naresh Kumar Reddy <naresh.kumar.reddy@dnb.no> Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
This commit is contained in:
parent
f2756295c4
commit
8d10eb69e8
11 changed files with 187 additions and 7 deletions
|
@ -170,6 +170,8 @@ For example, if you want to use an environment variable to set the `name` parame
|
||||||
|`KAFKA_CLUSTERS_0_DISABLELOGDIRSCOLLECTION` |Disable collecting segments information. It should be true for confluent cloud. Default: false
|
|`KAFKA_CLUSTERS_0_DISABLELOGDIRSCOLLECTION` |Disable collecting segments information. It should be true for confluent cloud. Default: false
|
||||||
|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME` |Given name for the Kafka Connect cluster
|
|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME` |Given name for the Kafka Connect cluster
|
||||||
|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS` |Address of the Kafka Connect service endpoint
|
|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS` |Address of the Kafka Connect service endpoint
|
||||||
|
|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_USERNAME`| Kafka Connect cluster's basic authentication username
|
||||||
|
|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_PASSWORD`| Kafka Connect cluster's basic authentication password
|
||||||
|`KAFKA_CLUSTERS_0_JMXSSL` |Enable SSL for JMX? `true` or `false`. For advanced setup, see `kafka-ui-jmx-secured.yml`
|
|`KAFKA_CLUSTERS_0_JMXSSL` |Enable SSL for JMX? `true` or `false`. For advanced setup, see `kafka-ui-jmx-secured.yml`
|
||||||
|`KAFKA_CLUSTERS_0_JMXUSERNAME` |Username for JMX authentication
|
|`KAFKA_CLUSTERS_0_JMXUSERNAME` |Username for JMX authentication
|
||||||
|`KAFKA_CLUSTERS_0_JMXPASSWORD` |Password for JMX authentication
|
|`KAFKA_CLUSTERS_0_JMXPASSWORD` |Password for JMX authentication
|
||||||
|
|
4
documentation/compose/jaas/kafka_connect.jaas
Normal file
4
documentation/compose/jaas/kafka_connect.jaas
Normal file
|
@ -0,0 +1,4 @@
|
||||||
|
KafkaConnect {
|
||||||
|
org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required
|
||||||
|
file="/conf/kafka_connect.password";
|
||||||
|
};
|
1
documentation/compose/jaas/kafka_connect.password
Normal file
1
documentation/compose/jaas/kafka_connect.password
Normal file
|
@ -0,0 +1 @@
|
||||||
|
admin: admin-secret
|
150
documentation/compose/kafka-ui-connectors-auth.yaml
Normal file
150
documentation/compose/kafka-ui-connectors-auth.yaml
Normal file
|
@ -0,0 +1,150 @@
|
||||||
|
---
|
||||||
|
version: '2'
|
||||||
|
services:
|
||||||
|
|
||||||
|
kafka-ui:
|
||||||
|
container_name: kafka-ui
|
||||||
|
image: provectuslabs/kafka-ui:latest
|
||||||
|
ports:
|
||||||
|
- 8080:8080
|
||||||
|
depends_on:
|
||||||
|
- zookeeper0
|
||||||
|
- zookeeper1
|
||||||
|
- kafka0
|
||||||
|
- kafka1
|
||||||
|
- schemaregistry0
|
||||||
|
- kafka-connect0
|
||||||
|
environment:
|
||||||
|
KAFKA_CLUSTERS_0_NAME: local
|
||||||
|
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
|
||||||
|
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
|
||||||
|
KAFKA_CLUSTERS_0_JMXPORT: 9997
|
||||||
|
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
|
||||||
|
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
|
||||||
|
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
|
||||||
|
KAFKA_CLUSTERS_0_KAFKACONNECT_0_USERNAME: admin
|
||||||
|
KAFKA_CLUSTERS_0_KAFKACONNECT_0_PASSWORD: admin-secret
|
||||||
|
KAFKA_CLUSTERS_0_KSQLDBSERVER: http://ksqldb:8088
|
||||||
|
|
||||||
|
zookeeper0:
|
||||||
|
image: confluentinc/cp-zookeeper:5.2.4
|
||||||
|
environment:
|
||||||
|
ZOOKEEPER_CLIENT_PORT: 2181
|
||||||
|
ZOOKEEPER_TICK_TIME: 2000
|
||||||
|
ports:
|
||||||
|
- 2181:2181
|
||||||
|
|
||||||
|
kafka0:
|
||||||
|
image: confluentinc/cp-kafka:5.3.1
|
||||||
|
depends_on:
|
||||||
|
- zookeeper0
|
||||||
|
ports:
|
||||||
|
- 9092:9092
|
||||||
|
- 9997:9997
|
||||||
|
environment:
|
||||||
|
KAFKA_BROKER_ID: 1
|
||||||
|
KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2181
|
||||||
|
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092
|
||||||
|
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
|
||||||
|
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
|
||||||
|
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||||
|
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
|
||||||
|
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
|
||||||
|
JMX_PORT: 9997
|
||||||
|
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
|
||||||
|
|
||||||
|
schemaregistry0:
|
||||||
|
image: confluentinc/cp-schema-registry:5.5.0
|
||||||
|
ports:
|
||||||
|
- 8085:8085
|
||||||
|
depends_on:
|
||||||
|
- zookeeper0
|
||||||
|
- kafka0
|
||||||
|
environment:
|
||||||
|
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
|
||||||
|
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2181
|
||||||
|
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
|
||||||
|
SCHEMA_REGISTRY_HOST_NAME: schemaregistry0
|
||||||
|
SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085
|
||||||
|
|
||||||
|
SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
|
||||||
|
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
|
||||||
|
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
|
||||||
|
|
||||||
|
|
||||||
|
kafka-connect0:
|
||||||
|
build:
|
||||||
|
context: ./kafka-connect
|
||||||
|
args:
|
||||||
|
image: confluentinc/cp-kafka-connect:6.0.1
|
||||||
|
ports:
|
||||||
|
- 8083:8083
|
||||||
|
depends_on:
|
||||||
|
- kafka0
|
||||||
|
- schemaregistry0
|
||||||
|
volumes:
|
||||||
|
- ./jaas:/conf
|
||||||
|
environment:
|
||||||
|
CONNECT_BOOTSTRAP_SERVERS: kafka0:29092
|
||||||
|
CONNECT_GROUP_ID: compose-connect-group
|
||||||
|
CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs
|
||||||
|
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
|
||||||
|
CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset
|
||||||
|
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
|
||||||
|
CONNECT_STATUS_STORAGE_TOPIC: _connect_status
|
||||||
|
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
|
||||||
|
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
|
||||||
|
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
|
||||||
|
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
|
||||||
|
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
|
||||||
|
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
|
||||||
|
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
|
||||||
|
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0
|
||||||
|
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
|
||||||
|
CONNECT_REST_EXTENSION_CLASSES: "org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension"
|
||||||
|
KAFKA_OPTS: "-Djava.security.auth.login.config=/conf/kafka_connect.jaas"
|
||||||
|
|
||||||
|
# AWS_ACCESS_KEY_ID: ""
|
||||||
|
# AWS_SECRET_ACCESS_KEY: ""
|
||||||
|
|
||||||
|
kafka-init-topics:
|
||||||
|
image: confluentinc/cp-kafka:5.3.1
|
||||||
|
volumes:
|
||||||
|
- ./message.json:/data/message.json
|
||||||
|
depends_on:
|
||||||
|
- kafka1
|
||||||
|
command: "bash -c 'echo Waiting for Kafka to be ready... && \
|
||||||
|
cub kafka-ready -b kafka1:29092 1 30 && \
|
||||||
|
kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
|
||||||
|
kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
|
||||||
|
kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181 && \
|
||||||
|
kafka-console-producer --broker-list kafka1:29092 -topic second.users < /data/message.json'"
|
||||||
|
|
||||||
|
create-connectors:
|
||||||
|
image: ellerbrock/alpine-bash-curl-ssl
|
||||||
|
depends_on:
|
||||||
|
- postgres-db
|
||||||
|
- kafka-connect0
|
||||||
|
volumes:
|
||||||
|
- ./connectors:/connectors
|
||||||
|
command: bash -c '/connectors/start.sh'
|
||||||
|
|
||||||
|
ksqldb:
|
||||||
|
image: confluentinc/ksqldb-server:0.18.0
|
||||||
|
depends_on:
|
||||||
|
- kafka0
|
||||||
|
- kafka-connect0
|
||||||
|
- schemaregistry0
|
||||||
|
ports:
|
||||||
|
- 8088:8088
|
||||||
|
environment:
|
||||||
|
KSQL_CUB_KAFKA_TIMEOUT: 120
|
||||||
|
KSQL_LISTENERS: http://0.0.0.0:8088
|
||||||
|
KSQL_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
|
||||||
|
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
|
||||||
|
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
|
||||||
|
KSQL_KSQL_CONNECT_URL: http://kafka-connect0:8083
|
||||||
|
KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
|
||||||
|
KSQL_KSQL_SERVICE_ID: my_ksql_1
|
||||||
|
KSQL_KSQL_HIDDEN_TOPICS: '^_.*'
|
||||||
|
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
|
|
@ -1,6 +1,7 @@
|
||||||
package com.provectus.kafka.ui.client;
|
package com.provectus.kafka.ui.client;
|
||||||
|
|
||||||
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
|
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
|
||||||
|
import com.provectus.kafka.ui.model.KafkaConnectCluster;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
@ -12,7 +13,7 @@ public final class KafkaConnectClients {
|
||||||
|
|
||||||
private static final Map<String, KafkaConnectClientApi> CACHE = new ConcurrentHashMap<>();
|
private static final Map<String, KafkaConnectClientApi> CACHE = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public static KafkaConnectClientApi withBaseUrl(String basePath) {
|
public static KafkaConnectClientApi withKafkaConnectConfig(KafkaConnectCluster config) {
|
||||||
return CACHE.computeIfAbsent(basePath, RetryingKafkaConnectClient::new);
|
return CACHE.computeIfAbsent(config.getAddress(), s -> new RetryingKafkaConnectClient(config));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import com.provectus.kafka.ui.connect.model.Connector;
|
||||||
import com.provectus.kafka.ui.connect.model.NewConnector;
|
import com.provectus.kafka.ui.connect.model.NewConnector;
|
||||||
import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException;
|
import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException;
|
||||||
import com.provectus.kafka.ui.exception.ValidationException;
|
import com.provectus.kafka.ui.exception.ValidationException;
|
||||||
|
import com.provectus.kafka.ui.model.KafkaConnectCluster;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -26,8 +27,8 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
|
||||||
private static final int MAX_RETRIES = 5;
|
private static final int MAX_RETRIES = 5;
|
||||||
private static final Duration RETRIES_DELAY = Duration.ofMillis(200);
|
private static final Duration RETRIES_DELAY = Duration.ofMillis(200);
|
||||||
|
|
||||||
public RetryingKafkaConnectClient(String basePath) {
|
public RetryingKafkaConnectClient(KafkaConnectCluster config) {
|
||||||
super(new RetryingApiClient().setBasePath(basePath));
|
super(new RetryingApiClient(config));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Retry conflictCodeRetry() {
|
private static Retry conflictCodeRetry() {
|
||||||
|
@ -71,6 +72,14 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class RetryingApiClient extends ApiClient {
|
private static class RetryingApiClient extends ApiClient {
|
||||||
|
|
||||||
|
public RetryingApiClient(KafkaConnectCluster config) {
|
||||||
|
super();
|
||||||
|
setBasePath(config.getAddress());
|
||||||
|
setUsername(config.getUserName());
|
||||||
|
setPassword(config.getPassword());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> Mono<T> invokeAPI(String path, HttpMethod method, Map<String, Object> pathParams,
|
public <T> Mono<T> invokeAPI(String path, HttpMethod method, Map<String, Object> pathParams,
|
||||||
MultiValueMap<String, String> queryParams, Object body,
|
MultiValueMap<String, String> queryParams, Object body,
|
||||||
|
|
|
@ -47,6 +47,8 @@ public class ClustersProperties {
|
||||||
public static class ConnectCluster {
|
public static class ConnectCluster {
|
||||||
String name;
|
String name;
|
||||||
String address;
|
String address;
|
||||||
|
String userName;
|
||||||
|
String password;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
|
|
|
@ -11,4 +11,6 @@ import lombok.Data;
|
||||||
public class KafkaConnectCluster {
|
public class KafkaConnectCluster {
|
||||||
private final String name;
|
private final String name;
|
||||||
private final String address;
|
private final String address;
|
||||||
|
private final String userName;
|
||||||
|
private final String password;
|
||||||
}
|
}
|
||||||
|
|
|
@ -326,9 +326,8 @@ public class KafkaConnectService {
|
||||||
private Mono<KafkaConnectClientApi> withConnectClient(KafkaCluster cluster, String connectName) {
|
private Mono<KafkaConnectClientApi> withConnectClient(KafkaCluster cluster, String connectName) {
|
||||||
return Mono.justOrEmpty(cluster.getKafkaConnect().stream()
|
return Mono.justOrEmpty(cluster.getKafkaConnect().stream()
|
||||||
.filter(connect -> connect.getName().equals(connectName))
|
.filter(connect -> connect.getName().equals(connectName))
|
||||||
.findFirst()
|
.findFirst())
|
||||||
.map(KafkaConnectCluster::getAddress))
|
|
||||||
.switchIfEmpty(Mono.error(ConnectNotFoundException::new))
|
.switchIfEmpty(Mono.error(ConnectNotFoundException::new))
|
||||||
.map(KafkaConnectClients::withBaseUrl);
|
.map(KafkaConnectClients::withKafkaConnectConfig);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,6 +64,8 @@ public abstract class AbstractIntegrationTest {
|
||||||
System.setProperty("kafka.clusters.0.schemaRegistry", String.format("http://localhost:%1$s,http://localhost:%1$s,%2$s",
|
System.setProperty("kafka.clusters.0.schemaRegistry", String.format("http://localhost:%1$s,http://localhost:%1$s,%2$s",
|
||||||
SocketUtils.findAvailableTcpPort(), schemaRegistry.getUrl()));
|
SocketUtils.findAvailableTcpPort(), schemaRegistry.getUrl()));
|
||||||
System.setProperty("kafka.clusters.0.kafkaConnect.0.name", "kafka-connect");
|
System.setProperty("kafka.clusters.0.kafkaConnect.0.name", "kafka-connect");
|
||||||
|
System.setProperty("kafka.clusters.0.kafkaConnect.0.userName", "kafka-connect");
|
||||||
|
System.setProperty("kafka.clusters.0.kafkaConnect.0.password", "kafka-connect");
|
||||||
System.setProperty("kafka.clusters.0.kafkaConnect.0.address", kafkaConnect.getTarget());
|
System.setProperty("kafka.clusters.0.kafkaConnect.0.address", kafkaConnect.getTarget());
|
||||||
|
|
||||||
System.setProperty("kafka.clusters.1.name", SECOND_LOCAL);
|
System.setProperty("kafka.clusters.1.name", SECOND_LOCAL);
|
||||||
|
|
|
@ -362,6 +362,10 @@ paths:
|
||||||
$ref: '#/components/schemas/ConnectorPluginConfigValidationResponse'
|
$ref: '#/components/schemas/ConnectorPluginConfigValidationResponse'
|
||||||
|
|
||||||
components:
|
components:
|
||||||
|
securitySchemes:
|
||||||
|
basicAuth:
|
||||||
|
type: http
|
||||||
|
scheme: basic
|
||||||
schemas:
|
schemas:
|
||||||
ConnectorConfig:
|
ConnectorConfig:
|
||||||
type: object
|
type: object
|
||||||
|
@ -555,3 +559,7 @@ components:
|
||||||
items:
|
items:
|
||||||
type: string
|
type: string
|
||||||
|
|
||||||
|
|
||||||
|
security:
|
||||||
|
- basicAuth: []
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue